Skip to content

Commit 2fa13ef

Browse files
author
Josh Thomas
committed
Feat: Adding websocket implementation"
Signed-off-by: Josh Thomas <jthomas@ignite-retail.com>
1 parent 6a8a843 commit 2fa13ef

2 files changed

Lines changed: 319 additions & 52 deletions

File tree

packages/binding-websockets/src/ws-client.ts

Lines changed: 93 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ export default class WebSocketClient implements ProtocolClient {
6565
private subscriptions: Map<string, Set<SubscriptionHandlers>> = new Map();
6666
private credentials: Map<string, StoredCredentials> = new Map();
6767
private protocolMode: Map<string, ProtocolMode> = new Map();
68+
private subscriptionTypes: Map<string, "property" | "event"> = new Map();
6869
private isStarted = false;
6970

7071
// Configuration
@@ -83,7 +84,7 @@ export default class WebSocketClient implements ProtocolClient {
8384

8485
const ws = await this.getOrCreateConnection(form);
8586
const baseUrl = this.extractBaseUrl(form.href);
86-
const mode = this.protocolMode.get(baseUrl) ?? "generic";
87+
const mode = this.protocolMode.get(form.href) ?? this.protocolMode.get(baseUrl) ?? "generic";
8788

8889
let response: unknown;
8990

@@ -120,7 +121,7 @@ export default class WebSocketClient implements ProtocolClient {
120121

121122
const ws = await this.getOrCreateConnection(form);
122123
const baseUrl = this.extractBaseUrl(form.href);
123-
const mode = this.protocolMode.get(baseUrl) ?? "generic";
124+
const mode = this.protocolMode.get(form.href) ?? this.protocolMode.get(baseUrl) ?? "generic";
124125

125126
// Parse content body
126127
const buffer = await content.toBuffer();
@@ -149,7 +150,7 @@ export default class WebSocketClient implements ProtocolClient {
149150

150151
const ws = await this.getOrCreateConnection(form);
151152
const baseUrl = this.extractBaseUrl(form.href);
152-
const mode = this.protocolMode.get(baseUrl) ?? "generic";
153+
const mode = this.protocolMode.get(form.href) ?? this.protocolMode.get(baseUrl) ?? "generic";
153154

154155
// Parse input parameters if provided
155156
let inputData: unknown;
@@ -195,21 +196,26 @@ export default class WebSocketClient implements ProtocolClient {
195196
const baseUrl = this.extractBaseUrl(form.href);
196197
const resourceKey = `${baseUrl}:${this.extractResourceName(form.href)}`;
197198

199+
// Get subscription type to send correct unsubscribe verb
200+
const subscriptionType = this.subscriptionTypes.get(resourceKey);
201+
198202
// Remove subscription handlers
199203
this.subscriptions.delete(resourceKey);
204+
this.subscriptionTypes.delete(resourceKey);
200205

201206
const ws = await this.getOrCreateConnection(form);
202-
const mode = this.protocolMode.get(baseUrl) ?? "generic";
207+
const mode = this.protocolMode.get(form.href) ?? this.protocolMode.get(baseUrl) ?? "generic";
203208

204-
if (mode === "wot") {
205-
// Send unsubscribe request
209+
if (mode === "wot" && subscriptionType != null) {
210+
// Send unsubscribe request with correct WoT verb
206211
const thingId = this.extractThingId(form.href);
207212
const resourceName = this.extractResourceName(form.href);
213+
const operation = subscriptionType === "event" ? "unsubscribeevent" : "unsubscribeproperty";
208214
const request = {
209215
messageType: "request",
210216
thingID: thingId,
211217
messageID: this.generateMessageId(),
212-
operation: "unsubscribe",
218+
operation: operation,
213219
name: resourceName,
214220
};
215221

@@ -245,6 +251,10 @@ export default class WebSocketClient implements ProtocolClient {
245251

246252
// Determine if this is an event or property subscription
247253
const isEvent = form.op?.includes("subscribeevent") ?? form.op === "subscribeevent";
254+
const subscriptionType: "property" | "event" = isEvent ? "event" : "property";
255+
256+
// Store subscription type for later unsubscribe
257+
this.subscriptionTypes.set(resourceKey, subscriptionType);
248258

249259
if (mode === "wot") {
250260
// Send W3C Web Thing Protocol subscribe request
@@ -263,6 +273,7 @@ export default class WebSocketClient implements ProtocolClient {
263273
} catch (err) {
264274
// Remove handler if subscription failed
265275
this.subscriptions.get(resourceKey)?.delete(handlers);
276+
this.subscriptionTypes.delete(resourceKey);
266277
throw err;
267278
}
268279
}
@@ -500,18 +511,20 @@ export default class WebSocketClient implements ProtocolClient {
500511
* Get or create WebSocket connection for the given form
501512
*/
502513
private async getOrCreateConnection(form: Form): Promise<WebSocket> {
514+
// Use full href as connection key to support multiple endpoints on same host
515+
const connectionKey = form.href;
503516
const baseUrl = this.extractBaseUrl(form.href);
504517

505518
// Check if connection already exists and is open
506-
const existing = this.connections.get(baseUrl);
519+
const existing = this.connections.get(connectionKey);
507520
if (existing != null && existing.readyState === WebSocket.OPEN) {
508521
return existing;
509522
}
510523

511524
// Detect protocol mode
512525
const mode = this.detectProtocolMode(form);
513-
this.protocolMode.set(baseUrl, mode);
514-
debug(`Using protocol mode '${mode}' for ${baseUrl}`);
526+
this.protocolMode.set(connectionKey, mode);
527+
debug(`Using protocol mode '${mode}' for ${connectionKey}`);
515528

516529
// Create new WebSocket connection
517530
return new Promise((resolve, reject) => {
@@ -536,60 +549,63 @@ export default class WebSocketClient implements ProtocolClient {
536549
const subprotocol = this.extractSubprotocol(form);
537550
const protocols = subprotocol ? [subprotocol] : undefined;
538551

539-
debug(`Creating WebSocket connection to ${baseUrl}${protocols ? ` with subprotocol ${subprotocol}` : ""}`);
552+
debug(`Creating WebSocket connection to ${form.href}${protocols ? ` with subprotocol ${subprotocol}` : ""}`);
540553

541-
const ws = new WebSocket(baseUrl, protocols, wsOptions);
554+
// Connect to the full href, not just baseUrl
555+
const ws = new WebSocket(form.href, protocols, wsOptions);
542556

543557
ws.on("open", () => {
544-
info(`WebSocket connection established to ${baseUrl}`);
545-
this.connections.set(baseUrl, ws);
558+
info(`WebSocket connection established to ${connectionKey}`);
559+
this.connections.set(connectionKey, ws);
546560
resolve(ws);
547561
});
548562

549563
ws.on("message", (data: WebSocket.Data) => {
550-
this.handleWebSocketMessage(baseUrl, data);
564+
this.handleWebSocketMessage(connectionKey, data);
551565
});
552566

553567
ws.on("error", (err: Error) => {
554-
error(`WebSocket error for ${baseUrl}: ${err.message}`);
568+
error(`WebSocket error for ${connectionKey}: ${err.message}`);
555569
// Reject pending requests
556570
for (const [messageId, handler] of this.pendingRequests.entries()) {
557571
handler.reject(err);
558572
clearTimeout(handler.timeoutId);
559573
this.pendingRequests.delete(messageId);
560574
}
561-
// Notify subscriptions
562-
const subs = this.subscriptions.get(baseUrl);
563-
if (subs != null) {
564-
subs.forEach((handlers) => {
565-
if (handlers.error != null) {
566-
handlers.error(err);
567-
}
568-
});
575+
// Notify subscriptions based on baseUrl prefix
576+
for (const [key, subs] of this.subscriptions.entries()) {
577+
if (key.startsWith(baseUrl + ":")) {
578+
subs.forEach((handlers) => {
579+
if (handlers.error != null) {
580+
handlers.error(err);
581+
}
582+
});
583+
}
569584
}
570585
reject(err);
571586
});
572587

573588
ws.on("close", (code: number, reason: string) => {
574-
info(`WebSocket connection closed for ${baseUrl}: ${code} ${reason}`);
575-
this.connections.delete(baseUrl);
576-
// Complete subscriptions
577-
const subs = this.subscriptions.get(baseUrl);
578-
if (subs != null) {
579-
subs.forEach((handlers) => {
580-
if (handlers.complete != null) {
581-
handlers.complete();
582-
}
583-
});
584-
this.subscriptions.delete(baseUrl);
589+
info(`WebSocket connection closed for ${connectionKey}: ${code} ${reason}`);
590+
this.connections.delete(connectionKey);
591+
// Complete subscriptions based on baseUrl prefix
592+
for (const [key, subs] of this.subscriptions.entries()) {
593+
if (key.startsWith(baseUrl + ":")) {
594+
subs.forEach((handlers) => {
595+
if (handlers.complete != null) {
596+
handlers.complete();
597+
}
598+
});
599+
this.subscriptions.delete(key);
600+
}
585601
}
586602
});
587603

588604
// Connection timeout
589605
setTimeout(() => {
590606
if (ws.readyState === WebSocket.CONNECTING) {
591607
ws.close();
592-
reject(new Error(`WebSocket connection timeout for ${baseUrl}`));
608+
reject(new Error(`WebSocket connection timeout for ${connectionKey}`));
593609
}
594610
}, this.defaultTimeout);
595611
});
@@ -598,10 +614,12 @@ export default class WebSocketClient implements ProtocolClient {
598614
/**
599615
* Handle incoming WebSocket message
600616
*/
601-
private handleWebSocketMessage(baseUrl: string, data: WebSocket.Data): void {
617+
private handleWebSocketMessage(connectionKey: string, data: WebSocket.Data): void {
602618
try {
603619
const message = JSON.parse(data.toString());
604-
const mode = this.protocolMode.get(baseUrl) ?? "generic";
620+
const mode = this.protocolMode.get(connectionKey) ?? "generic";
621+
// Extract baseUrl from connectionKey for subscription lookup
622+
const baseUrl = this.extractBaseUrl(connectionKey);
605623

606624
if (mode === "wot") {
607625
this.handleWoTMessage(baseUrl, message);
@@ -674,20 +692,44 @@ export default class WebSocketClient implements ProtocolClient {
674692
}
675693

676694
// If no correlation found, might be a subscription update
677-
// Notify all subscriptions for this base URL
678-
const subs = this.subscriptions.get(baseUrl);
679-
if (subs != null) {
680-
const content = new Content(
681-
"application/json",
682-
this.bufferToStream(Buffer.from(JSON.stringify(message)))
683-
);
684-
subs.forEach((handlers) => {
685-
try {
686-
handlers.next(content);
687-
} catch (err) {
688-
error(`Error in subscription handler: ${err}`);
695+
// For generic websockets, try to extract resource name from message
696+
// or dispatch to all subscriptions for this baseUrl
697+
const resourceName = message.resource as string | undefined;
698+
699+
if (resourceName) {
700+
// If message has resource name, use specific key
701+
const resourceKey = `${baseUrl}:${resourceName}`;
702+
const subs = this.subscriptions.get(resourceKey);
703+
if (subs != null) {
704+
const content = new Content(
705+
"application/json",
706+
this.bufferToStream(Buffer.from(JSON.stringify(message)))
707+
);
708+
subs.forEach((handlers) => {
709+
try {
710+
handlers.next(content);
711+
} catch (err) {
712+
error(`Error in subscription handler: ${err}`);
713+
}
714+
});
715+
}
716+
} else {
717+
// Otherwise, broadcast to all subscriptions for this baseUrl
718+
for (const [key, subs] of this.subscriptions.entries()) {
719+
if (key.startsWith(baseUrl + ":")) {
720+
const content = new Content(
721+
"application/json",
722+
this.bufferToStream(Buffer.from(JSON.stringify(message)))
723+
);
724+
subs.forEach((handlers) => {
725+
try {
726+
handlers.next(content);
727+
} catch (err) {
728+
error(`Error in subscription handler: ${err}`);
729+
}
730+
});
689731
}
690-
});
732+
}
691733
}
692734
}
693735

0 commit comments

Comments
 (0)