@@ -22,6 +22,29 @@ std::shared_ptr<void> EventBroadcaster::getZMQContext() {
2222 return ctx;
2323}
2424
25+ int EventBroadcaster::unbindZMQSocket ()
26+ {
27+ #ifdef ZEROMQ
28+ void * socket = zmqSocket.get ();
29+ if (socket != nullptr && listeningPort != 0 )
30+ {
31+ return zmq_unbind (socket, getEndpoint (listeningPort).toRawUTF8 ());
32+ }
33+ #endif
34+ return 0 ;
35+ }
36+
37+ int EventBroadcaster::rebindZMQSocket ()
38+ {
39+ #ifdef ZEROMQ
40+ void * socket = zmqSocket.get ();
41+ if (socket != nullptr && listeningPort != 0 )
42+ {
43+ return zmq_bind (socket, getEndpoint (listeningPort).toRawUTF8 ());
44+ }
45+ #endif
46+ return 0 ;
47+ }
2548
2649void EventBroadcaster::closeZMQSocket (void * socket)
2750{
@@ -30,16 +53,35 @@ void EventBroadcaster::closeZMQSocket(void* socket)
3053#endif
3154}
3255
56+ String EventBroadcaster::getEndpoint (int port)
57+ {
58+ return String (" tcp://*:" ) + String (port);
59+ }
60+
61+ void EventBroadcaster::reportActualListeningPort (int port)
62+ {
63+ listeningPort = port;
64+ auto editor = static_cast <EventBroadcasterEditor*>(getEditor ());
65+ if (editor)
66+ {
67+ editor->setDisplayedPort (port);
68+ }
69+ }
3370
3471EventBroadcaster::EventBroadcaster ()
3572 : GenericProcessor (" Event Broadcaster" )
3673 , zmqContext (getZMQContext())
37- , zmqSocket (nullptr , &closeZMQSocket )
74+ , zmqSocket (nullptr )
3875 , listeningPort (0 )
3976{
4077 setProcessorType (PROCESSOR_TYPE_SINK);
4178
42- setListeningPort (5557 );
79+ int portToTry = 5557 ;
80+ while (setListeningPort (portToTry) == EADDRINUSE)
81+ {
82+ // try the next port, looking for one not in use
83+ portToTry++;
84+ }
4385}
4486
4587
@@ -56,27 +98,53 @@ int EventBroadcaster::getListeningPort() const
5698}
5799
58100
59- void EventBroadcaster::setListeningPort (int port, bool forceRestart)
101+ int EventBroadcaster::setListeningPort (int port, bool forceRestart)
60102{
61103 if ((listeningPort != port) || forceRestart)
62104 {
63105#ifdef ZEROMQ
64- zmqSocket.reset (zmq_socket (zmqContext.get (), ZMQ_PUB));
65- if (!zmqSocket)
106+ // unbind current socket (if any) to free up port
107+ unbindZMQSocket ();
108+ zmqSocketPtr newSocket (zmq_socket (zmqContext.get (), ZMQ_PUB));
109+ auto editor = static_cast <EventBroadcasterEditor*>(getEditor ());
110+ int status = 0 ;
111+
112+ if (!newSocket.get ())
66113 {
67- std::cout << " Failed to create socket: " << zmq_strerror (zmq_errno ()) << std::endl;
68- return ;
114+ status = zmq_errno ();
115+ std::cout << " Failed to create socket: " << zmq_strerror (status) << std::endl;
116+ }
117+ else
118+ {
119+ if (0 != zmq_bind (newSocket.get (), getEndpoint (port).toRawUTF8 ()))
120+ {
121+ status = zmq_errno ();
122+ std::cout << " Failed to open socket: " << zmq_strerror (status) << std::endl;
123+ }
124+ else
125+ {
126+ // success
127+ zmqSocket.swap (newSocket);
128+ reportActualListeningPort (port);
129+ return status;
130+ }
69131 }
70132
71- String url = String ( " tcp://*: " ) + String ( port);
72- if (0 != zmq_bind (zmqSocket. get (), url. toRawUTF8 () ))
133+ // failure, try to rebind current socket to previous port
134+ if (0 == rebindZMQSocket ( ))
73135 {
74- std::cout << " Failed to open socket: " << zmq_strerror (zmq_errno ()) << std::endl;
75- return ;
136+ reportActualListeningPort (listeningPort);
76137 }
77- #endif
138+ else
139+ {
140+ reportActualListeningPort (0 );
141+ }
142+ return status;
78143
79- listeningPort = port;
144+ #else
145+ reportActualListeningPort (port);
146+ return 0 ;
147+ #endif
80148 }
81149}
82150
0 commit comments