Skip to content

Commit 53400a6

Browse files
feat: Enhance WebSocket handling with improved ping/pong logic and session management
Signed-off-by: Mario Serrano <mario@dynamiasoluciones.com>
1 parent afa1ab1 commit 53400a6

3 files changed

Lines changed: 113 additions & 39 deletions

File tree

zk/src/main/java/tools/dynamia/zk/websocket/WebSocketGlobalCommandHandler.java

Lines changed: 76 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.zkoss.zk.ui.Desktop;
2525
import tools.dynamia.commons.logger.Loggable;
2626

27-
import java.util.Collection;
27+
import java.io.IOException;
2828
import java.util.Map;
2929
import java.util.concurrent.ConcurrentHashMap;
3030

@@ -62,8 +62,8 @@
6262
*/
6363
public class WebSocketGlobalCommandHandler extends TextWebSocketHandler implements Loggable {
6464

65-
private final Map<String, String> desktops = new ConcurrentHashMap<>();
66-
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
65+
// Map of active WebSocket sessions by session ID
66+
private final Map<String, DeskstopWebSocketSession> sessions = new ConcurrentHashMap<>();
6767

6868
/**
6969
* Handles incoming text messages from WebSocket clients.
@@ -91,17 +91,18 @@ protected void handleTextMessage(WebSocketSession session, TextMessage message)
9191
return;
9292
}
9393

94+
if ("PONG".equals(payload)) {
95+
// Ignore PONG messages
96+
return;
97+
}
98+
9499
// Handle desktop ID registration
95-
String desktopId = payload;
96-
String oldSessionId = desktops.get(desktopId);
97-
if (oldSessionId != null) {
98-
WebSocketSession oldSession = sessions.get(oldSessionId);
99-
if (oldSession != null) {
100-
oldSession.close(CloseStatus.NORMAL);
101-
}
100+
var oldSession = sessions.get(payload);
101+
if (oldSession != null) {
102+
oldSession.close(CloseStatus.NORMAL);
102103
}
103-
log("Associating desktop " + desktopId + " with ws session " + session.getId());
104-
desktops.put(desktopId, session.getId());
104+
log("Associating desktop " + payload + " with ws session " + session.getId());
105+
sessions.put(session.getId(), new DeskstopWebSocketSession(payload, session));
105106
}
106107

107108
/**
@@ -114,8 +115,7 @@ protected void handleTextMessage(WebSocketSession session, TextMessage message)
114115
*/
115116
@Override
116117
public void afterConnectionEstablished(WebSocketSession session) {
117-
log("WebSocket connection established: " + session.getId());
118-
sessions.put(session.getId(), session);
118+
log("New webSocket connection established: " + session.getId() + " waiting for desktop ID...");
119119
}
120120

121121
/**
@@ -125,16 +125,11 @@ public void afterConnectionEstablished(WebSocketSession session) {
125125
* the internal maps. This ensures that closed connections don't accumulate in memory.</p>
126126
*
127127
* @param session the closed WebSocket session
128-
* @param status the status code indicating why the connection was closed
128+
* @param status the status code indicating why the connection was closed
129129
*/
130130
@Override
131131
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
132132
log("WebSocket connection closed: " + session.getId() + " with status " + status);
133-
134-
String desktopId = desktops.entrySet().stream().filter(e -> e.getValue().equals(session.getId())).map(Map.Entry::getKey).findFirst().orElse(null);
135-
if (desktopId != null) {
136-
desktops.remove(desktopId);
137-
}
138133
sessions.remove(session.getId());
139134
}
140135

@@ -147,15 +142,11 @@ public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
147142
* @param desktop the ZK Desktop to find the session for
148143
* @return the associated WebSocket session, or {@code null} if not found or desktop is null
149144
*/
150-
WebSocketSession findSession(Desktop desktop) {
151-
WebSocketSession session = null;
152-
if (desktop != null && desktop.getId() != null) {
153-
String sessionID = desktops.get(desktop.getId());
154-
if (sessionID != null) {
155-
session = sessions.get(sessionID);
156-
}
157-
}
158-
return session;
145+
public DeskstopWebSocketSession findSession(Desktop desktop) {
146+
return sessions.values().stream()
147+
.filter(ds -> ds.matchesDesktop(desktop))
148+
.findFirst()
149+
.orElse(null);
159150
}
160151

161152
/**
@@ -166,7 +157,61 @@ WebSocketSession findSession(Desktop desktop) {
166157
*
167158
* @return a collection of all active WebSocket sessions
168159
*/
169-
Collection<WebSocketSession> getAllSessions() {
170-
return sessions.values();
160+
public Map<String, DeskstopWebSocketSession> getSessions() {
161+
return sessions;
162+
}
163+
164+
/**
165+
* Closes the WebSocket session associated with a specific ZK Desktop.
166+
*
167+
* <p>This method safely closes the session and removes it from the internal
168+
* session map to prevent resource leaks.</p>
169+
*
170+
* @param desktop the ZK Desktop whose session should be closed
171+
*/
172+
public void closeSession(Desktop desktop) {
173+
var session = findSession(desktop);
174+
if (session != null) {
175+
try {
176+
session.close(CloseStatus.NORMAL);
177+
} finally {
178+
sessions.remove(session.session.getId());
179+
}
180+
} else {
181+
log("No websocket session found for desktop " + desktop.getId());
182+
}
183+
}
184+
185+
186+
/**
187+
* Represents a WebSocket session associated with a specific ZK Desktop.
188+
*
189+
* <p>This record encapsulates the desktop ID and the WebSocket session,
190+
* providing utility methods for session management and message sending.</p>
191+
*/
192+
public record DeskstopWebSocketSession(String desktopId, WebSocketSession session) {
193+
boolean isOpen() {
194+
return session.isOpen();
195+
}
196+
197+
boolean matchesDesktop(Desktop desktop) {
198+
return desktop != null && desktopId.equals(desktop.getId());
199+
}
200+
201+
boolean matchesSession(WebSocketSession otherSession) {
202+
return session.getId().equals(otherSession.getId());
203+
}
204+
205+
void close(CloseStatus closeStatus) {
206+
try {
207+
session.close(closeStatus);
208+
} catch (Exception e) {
209+
//ignore
210+
}
211+
}
212+
213+
void sendMessage(String message) throws IOException {
214+
session.sendMessage(new TextMessage(message));
215+
}
171216
}
172217
}

zk/src/main/java/tools/dynamia/zk/websocket/WebSocketPushSender.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package tools.dynamia.zk.websocket;
1919

20+
import org.springframework.web.socket.CloseStatus;
2021
import org.springframework.web.socket.TextMessage;
2122
import org.springframework.web.socket.WebSocketSession;
2223
import org.zkoss.zk.ui.Desktop;
@@ -31,6 +32,7 @@
3132
import java.io.IOException;
3233
import java.util.HashMap;
3334
import java.util.Map;
35+
import java.util.concurrent.atomic.AtomicInteger;
3436

3537
/**
3638
* Helper class to send push command to desktop client using web socket
@@ -66,21 +68,23 @@ public static boolean sendPushCommand(Desktop desktop, String command, Map<Strin
6668
data.putAll(payload);
6769
}
6870
data.put("command", command);
69-
WebSocketSession session = handler.findSession(desktop);
71+
var session = handler.findSession(desktop);
7072
if (session != null) {
7173
String textData = StringPojoParser.convertMapToJson(data);
72-
session.sendMessage(new TextMessage(textData));
74+
session.sendMessage(textData);
7375
return true;
7476
} else {
7577
LOGGER.warn("No websocket session found for desktop " + desktop);
7678
}
79+
} catch (IOException e) {
80+
LOGGER.error("IO Error sending push command '" + command + "' to Dekstop: " + desktop, e);
81+
handler.closeSession(desktop);
7782
} catch (Exception e) {
7883
LOGGER.error("Error sending push command '" + command + "' to Dekstop: " + desktop, e);
7984
}
8085
} else {
8186
LOGGER.warn("No websocket handler found");
8287
}
83-
8488
return false;
8589
}
8690

@@ -101,17 +105,31 @@ public static boolean sendPushCommand(String command) {
101105
public static void broadcastCommand(String command) {
102106
WebSocketGlobalCommandHandler handler = getHandler();
103107
if (handler != null) {
104-
handler.getAllSessions().forEach(s -> {
108+
AtomicInteger count = new AtomicInteger();
109+
handler.getSessions().values().forEach(s -> {
105110
try {
106-
s.sendMessage(new TextMessage(command));
111+
s.sendMessage(command);
112+
count.incrementAndGet();
107113
} catch (IOException e) {
108-
LOGGER.error("Error sending command " + command + " to WS Session: " + s);
114+
LOGGER.error("IO Error sending command " + command + " to WS Session: " + s, e);
115+
s.close(CloseStatus.NORMAL);
116+
} catch (Exception e) {
117+
LOGGER.error("Error sending command " + command + " to WS Session: " + s, e);
109118
}
110119
});
120+
LOGGER.info("Broadcasted command '" + command + "' to " + count.get() + " WS sessions.");
111121
}
112122
}
113123

114-
private static WebSocketGlobalCommandHandler getHandler() {
124+
/**
125+
* Broadcast a heartbeat PING command to all connected sessions
126+
*/
127+
public static void broadcastHeartbeat() {
128+
broadcastCommand("PING");
129+
}
130+
131+
132+
public static WebSocketGlobalCommandHandler getHandler() {
115133
return Containers.get().findObject(WebSocketGlobalCommandHandler.class);
116134
}
117135

zk/src/main/resources/static/dynamia-tools/js/dynamia-tools-ws.js

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@
118118
// Keep-alive configuration
119119
var keepAliveConfig = {
120120
enabled: true,
121-
interval: 30000 // Send ping every 30 seconds
121+
interval: 60000 // Send ping every 60 seconds
122122
};
123123

124124
var wsManager = {
@@ -301,6 +301,17 @@
301301
return;
302302
}
303303

304+
if (e.data === 'PING') {
305+
// Respond to server ping
306+
try {
307+
wsManager.socket.send('PONG');
308+
console.debug('DynamiaTools WebSocket: pong sent in response to server ping');
309+
} catch (error) {
310+
console.warn('DynamiaTools WebSocket: error sending pong', error);
311+
}
312+
return;
313+
}
314+
304315
console.debug('DynamiaTools command received:', e.data);
305316

306317
try {

0 commit comments

Comments
 (0)