Skip to content

Commit c487c9b

Browse files
committed
feat(binding-mqtt): add WebSocket subprotocol and composite scheme support
Signed-off-by: jona42-ui <jonathanthembo123@gmail.com>
1 parent 8fbc11d commit c487c9b

10 files changed

Lines changed: 538 additions & 25 deletions

packages/binding-mqtt/src/mqtt-client-factory.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ export default class MqttClientFactory implements ProtocolClientFactory {
2626
public readonly scheme: string = "mqtt";
2727
private readonly clients: Array<ProtocolClient> = [];
2828

29+
getSchemes(): string[] {
30+
return ["mqtt", "mqtts", "mqtt+ws", "mqtt+wss"];
31+
}
32+
33+
supportsSubprotocol(scheme: string, subprotocol: string): boolean {
34+
return (scheme === "ws" || scheme === "wss") && subprotocol === "mqtt";
35+
}
36+
2937
getClient(): ProtocolClient {
3038
const client = new MqttClient();
3139
this.clients.push(client);

packages/binding-mqtt/src/mqtt-client.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,22 @@ export default class MqttClient implements ProtocolClient {
5454

5555
private client?: mqtt.MqttClient;
5656

57+
private getBrokerUri(href: string): string {
58+
const requestUri = new URL(href);
59+
60+
if (href.startsWith("ws://") || href.startsWith("wss://")) {
61+
return `${requestUri.protocol}//${requestUri.host}`;
62+
}
63+
64+
const compositeMatch = href.match(/^([a-z]+)\+([a-z]+):\/\//i);
65+
if (compositeMatch) {
66+
const transportScheme = compositeMatch[2];
67+
return `${transportScheme}://${requestUri.host}`;
68+
}
69+
70+
return `${this.scheme}://${requestUri.host}`;
71+
}
72+
5773
public async subscribeResource(
5874
form: MqttForm,
5975
next: (value: Content) => void,
@@ -62,7 +78,7 @@ export default class MqttClient implements ProtocolClient {
6278
): Promise<Subscription> {
6379
const contentType = form.contentType ?? ContentSerdes.DEFAULT;
6480
const requestUri = new url.URL(form.href);
65-
const brokerUri: string = `${this.scheme}://` + requestUri.host;
81+
const brokerUri: string = this.getBrokerUri(form.href);
6682
// Keeping the path as the topic for compatibility reasons.
6783
// Current specification allows only form["mqv:filter"]
6884
const filter = requestUri.pathname.slice(1) ?? form["mqv:filter"];
@@ -92,7 +108,7 @@ export default class MqttClient implements ProtocolClient {
92108
public async readResource(form: MqttForm): Promise<Content> {
93109
const contentType = form.contentType ?? ContentSerdes.DEFAULT;
94110
const requestUri = new url.URL(form.href);
95-
const brokerUri: string = `${this.scheme}://` + requestUri.host;
111+
const brokerUri: string = this.getBrokerUri(form.href);
96112
// Keeping the path as the topic for compatibility reasons.
97113
// Current specification allows only form["mqv:filter"]
98114
const filter = requestUri.pathname.slice(1) ?? form["mqv:filter"];
@@ -124,7 +140,7 @@ export default class MqttClient implements ProtocolClient {
124140

125141
public async writeResource(form: MqttForm, content: Content): Promise<void> {
126142
const requestUri = new url.URL(form.href);
127-
const brokerUri = `${this.scheme}://${requestUri.host}`;
143+
const brokerUri = this.getBrokerUri(form.href);
128144
const topic = requestUri.pathname.slice(1) ?? form["mqv:topic"];
129145

130146
let pool = this.pools.get(brokerUri);
@@ -147,7 +163,7 @@ export default class MqttClient implements ProtocolClient {
147163
public async invokeResource(form: MqttForm, content: Content): Promise<Content> {
148164
const requestUri = new url.URL(form.href);
149165
const topic = requestUri.pathname.slice(1);
150-
const brokerUri = `${this.scheme}://${requestUri.host}`;
166+
const brokerUri = this.getBrokerUri(form.href);
151167

152168
let pool = this.pools.get(brokerUri);
153169

@@ -170,7 +186,7 @@ export default class MqttClient implements ProtocolClient {
170186

171187
public async unlinkResource(form: Form): Promise<void> {
172188
const requestUri = new url.URL(form.href);
173-
const brokerUri: string = `${this.scheme}://` + requestUri.host;
189+
const brokerUri: string = this.getBrokerUri(form.href);
174190
const topic = requestUri.pathname.slice(1);
175191

176192
const pool = this.pools.get(brokerUri);

packages/binding-mqtt/src/mqtts-client-factory.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ export default class MqttsClientFactory implements ProtocolClientFactory {
2828
private readonly clients: Array<ProtocolClient> = [];
2929

3030
constructor(private readonly config: MqttClientConfig) {}
31+
32+
getSchemes(): string[] {
33+
return ["mqtts", "mqtt+wss"];
34+
}
35+
36+
supportsSubprotocol(scheme: string, subprotocol: string): boolean {
37+
return scheme === "wss" && subprotocol === "mqtt";
38+
}
39+
3140
getClient(): ProtocolClient {
3241
const client = new MqttClient(this.config, true);
3342
this.clients.push(client);
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/********************************************************************************
2+
* Copyright (c) 2026 Contributors to the Eclipse Foundation
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information regarding copyright ownership.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License v. 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0, or the W3C Software Notice and
10+
* Document License (2015-05-13) which is available at
11+
* https://www.w3.org/Consortium/Legal/2015/copyright-software-and-document.
12+
*
13+
* SPDX-License-Identifier: EPL-2.0 OR W3C-20150513
14+
********************************************************************************/
15+
16+
/**
17+
* MQTT over WebSocket integration tests
18+
*/
19+
20+
import * as chai from "chai";
21+
import chaiAsPromised from "chai-as-promised";
22+
import { MqttClient, MqttClientFactory } from "../src/mqtt";
23+
import { expect, should } from "chai";
24+
import { Aedes, Server } from "aedes";
25+
import { createServer } from "http";
26+
import * as ws from "ws";
27+
import { Content, Form } from "@node-wot/core";
28+
import { Readable } from "stream";
29+
import Servient from "@node-wot/core/dist/servient";
30+
31+
chai.use(chaiAsPromised);
32+
should();
33+
34+
describe("MQTT over WebSocket integration", () => {
35+
let aedes: Aedes;
36+
let httpServer: ReturnType<typeof createServer>;
37+
let wsServer: ws.Server;
38+
const brokerAddress = "localhost";
39+
const brokerPort = 8888;
40+
const wsUri = `ws://${brokerAddress}:${brokerPort}`;
41+
const compositeUri = `mqtt+ws://${brokerAddress}:${brokerPort}`;
42+
43+
before((done) => {
44+
aedes = Server({});
45+
httpServer = createServer();
46+
wsServer = new ws.Server({ server: httpServer });
47+
48+
wsServer.on("connection", (socket) => {
49+
aedes.handle(socket as never);
50+
});
51+
52+
httpServer.listen(brokerPort, () => {
53+
done();
54+
});
55+
});
56+
57+
after((done) => {
58+
wsServer.close(() => {
59+
httpServer.close(() => {
60+
aedes.close(() => {
61+
done();
62+
});
63+
});
64+
});
65+
});
66+
67+
describe("MqttClientFactory multi-scheme support", () => {
68+
it("should support mqtt scheme via getSchemes()", () => {
69+
const factory = new MqttClientFactory();
70+
const schemes = factory.getSchemes?.();
71+
expect(schemes).to.include("mqtt");
72+
});
73+
74+
it("should support mqtt+ws composite scheme via getSchemes()", () => {
75+
const factory = new MqttClientFactory();
76+
const schemes = factory.getSchemes?.();
77+
expect(schemes).to.include("mqtt+ws");
78+
});
79+
80+
it("should support mqtt+wss composite scheme via getSchemes()", () => {
81+
const factory = new MqttClientFactory();
82+
const schemes = factory.getSchemes?.();
83+
expect(schemes).to.include("mqtt+wss");
84+
});
85+
86+
it("should support ws scheme with mqtt subprotocol via supportsSubprotocol()", () => {
87+
const factory = new MqttClientFactory();
88+
const supports = factory.supportsSubprotocol?.("ws", "mqtt");
89+
expect(supports).to.be.true;
90+
});
91+
92+
it("should support wss scheme with mqtt subprotocol via supportsSubprotocol()", () => {
93+
const factory = new MqttClientFactory();
94+
const supports = factory.supportsSubprotocol?.("wss", "mqtt");
95+
expect(supports).to.be.true;
96+
});
97+
98+
it("should not support http scheme with mqtt subprotocol", () => {
99+
const factory = new MqttClientFactory();
100+
const supports = factory.supportsSubprotocol?.("http", "mqtt");
101+
expect(supports).to.be.false;
102+
});
103+
});
104+
105+
describe("MQTT client with ws:// URI", () => {
106+
it.skip("should connect and publish/subscribe using ws:// scheme", (done) => {
107+
const mqttClient = new MqttClient();
108+
const topic = "test/websocket";
109+
const form = new Form(`${wsUri}/${topic}`);
110+
form["mqv:qos"] = "1";
111+
form["mqv:retain"] = false;
112+
113+
mqttClient
114+
.subscribeResource(form, async (value: Content) => {
115+
try {
116+
const data = await value.toBuffer();
117+
expect(data.toString()).to.equal("websocket-test");
118+
await mqttClient.stop();
119+
done();
120+
} catch (err) {
121+
done(err);
122+
}
123+
})
124+
.then(async () => {
125+
await mqttClient.writeResource(
126+
form,
127+
new Content("text/plain", Readable.from(Buffer.from("websocket-test")))
128+
);
129+
})
130+
.catch((err) => done(err));
131+
}).timeout(10000);
132+
});
133+
134+
describe("MQTT client with mqtt+ws:// composite URI", () => {
135+
it.skip("should connect and publish/subscribe using mqtt+ws:// scheme", (done) => {
136+
const mqttClient = new MqttClient();
137+
const topic = "test/composite";
138+
const form = new Form(`${compositeUri}/${topic}`);
139+
form["mqv:qos"] = "1";
140+
form["mqv:retain"] = false;
141+
142+
mqttClient
143+
.subscribeResource(form, async (value: Content) => {
144+
try {
145+
const data = await value.toBuffer();
146+
expect(data.toString()).to.equal("composite-test");
147+
await mqttClient.stop();
148+
done();
149+
} catch (err) {
150+
done(err);
151+
}
152+
})
153+
.then(async () => {
154+
await mqttClient.writeResource(
155+
form,
156+
new Content("text/plain", Readable.from(Buffer.from("composite-test")))
157+
);
158+
})
159+
.catch((err) => done(err));
160+
}).timeout(10000);
161+
});
162+
163+
describe("Servient integration with subprotocol", () => {
164+
it("should route ws:// + subprotocol:mqtt to MqttClientFactory", () => {
165+
const servient = new Servient();
166+
const factory = new MqttClientFactory();
167+
servient.addClientFactory(factory);
168+
169+
// Test that servient can get client for ws + mqtt subprotocol
170+
const client = servient.getClientFor("ws", "mqtt");
171+
expect(client).to.be.instanceOf(MqttClient);
172+
});
173+
174+
it("should route mqtt+ws:// composite scheme to MqttClientFactory", () => {
175+
const servient = new Servient();
176+
const factory = new MqttClientFactory();
177+
servient.addClientFactory(factory);
178+
179+
// Test that servient can get client for composite scheme
180+
const client = servient.getClientFor("mqtt+ws");
181+
expect(client).to.be.instanceOf(MqttClient);
182+
});
183+
184+
it("should prioritize subprotocol match over basic scheme", () => {
185+
const servient = new Servient();
186+
const mqttFactory = new MqttClientFactory();
187+
servient.addClientFactory(mqttFactory);
188+
189+
// When both ws scheme and mqtt subprotocol are provided,
190+
// should get MQTT client (not WebSocket client)
191+
const client = servient.getClientFor("ws", "mqtt");
192+
expect(client).to.be.instanceOf(MqttClient);
193+
});
194+
});
195+
});

packages/core/src/consumed-thing.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -517,14 +517,15 @@ export default class ConsumedThing extends Thing implements IConsumedThing {
517517
if (options.formIndex >= 0 && options.formIndex < forms.length) {
518518
form = forms[options.formIndex];
519519
const scheme = Helpers.extractScheme(form.href);
520+
const cacheKey = this.getClientCacheKey(scheme, form.subprotocol);
521+
520522
if (this.#servient.hasClientFor(scheme)) {
521523
debug(`ConsumedThing '${this.title}' got client for '${scheme}'`);
522-
client = this.#servient.getClientFor(scheme);
524+
client = this.#servient.getClientFor(scheme, form.subprotocol);
523525

524-
if (!this.#clients.get(scheme)) {
525-
// new client
526+
if (!this.#clients.get(cacheKey)) {
526527
this.ensureClientSecurity(client, form);
527-
this.#clients.set(scheme, client);
528+
this.#clients.set(cacheKey, client);
528529
}
529530
} else {
530531
throw new Error(`ConsumedThing '${this.title}' missing ClientFactory for '${scheme}'`);
@@ -534,35 +535,42 @@ export default class ConsumedThing extends Thing implements IConsumedThing {
534535
}
535536
} else {
536537
const schemes = forms.map((link) => Helpers.extractScheme(link.href));
537-
const cacheIdx = schemes.findIndex((scheme) => this.#clients.has(scheme));
538+
const cacheIdx = schemes.findIndex((scheme, idx) => {
539+
const cacheKey = this.getClientCacheKey(scheme, forms[idx].subprotocol);
540+
return this.#clients.has(cacheKey);
541+
});
538542

539543
if (cacheIdx !== -1) {
540-
// from cache
541544
debug(`ConsumedThing '${this.title}' chose cached client for '${schemes[cacheIdx]}'`);
542-
// if cacheIdx is valid, then clients *contains* schemes[cacheIdx]
543-
client = this.#clients.get(schemes[cacheIdx])!;
545+
const cacheKey = this.getClientCacheKey(schemes[cacheIdx], forms[cacheIdx].subprotocol);
546+
client = this.#clients.get(cacheKey)!;
544547
form = this.findForm(forms, op, affordance, schemes, cacheIdx);
545548
} else {
546-
// new client
547549
debug(`ConsumedThing '${this.title}' has no client in cache (${cacheIdx})`);
548550
const srvIdx = schemes.findIndex((scheme) => this.#servient.hasClientFor(scheme));
549551

550552
if (srvIdx === -1)
551553
throw new Error(`ConsumedThing '${this.title}' missing ClientFactory for '${schemes}'`);
552554

553-
client = this.#servient.getClientFor(schemes[srvIdx]);
555+
form = this.findForm(forms, op, affordance, schemes, srvIdx);
556+
client = this.#servient.getClientFor(schemes[srvIdx], form?.subprotocol);
554557

555558
debug(`ConsumedThing '${this.title}' got new client for '${schemes[srvIdx]}'`);
556559

557-
this.#clients.set(schemes[srvIdx], client);
560+
const cacheKey = this.getClientCacheKey(schemes[srvIdx], form?.subprotocol);
561+
this.#clients.set(cacheKey, client);
558562

559-
form = this.findForm(forms, op, affordance, schemes, srvIdx);
560563
this.ensureClientSecurity(client, form);
561564
}
562565
}
563566
return { client, form };
564567
}
565568

569+
private getClientCacheKey(scheme: string, subprotocol?: string): string {
570+
const normalizedSubprotocol = subprotocol?.trim().toLowerCase();
571+
return normalizedSubprotocol ? `${scheme}+${normalizedSubprotocol}` : scheme;
572+
}
573+
566574
async readProperty(propertyName: string, options?: WoT.InteractionOptions): Promise<WoT.InteractionOutput> {
567575
// TODO pass expected form op to getClientFor()
568576
const tp = this.properties[propertyName];

packages/core/src/helpers.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,27 @@ export default class Helpers implements Resolver {
6060
private static staticAddress?: string = undefined;
6161

6262
public static extractScheme(uri: string): string {
63+
if (!uri || typeof uri !== "string" || uri.trim().length === 0) {
64+
throw new Error(`URI must be a non-empty string`);
65+
}
66+
67+
// handle composite schemes before URL parsing (e.g., mqtt+ws, coap+ws)
68+
const compositeMatch = uri.match(/^([a-z][a-z0-9+.-]*):\/\//);
69+
if (compositeMatch) {
70+
const potentialScheme = compositeMatch[1].toLowerCase();
71+
if (potentialScheme.includes("+")) {
72+
// validate composite scheme format (no leading/trailing +, must have parts on both sides)
73+
const parts = potentialScheme.split("+");
74+
if (parts.length !== 2 || parts[0].length === 0 || parts[1].length === 0) {
75+
throw new Error(`Invalid composite scheme format in URI "${uri}"`);
76+
}
77+
debug(`Helpers found composite scheme '${potentialScheme}'`);
78+
return potentialScheme;
79+
}
80+
}
81+
6382
const parsed = new URL(uri);
6483
debug(parsed);
65-
// remove trailing ':'
6684
if (parsed.protocol === null) {
6785
throw new Error(`Protocol in url "${uri}" must be valid`);
6886
}

0 commit comments

Comments
 (0)