1111#include " EventBroadcaster.h"
1212#include " EventBroadcasterEditor.h"
1313
14- std::shared_ptr<void > EventBroadcaster::getZMQContext () {
15- // Note: C++11 guarantees that initialization of static local variables occurs exactly once, even
16- // if multiple threads attempt to initialize the same static local variable concurrently.
14+ EventBroadcaster::ZMQContext* EventBroadcaster::sharedContext = nullptr ;
15+ CriticalSection EventBroadcaster::sharedContextLock{};
16+
17+ EventBroadcaster::ZMQContext::ZMQContext (const ScopedLock& lock)
1718#ifdef ZEROMQ
18- static const std::shared_ptr<void > ctx (zmq_ctx_new (), zmq_ctx_destroy);
19- #else
20- static const std::shared_ptr<void > ctx;
19+ : context(zmq_ctx_new())
20+ #endif
21+ {
22+ sharedContext = this ;
23+ }
24+
25+ // ZMQContext is a ReferenceCountedObject with a pointer in each instance's
26+ // socket pointer, so this only happens when the last instance is destroyed.
27+ EventBroadcaster::ZMQContext::~ZMQContext ()
28+ {
29+ ScopedLock lock (sharedContextLock);
30+ sharedContext = nullptr ;
31+ #ifdef ZEROMQ
32+ zmq_ctx_destroy (context);
33+ #endif
34+ }
35+
36+ void * EventBroadcaster::ZMQContext::createZMQSocket ()
37+ {
38+ #ifdef ZEROMQ
39+ jassert (context != nullptr );
40+ return zmq_socket (context, ZMQ_PUB);
41+ #endif
42+ }
43+
44+ EventBroadcaster::ZMQSocketPtr::ZMQSocketPtr ()
45+ : std::unique_ptr<void, decltype(&closeZMQSocket)>(nullptr , &closeZMQSocket)
46+ {
47+ ScopedLock lock (sharedContextLock);
48+ if (sharedContext == nullptr )
49+ {
50+ // first one, create the context
51+ context = new ZMQContext (lock);
52+ }
53+ else
54+ {
55+ // use already-created context
56+ context = sharedContext;
57+ }
58+
59+ #ifdef ZEROMQ
60+ reset (context->createZMQSocket ());
61+ #endif
62+ }
63+
64+ EventBroadcaster::ZMQSocketPtr::~ZMQSocketPtr ()
65+ {
66+ // close the socket before the context might get destroyed.
67+ reset (nullptr );
68+ }
69+
70+ int EventBroadcaster::unbindZMQSocket ()
71+ {
72+ #ifdef ZEROMQ
73+ void * socket = zmqSocket.get ();
74+ if (socket != nullptr && listeningPort != 0 )
75+ {
76+ return zmq_unbind (socket, getEndpoint (listeningPort).toRawUTF8 ());
77+ }
2178#endif
22- return ctx ;
79+ return 0 ;
2380}
2481
82+ int EventBroadcaster::rebindZMQSocket ()
83+ {
84+ #ifdef ZEROMQ
85+ void * socket = zmqSocket.get ();
86+ if (socket != nullptr && listeningPort != 0 )
87+ {
88+ return zmq_bind (socket, getEndpoint (listeningPort).toRawUTF8 ());
89+ }
90+ #endif
91+ return 0 ;
92+ }
2593
2694void EventBroadcaster::closeZMQSocket (void * socket)
2795{
@@ -30,16 +98,33 @@ void EventBroadcaster::closeZMQSocket(void* socket)
3098#endif
3199}
32100
101+ String EventBroadcaster::getEndpoint (int port)
102+ {
103+ return String (" tcp://*:" ) + String (port);
104+ }
105+
106+ void EventBroadcaster::reportActualListeningPort (int port)
107+ {
108+ listeningPort = port;
109+ auto editor = static_cast <EventBroadcasterEditor*>(getEditor ());
110+ if (editor)
111+ {
112+ editor->setDisplayedPort (port);
113+ }
114+ }
33115
34116EventBroadcaster::EventBroadcaster ()
35117 : GenericProcessor (" Event Broadcaster" )
36- , zmqContext (getZMQContext())
37- , zmqSocket (nullptr , &closeZMQSocket)
38118 , listeningPort (0 )
39119{
40120 setProcessorType (PROCESSOR_TYPE_SINK);
41121
42- setListeningPort (5557 );
122+ int portToTry = 5557 ;
123+ while (setListeningPort (portToTry) == EADDRINUSE)
124+ {
125+ // try the next port, looking for one not in use
126+ portToTry++;
127+ }
43128}
44129
45130
@@ -56,27 +141,53 @@ int EventBroadcaster::getListeningPort() const
56141}
57142
58143
59- void EventBroadcaster::setListeningPort (int port, bool forceRestart)
144+ int EventBroadcaster::setListeningPort (int port, bool forceRestart)
60145{
61146 if ((listeningPort != port) || forceRestart)
62147 {
63148#ifdef ZEROMQ
64- zmqSocket.reset (zmq_socket (zmqContext.get (), ZMQ_PUB));
65- if (!zmqSocket)
149+ // unbind current socket (if any) to free up port
150+ unbindZMQSocket ();
151+ ZMQSocketPtr newSocket;
152+ auto editor = static_cast <EventBroadcasterEditor*>(getEditor ());
153+ int status = 0 ;
154+
155+ if (!newSocket.get ())
66156 {
67- std::cout << " Failed to create socket: " << zmq_strerror (zmq_errno ()) << std::endl;
68- return ;
157+ status = zmq_errno ();
158+ std::cout << " Failed to create socket: " << zmq_strerror (status) << std::endl;
159+ }
160+ else
161+ {
162+ if (0 != zmq_bind (newSocket.get (), getEndpoint (port).toRawUTF8 ()))
163+ {
164+ status = zmq_errno ();
165+ std::cout << " Failed to open socket: " << zmq_strerror (status) << std::endl;
166+ }
167+ else
168+ {
169+ // success
170+ zmqSocket.swap (newSocket);
171+ reportActualListeningPort (port);
172+ return status;
173+ }
69174 }
70175
71- String url = String ( " tcp://*: " ) + String ( port);
72- if (0 != zmq_bind (zmqSocket. get (), url. toRawUTF8 () ))
176+ // failure, try to rebind current socket to previous port
177+ if (0 == rebindZMQSocket ( ))
73178 {
74- std::cout << " Failed to open socket: " << zmq_strerror (zmq_errno ()) << std::endl;
75- return ;
179+ reportActualListeningPort (listeningPort);
76180 }
77- #endif
181+ else
182+ {
183+ reportActualListeningPort (0 );
184+ }
185+ return status;
78186
79- listeningPort = port;
187+ #else
188+ reportActualListeningPort (port);
189+ return 0 ;
190+ #endif
80191 }
81192}
82193
0 commit comments