Skip to content

Commit a4af8c1

Browse files
committed
fix(binding-mqtt): generate qos for events
1 parent f79d737 commit a4af8c1

4 files changed

Lines changed: 46 additions & 24 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Lorem ipsum dolor sit amet.

packages/binding-mqtt/src/mqtt-broker-server.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import { AuthenticateError, Client, Server, Aedes } from "aedes";
2424
import * as net from "net";
2525
import * as tls from "tls";
2626
import * as TD from "@node-wot/td-tools";
27-
import { MqttBrokerServerConfig } from "./mqtt";
27+
import { MqttBrokerServerConfig, MqttForm } from "./mqtt";
2828
import {
2929
ProtocolServer,
3030
Servient,
@@ -37,6 +37,7 @@ import {
3737
import { InteractionOptions } from "wot-typescript-definitions";
3838
import { ActionElement, PropertyElement } from "wot-thing-description-types";
3939
import { Readable } from "stream";
40+
import { mapQoS } from "./util";
4041

4142
const { info, debug, error, warn } = createLoggers("binding-mqtt", "mqtt-broker-server");
4243

@@ -196,7 +197,8 @@ export default class MqttBrokerServer implements ProtocolServer {
196197
const event = thing.events[eventName];
197198

198199
const href = this.brokerURI + "/" + topic;
199-
const form = new TD.Form(href, ContentSerdes.DEFAULT);
200+
const form = new MqttForm(href, ContentSerdes.DEFAULT);
201+
form["mqv:qos"] = "2";
200202
form.op = ["subscribeevent", "unsubscribeevent"];
201203
event.forms.push(form);
202204
debug(`MqttBrokerServer at ${this.brokerURI} assigns '${href}' to Event '${eventName}'`);
@@ -214,7 +216,7 @@ export default class MqttBrokerServer implements ProtocolServer {
214216
}
215217
debug(`MqttBrokerServer at ${this.brokerURI} publishing to Event topic '${eventName}' `);
216218
const buffer = await content.toBuffer();
217-
this.broker.publish(topic, buffer);
219+
this.broker.publish(topic, buffer, { retain: form["mqv:retain"], qos: mapQoS(form["mqv:qos"]) });
218220
};
219221
thing.handleSubscribeEvent(eventName, eventListener, { formIndex: event.forms.length - 1 });
220222
}

packages/binding-mqtt/src/mqtt-client.ts

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import { ProtocolClient, Content, DefaultContent, createLoggers, ContentSerdes } from "@node-wot/core";
2121
import * as TD from "@node-wot/td-tools";
2222
import * as mqtt from "mqtt";
23-
import { MqttClientConfig, MqttForm, MqttQoS } from "./mqtt";
23+
import { MqttClientConfig, MqttForm } from "./mqtt";
2424
import * as url from "url";
2525
import { Subscription } from "rxjs/Subscription";
2626
import { Readable } from "stream";
27-
import { IClientPublishOptions } from "mqtt";
2827
import MQTTMessagePool from "./mqtt-message-pool";
28+
import { mapQoS } from "./util";
2929

3030
const { debug, warn } = createLoggers("binding-mqtt", "mqtt-client");
3131

@@ -130,7 +130,7 @@ export default class MqttClient implements ProtocolClient {
130130
const buffer = content === undefined ? Buffer.from("") : await content.toBuffer();
131131
await pool.publish(topic, buffer, {
132132
retain: form["mqv:retain"],
133-
qos: this.mapQoS(form["mqv:qos"]),
133+
qos: mapQoS(form["mqv:qos"]),
134134
});
135135
}
136136

@@ -152,7 +152,7 @@ export default class MqttClient implements ProtocolClient {
152152
const buffer = content === undefined ? Buffer.from("") : await content.toBuffer();
153153
await pool.publish(topic, buffer, {
154154
retain: form["mqv:retain"],
155-
qos: this.mapQoS(form["mqv:qos"]),
155+
qos: mapQoS(form["mqv:qos"]),
156156
});
157157
// there will be no response
158158
return new DefaultContent(Readable.from([]));
@@ -206,21 +206,4 @@ export default class MqttClient implements ProtocolClient {
206206
}
207207
return true;
208208
}
209-
210-
private mapQoS(qos: MqttQoS | undefined): Required<IClientPublishOptions>["qos"] {
211-
switch (qos) {
212-
case "0":
213-
return 0;
214-
case "1":
215-
return 1;
216-
case "2":
217-
return 2;
218-
case undefined:
219-
return 0;
220-
default:
221-
warn(`MqttClient received unsupported QoS level '${qos}'`);
222-
warn(`MqttClient falling back to QoS level '0'`);
223-
return 0;
224-
}
225-
}
226209
}

packages/binding-mqtt/src/util.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/********************************************************************************
2+
* Copyright (c) 2023 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License v. 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0, or the W3C Software Notice and
10+
* Document License (2015-05-13) which is available at
11+
* https://www.w3.org/Consortium/Legal/2015/copyright-software-and-document.
12+
*
13+
* SPDX-License-Identifier: EPL-2.0 OR W3C-20150513
14+
********************************************************************************/
15+
import { createLoggers } from "@node-wot/core";
16+
import { MqttQoS } from "./mqtt";
17+
import { IClientPublishOptions } from "mqtt";
18+
19+
const { debug, warn } = createLoggers("binding-mqtt", "mqtt-util");
20+
21+
export function mapQoS(qos: MqttQoS | undefined): Required<IClientPublishOptions>["qos"] {
22+
switch (qos) {
23+
case "0":
24+
return 0;
25+
case "1":
26+
return 1;
27+
case "2":
28+
return 2;
29+
case undefined:
30+
return 0;
31+
default:
32+
warn(`MqttClient received unsupported QoS level '${qos}'`);
33+
warn(`MqttClient falling back to QoS level '0'`);
34+
return 0;
35+
}
36+
}

0 commit comments

Comments
 (0)