Skip to content

Commit cec8aae

Browse files
committed
refactor(binding-mqtt): use the new vocabulary
1 parent a879e37 commit cec8aae

3 files changed

Lines changed: 47 additions & 47 deletions

File tree

packages/binding-mqtt/src/mqtt-client.ts

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,18 @@ export default class MqttClient implements ProtocolClient {
5050
): Promise<Subscription> {
5151
const contentType = form.contentType ?? ContentSerdes.DEFAULT;
5252
const requestUri = new url.URL(form.href);
53-
const topic = requestUri.pathname.slice(1);
5453
const brokerUri: string = `${this.scheme}://` + requestUri.host;
54+
// Keeping the path as the topic for compatibility reasons.
55+
// Current specification allows only form["mqv:filter"]
56+
const filter = requestUri.pathname.slice(1) ?? form["mqv:filter"];
5557

5658
if (this.client === undefined) {
5759
this.client = await mqtt.connectAsync(brokerUri, this.config);
5860
}
5961

6062
this.client.on("message", (receivedTopic: string, payload: Buffer) => {
6163
debug(`Received MQTT message (topic: ${receivedTopic}, data length: ${payload.length})`);
62-
if (receivedTopic === topic) {
64+
if (filter.includes(receivedTopic)) {
6365
next(new Content(contentType, Readable.from(payload)));
6466
}
6567
});
@@ -70,7 +72,7 @@ export default class MqttClient implements ProtocolClient {
7072
if (error) error(err);
7173
});
7274

73-
await this.client.subscribeAsync(topic);
75+
await this.client.subscribeAsync(filter);
7476

7577
return new Subscription(() => {
7678
if (!this.client) {
@@ -79,7 +81,7 @@ export default class MqttClient implements ProtocolClient {
7981
);
8082
return;
8183
}
82-
this.client.unsubscribe(topic);
84+
this.client.unsubscribe(filter);
8385
});
8486
}
8587

@@ -89,20 +91,19 @@ export default class MqttClient implements ProtocolClient {
8991

9092
public async writeResource(form: MqttForm, content: Content): Promise<void> {
9193
const requestUri = new url.URL(form.href);
92-
const topic = requestUri.pathname.slice(1);
9394
const brokerUri = `${this.scheme}://${requestUri.host}`;
95+
const topic = requestUri.pathname.slice(1) ?? form["mqv:topic"];
9496

9597
if (this.client === undefined) {
9698
this.client = await mqtt.connectAsync(brokerUri, this.config);
9799
}
98100

99101
// if not input was provided, set up an own body otherwise take input as body
100-
if (content === undefined) {
101-
await this.client.publishAsync(topic, Buffer.from(""));
102-
} else {
103-
const buffer = await content.toBuffer();
104-
await this.client.publishAsync(topic, buffer);
105-
}
102+
const buffer = content === undefined ? Buffer.from("") : await content.toBuffer();
103+
await this.client.publishAsync(topic, buffer, {
104+
retain: form["mqv:retain"],
105+
qos: this.mapQoS(form["mqv:qos"]),
106+
});
106107
}
107108

108109
public async invokeResource(form: MqttForm, content: Content): Promise<Content> {
@@ -115,12 +116,11 @@ export default class MqttClient implements ProtocolClient {
115116
}
116117

117118
// if not input was provided, set up an own body otherwise take input as body
118-
if (content === undefined) {
119-
await this.client.publishAsync(topic, Buffer.from(""));
120-
} else {
121-
const buffer = await content.toBuffer();
122-
await this.client.publishAsync(topic, buffer);
123-
}
119+
const buffer = content === undefined ? Buffer.from("") : await content.toBuffer();
120+
await this.client.publishAsync(topic, buffer, {
121+
retain: form["mqv:retain"],
122+
qos: this.mapQoS(form["mqv:qos"]),
123+
});
124124
// there will be no response
125125
return new DefaultContent(Readable.from([]));
126126
}
@@ -169,15 +169,20 @@ export default class MqttClient implements ProtocolClient {
169169
return true;
170170
}
171171

172-
private mapQoS(qos: MqttQoS): Required<IClientPublishOptions>["qos"] {
172+
private mapQoS(qos: MqttQoS | undefined): Required<IClientPublishOptions>["qos"] {
173173
switch (qos) {
174-
case 2:
175-
return (qos = 2);
176-
case 1:
177-
return (qos = 1);
178-
case 0:
174+
case "0":
175+
return 0;
176+
case "1":
177+
return 1;
178+
case "2":
179+
return 2;
180+
case undefined:
181+
return 0;
179182
default:
180-
return (qos = 0);
183+
warn(`MqttClient received unsupported QoS level '${qos}'`);
184+
warn(`MqttClient falling back to QoS level '0'`);
185+
return 0;
181186
}
182187
}
183188
}

packages/binding-mqtt/src/mqtt.ts

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,16 @@ export * from "./mqtt-client-factory";
2929
export * from "./mqtts-client-factory";
3030
export * from "./mqtt-broker-server";
3131

32-
/**
33-
* MQTT Quality of Service level.
34-
* QoS0: Fire-and-forget
35-
* QoS1: Deliver-at-least-once
36-
* QoS2: Deliver-exactly-once
37-
*/
38-
export enum MqttQoS {
39-
QoS0,
40-
QoS1,
41-
QoS2,
42-
}
43-
32+
export type MqttQoS = "0" | "1" | "2";
4433
export class MqttForm extends Form {
45-
public "mqtt:qos": MqttQoS = MqttQoS.QoS0;
46-
public "mqtt:retain": boolean;
34+
public "mqv:qos"?: MqttQoS = "0";
35+
public "mqv:retain"?: boolean;
36+
37+
public "mqv:topic"?: string;
38+
39+
public "mqv:filter"?: string | string[];
40+
41+
public "mqv:controlPacket"?: "publish" | "subscribe" | "unsubscribe";
4742
}
4843

4944
export interface MqttClientConfig {

packages/binding-mqtt/test/mqtt-client-subscribe-test.unit.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import * as chai from "chai";
2121
import chaiAsPromised from "chai-as-promised";
22-
import { MqttClient, MqttForm, MqttQoS } from "../src/mqtt";
22+
import { MqttClient, MqttForm } from "../src/mqtt";
2323
import { expect, should } from "chai";
2424
import { Aedes, Server } from "aedes";
2525
import * as net from "net";
@@ -61,8 +61,8 @@ describe("MQTT client implementation - unit", () => {
6161
const mqttClient = new MqttClient();
6262
const form: MqttForm = {
6363
href: brokerUri + "/" + property,
64-
"mqtt:qos": MqttQoS.QoS0,
65-
"mqtt:retain": false,
64+
"mqv:qos": "0",
65+
"mqv:retain": false,
6666
};
6767

6868
mqttClient
@@ -88,8 +88,8 @@ describe("MQTT client implementation - unit", () => {
8888
const mqttClient = new MqttClient();
8989
const form: MqttForm = {
9090
href: brokerUri + "/" + property,
91-
"mqtt:qos": MqttQoS.QoS0,
92-
"mqtt:retain": false,
91+
"mqv:qos": "0",
92+
"mqv:retain": false,
9393
};
9494

9595
mqttClient
@@ -139,8 +139,8 @@ describe("MQTT client implementation - unit", () => {
139139

140140
const form: MqttForm = {
141141
href: brokerUri + "/" + property,
142-
"mqtt:qos": MqttQoS.QoS1,
143-
"mqtt:retain": false,
142+
"mqv:qos": "1",
143+
"mqv:retain": false,
144144
};
145145

146146
mqttClient
@@ -158,8 +158,8 @@ describe("MQTT client implementation - unit", () => {
158158

159159
const form: MqttForm = {
160160
href: brokerUri + "/" + property,
161-
"mqtt:qos": MqttQoS.QoS1,
162-
"mqtt:retain": false,
161+
"mqv:qos": "1",
162+
"mqv:retain": false,
163163
};
164164

165165
await mqttClient.subscribeResource(form, () => {});

0 commit comments

Comments
 (0)