Skip to content

Commit f79d737

Browse files
committed
feat(binding-mqtt): use a message pool for each broker
1 parent cec8aae commit f79d737

3 files changed

Lines changed: 175 additions & 39 deletions

File tree

packages/binding-mqtt/src/mqtt-client.ts

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import * as url from "url";
2525
import { Subscription } from "rxjs/Subscription";
2626
import { Readable } from "stream";
2727
import { IClientPublishOptions } from "mqtt";
28+
import MQTTMessagePool from "./mqtt-message-pool";
2829

2930
const { debug, warn } = createLoggers("binding-mqtt", "mqtt-client");
3031

@@ -35,6 +36,7 @@ declare interface MqttClientSecurityParameters {
3536

3637
export default class MqttClient implements ProtocolClient {
3738
private scheme: string;
39+
private pools: Map<string, MQTTMessagePool> = new Map();
3840

3941
constructor(private config: MqttClientConfig = {}, secure = false) {
4042
this.scheme = "mqtt" + (secure ? "s" : "");
@@ -55,52 +57,78 @@ export default class MqttClient implements ProtocolClient {
5557
// Current specification allows only form["mqv:filter"]
5658
const filter = requestUri.pathname.slice(1) ?? form["mqv:filter"];
5759

58-
if (this.client === undefined) {
59-
this.client = await mqtt.connectAsync(brokerUri, this.config);
60+
let pool = this.pools.get(brokerUri);
61+
62+
if (pool == null) {
63+
pool = new MQTTMessagePool();
64+
this.pools.set(brokerUri, pool);
6065
}
6166

62-
this.client.on("message", (receivedTopic: string, payload: Buffer) => {
63-
debug(`Received MQTT message (topic: ${receivedTopic}, data length: ${payload.length})`);
64-
if (filter.includes(receivedTopic)) {
65-
next(new Content(contentType, Readable.from(payload)));
67+
await pool.connect(brokerUri, this.config);
68+
69+
pool.subscribe(
70+
filter,
71+
(topic: string, message: Buffer) => {
72+
next(new Content(contentType, Readable.from(message)));
73+
},
74+
(e: Error) => {
75+
if (error) error(e);
6676
}
67-
});
77+
);
6878

69-
this.client.on("error", (err: Error) => {
70-
// Connection errors are fired as a result of mqtt.connectAsync
71-
// here we have to handle only parsing errors.
72-
if (error) error(err);
73-
});
79+
return new Subscription(() => {});
80+
}
81+
82+
public async readResource(form: MqttForm): Promise<Content> {
83+
const contentType = form.contentType ?? ContentSerdes.DEFAULT;
84+
const requestUri = new url.URL(form.href);
85+
const brokerUri: string = `${this.scheme}://` + requestUri.host;
86+
// Keeping the path as the topic for compatibility reasons.
87+
// Current specification allows only form["mqv:filter"]
88+
const filter = requestUri.pathname.slice(1) ?? form["mqv:filter"];
7489

75-
await this.client.subscribeAsync(filter);
90+
let pool = this.pools.get(brokerUri);
7691

77-
return new Subscription(() => {
78-
if (!this.client) {
79-
warn(
80-
`MQTT Client is undefined. This means that the client either failed to connect or was never initialized.`
81-
);
82-
return;
83-
}
84-
this.client.unsubscribe(filter);
92+
if (pool == null) {
93+
pool = new MQTTMessagePool();
94+
this.pools.set(brokerUri, pool);
95+
}
96+
97+
await pool.connect(brokerUri, this.config);
98+
99+
const result = await new Promise<Content>((resolve, reject) => {
100+
pool!.subscribe(
101+
filter,
102+
(topic: string, message: Buffer) => {
103+
resolve(new Content(contentType, Readable.from(message)));
104+
},
105+
(e: Error) => {
106+
reject(e);
107+
}
108+
);
85109
});
86-
}
87110

88-
public async readResource(form: MqttForm): Promise<Content> {
89-
throw new Error("Method not implemented.");
111+
await pool.unsubscribe(filter);
112+
return result;
90113
}
91114

92115
public async writeResource(form: MqttForm, content: Content): Promise<void> {
93116
const requestUri = new url.URL(form.href);
94117
const brokerUri = `${this.scheme}://${requestUri.host}`;
95118
const topic = requestUri.pathname.slice(1) ?? form["mqv:topic"];
96119

97-
if (this.client === undefined) {
98-
this.client = await mqtt.connectAsync(brokerUri, this.config);
120+
let pool = this.pools.get(brokerUri);
121+
122+
if (pool == null) {
123+
pool = new MQTTMessagePool();
124+
this.pools.set(brokerUri, pool);
99125
}
100126

127+
await pool.connect(brokerUri, this.config);
128+
101129
// if not input was provided, set up an own body otherwise take input as body
102130
const buffer = content === undefined ? Buffer.from("") : await content.toBuffer();
103-
await this.client.publishAsync(topic, buffer, {
131+
await pool.publish(topic, buffer, {
104132
retain: form["mqv:retain"],
105133
qos: this.mapQoS(form["mqv:qos"]),
106134
});
@@ -111,13 +139,18 @@ export default class MqttClient implements ProtocolClient {
111139
const topic = requestUri.pathname.slice(1);
112140
const brokerUri = `${this.scheme}://${requestUri.host}`;
113141

114-
if (this.client === undefined) {
115-
this.client = await mqtt.connectAsync(brokerUri, this.config);
142+
let pool = this.pools.get(brokerUri);
143+
144+
if (pool == null) {
145+
pool = new MQTTMessagePool();
146+
this.pools.set(brokerUri, pool);
116147
}
117148

149+
await pool.connect(brokerUri, this.config);
150+
118151
// if not input was provided, set up an own body otherwise take input as body
119152
const buffer = content === undefined ? Buffer.from("") : await content.toBuffer();
120-
await this.client.publishAsync(topic, buffer, {
153+
await pool.publish(topic, buffer, {
121154
retain: form["mqv:retain"],
122155
qos: this.mapQoS(form["mqv:qos"]),
123156
});
@@ -127,10 +160,12 @@ export default class MqttClient implements ProtocolClient {
127160

128161
public async unlinkResource(form: TD.Form): Promise<void> {
129162
const requestUri = new url.URL(form.href);
163+
const brokerUri: string = `${this.scheme}://` + requestUri.host;
130164
const topic = requestUri.pathname.slice(1);
131165

132-
if (this.client != null && this.client.connected) {
133-
await this.client.unsubscribeAsync(topic);
166+
const pool = this.pools.get(brokerUri);
167+
if (pool != null) {
168+
await pool.unsubscribe(topic);
134169
debug(`MqttClient unsubscribed from topic '${topic}'`);
135170
}
136171
}
@@ -147,6 +182,9 @@ export default class MqttClient implements ProtocolClient {
147182
}
148183

149184
public async stop(): Promise<void> {
185+
for (const pool of this.pools.values()) {
186+
await pool.end();
187+
}
150188
if (this.client) return this.client.endAsync();
151189
}
152190

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/********************************************************************************
2+
* Copyright (c) 2023 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License v. 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0, or the W3C Software Notice and
10+
* Document License (2015-05-13) which is available at
11+
* https://www.w3.org/Consortium/Legal/2015/copyright-software-and-document.
12+
*
13+
* SPDX-License-Identifier: EPL-2.0 OR W3C-20150513
14+
********************************************************************************/
15+
import { createLoggers } from "@node-wot/core";
16+
import { MqttClientConfig } from "./mqtt";
17+
import * as mqtt from "mqtt";
18+
19+
const { debug, warn } = createLoggers("binding-mqtt", "mqtt-message-pool");
20+
21+
export default class MQTTMessagePool {
22+
client?: mqtt.MqttClient;
23+
subscribers: Map<string, (topic: string, message: Buffer) => void> = new Map();
24+
errors: Map<string, (error: Error) => void> = new Map();
25+
26+
public async connect(brokerURI: string, config: MqttClientConfig): Promise<void> {
27+
if (this.client === undefined) {
28+
this.client = await mqtt.connectAsync(brokerURI, config);
29+
this.client.on("message", (receivedTopic: string, payload: Buffer) => {
30+
debug(
31+
`Received MQTT message from ${brokerURI} (topic: ${receivedTopic}, data length: ${payload.length})`
32+
);
33+
this.subscribers.get(receivedTopic)?.(receivedTopic, payload);
34+
});
35+
// Connection errors should be deal by the connectAsync
36+
// here we handle "runtime" parsing errors, but we can't do much
37+
// therefore we broadcast the error to all subscribers
38+
this.client.on("error", (error: Error) => {
39+
warn(`MQTT client error: ${error.message}`);
40+
this.errors.forEach((errorCallback) => {
41+
errorCallback(error);
42+
});
43+
});
44+
}
45+
}
46+
47+
public async subscribe(
48+
filter: string | string[],
49+
callback: (topic: string, message: Buffer) => void,
50+
error: (error: Error) => void
51+
): Promise<void> {
52+
if (this.client == null) {
53+
throw new Error("MQTT client is not connected");
54+
}
55+
56+
const filters = Array.isArray(filter) ? filter : [filter];
57+
filters.forEach((f) => {
58+
if (this.subscribers.has(f)) {
59+
warn(`Already subscribed to ${f}; we are not supporting multiple subscribers to the same topic`);
60+
warn(`The subscription will be ignored`);
61+
return;
62+
}
63+
64+
this.subscribers.set(f, callback);
65+
this.errors.set(f, error);
66+
});
67+
68+
await this.client.subscribeAsync(filters);
69+
}
70+
71+
public async unsubscribe(filter: string | string[]): Promise<void> {
72+
if (this.client == null) {
73+
throw new Error("MQTT client is not connected");
74+
}
75+
76+
const filters = Array.isArray(filter) ? filter : [filter];
77+
filters.forEach((f) => {
78+
this.subscribers.delete(f);
79+
this.errors.delete(f);
80+
});
81+
82+
await this.client.unsubscribeAsync(filters);
83+
}
84+
85+
public async publish(topic: string, message: Buffer, options?: mqtt.IClientPublishOptions): Promise<void> {
86+
if (this.client == null) {
87+
throw new Error("MQTT client is not connected");
88+
}
89+
90+
debug(`Publishing MQTT message to ${topic} (data length: ${message.length})`);
91+
await this.client.publishAsync(topic, message, options);
92+
}
93+
94+
public async end(): Promise<void> {
95+
for (const filter of this.subscribers.keys()) {
96+
this.unsubscribe(filter);
97+
}
98+
return this.client?.endAsync();
99+
}
100+
}

packages/binding-mqtt/test/mqtt-client-subscribe-test.unit.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ describe("MQTT client implementation - unit", () => {
6161
const mqttClient = new MqttClient();
6262
const form: MqttForm = {
6363
href: brokerUri + "/" + property,
64-
"mqv:qos": "0",
64+
"mqv:qos": "2",
6565
"mqv:retain": false,
6666
};
6767

@@ -77,8 +77,6 @@ describe("MQTT client implementation - unit", () => {
7777
})
7878
.then(async (sub) => {
7979
await mqttClient.invokeResource(form, new Content("", Readable.from(Buffer.from("test"))));
80-
// Need to manually unsubscribe because stopping the client will not unsubscribe all subscriptions
81-
sub.unsubscribe();
8280
await mqttClient.stop();
8381
})
8482
.catch((err) => done(err));
@@ -88,7 +86,7 @@ describe("MQTT client implementation - unit", () => {
8886
const mqttClient = new MqttClient();
8987
const form: MqttForm = {
9088
href: brokerUri + "/" + property,
91-
"mqv:qos": "0",
89+
"mqv:qos": "2",
9290
"mqv:retain": false,
9391
};
9492

@@ -97,20 +95,20 @@ describe("MQTT client implementation - unit", () => {
9795
/** No-op */
9896
})
9997
.then(async (sub) => {
100-
sub.unsubscribe();
101-
const sub2 = await mqttClient.subscribeResource(form, async (value: Content) => {
98+
await mqttClient.unlinkResource(form);
99+
await mqttClient.subscribeResource(form, async (value: Content) => {
102100
try {
103101
const data = await value.toBuffer();
104102
expect(data.toString()).to.be.equal("test");
105103
done();
106104
} catch (err) {
107105
done(err);
108106
} finally {
107+
// Note: stopping the client clears also all subscriptions
109108
await mqttClient.stop();
110109
}
111110
});
112111
await mqttClient.invokeResource(form, new Content("", Readable.from(Buffer.from("test"))));
113-
sub2.unsubscribe();
114112
})
115113
.catch((err) => done(err));
116114
}).timeout(10000);

0 commit comments

Comments
 (0)