Skip to content

Commit cf62f55

Browse files
committed
Manage socket entirely in thread, control w/ atomic flags
1 parent c0e1406 commit cf62f55

3 files changed

Lines changed: 97 additions & 121 deletions

File tree

Source/Plugins/NetworkEvents/NetworkEvents.cpp

Lines changed: 75 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -41,37 +41,23 @@ CriticalSection NetworkEvents::sharedContextLock{};
4141
NetworkEvents::NetworkEvents()
4242
: GenericProcessor ("Network Events")
4343
, Thread ("NetworkThread")
44-
, restart (false)
45-
, changeResponder (false)
46-
, urlport (0)
44+
, makeNewSocket (true)
45+
, requestedPort (5556)
46+
, boundPort (0)
4747
{
4848
setProcessorType (PROCESSOR_TYPE_SOURCE);
4949

50-
if (!setNewListeningPort(5556))
51-
{
52-
// resort to choosing a port automatically
53-
setNewListeningPort(0);
54-
}
55-
5650
startThread();
5751

5852
sendSampleCount = false; // disable updating the continuous buffer sample counts,
5953
// since this processor only sends events
6054
}
6155

6256

63-
bool NetworkEvents::setNewListeningPort(uint16 port)
57+
void NetworkEvents::setNewListeningPort(uint16 port)
6458
{
65-
ScopedPointer<Responder> newResponder = Responder::makeResponder(port);
66-
if (newResponder == nullptr)
67-
{
68-
return false;
69-
}
70-
71-
ScopedLock nrLock(nextResponderLock);
72-
nextResponder = newResponder;
73-
changeResponder = true;
74-
return true;
59+
requestedPort = port;
60+
makeNewSocket = true;
7561
}
7662

7763

@@ -85,25 +71,15 @@ NetworkEvents::~NetworkEvents()
8571
}
8672

8773

88-
String NetworkEvents::getPortString() const
74+
String NetworkEvents::getCurrPortString() const
8975
{
90-
#ifdef ZEROMQ
91-
uint16 port = urlport;
92-
if (port == 0)
93-
{
94-
return "<no cxn>";
95-
}
96-
97-
return String(port);
98-
#else
99-
return "<no zeromq>";
100-
#endif
76+
return getPortString(boundPort);
10177
}
10278

10379

10480
void NetworkEvents::restartConnection()
10581
{
106-
restart = true;
82+
makeNewSocket = true;
10783
}
10884

10985

@@ -260,51 +236,58 @@ void NetworkEvents::process (AudioSampleBuffer& buffer)
260236
void NetworkEvents::run()
261237
{
262238
#ifdef ZEROMQ
263-
ScopedPointer<Responder> responder;
264-
char buffer[MAX_MESSAGE_LENGTH];
265-
bool connected = false;
239+
HeapBlock<char> buffer(MAX_MESSAGE_LENGTH);
240+
241+
// responder should always be valid (bound to a port) if it is non-null
242+
ScopedPointer<Responder> responder(new Responder(0)); // use any available port as default
243+
if (responder->isValid())
244+
{
245+
boundPort = responder->getBoundPort();
246+
}
247+
else
248+
{
249+
responder = nullptr;
250+
boundPort = 0;
251+
}
252+
253+
// purposely don't call updatePortString - makeNewSocket will be true on startup,
254+
// so wait to try the requested port (5556) before updating the editor.
266255

267256
while (!threadShouldExit())
268257
{
269-
// reopen connection if necessary
270-
if (restart)
258+
// change socket if necessary
259+
while (makeNewSocket.exchange(false))
271260
{
272-
restart = false; // the other thread only sets restart to true, so it's not a race condition
273-
responder = nullptr; // destroy old one, which frees the port
274-
responder = Responder::makeResponder(urlport);
261+
uint16 nextPort = requestedPort; // (maybe the newly entered port on the editor text box)
262+
if (nextPort > 0 && nextPort == boundPort) // i.e. this is a restart
263+
{
264+
responder = nullptr; // destroy old one, which frees the port
265+
boundPort = 0;
266+
}
275267

276-
if (responder != nullptr)
268+
if (nextPort == 0)
277269
{
278-
connected = true;
279-
updatePort(responder->getBoundPort());
270+
CoreServices::sendStatusMessage("NetworkEvents: Selecting port automatically");
280271
}
281-
}
282272

283-
// switch to new responder if necessary
284-
if (changeResponder)
285-
{
286-
const ScopedLock nrLock(nextResponderLock);
287-
if (nextResponder != nullptr)
273+
ScopedPointer<Responder> newResponder(new Responder(nextPort));
274+
if (newResponder->isValid())
288275
{
289-
changeResponder = false; // this is also controlled by the nextResponderLock
290-
responder = nextResponder;
291-
connected = true;
292-
updatePort(responder->getBoundPort());
276+
// replace the current socket with the newly created socket
277+
responder = newResponder;
278+
boundPort = responder->getBoundPort();
293279
}
294280
else
295281
{
296-
jassertfalse; // huh? a new responder should be available
282+
newResponder->reportErr("Failed to connect to port " + String(nextPort));
297283
}
284+
285+
updatePortString(boundPort);
298286
}
299287

300288
// if we don't have a vaild (connected) socket, keep looping until we do
301289
if (responder == nullptr)
302290
{
303-
if (connected)
304-
{
305-
connected = false;
306-
updatePort(0);
307-
}
308291
wait(100);
309292
continue;
310293
}
@@ -367,7 +350,9 @@ void NetworkEvents::setEnabledState (bool newState)
367350
void NetworkEvents::saveCustomParametersToXml (XmlElement* parentElement)
368351
{
369352
XmlElement* mainNode = parentElement->createNewChildElement ("NETWORKEVENTS");
370-
mainNode->setAttribute ("port", urlport);
353+
uint16 currBoundPort = boundPort;
354+
// save the actual bound port if any, otherwise the last attempted port.
355+
mainNode->setAttribute ("port", currBoundPort ? currBoundPort : requestedPort.load());
371356
}
372357

373358

@@ -411,14 +396,13 @@ StringPairArray NetworkEvents::parseNetworkMessage(StringRef msg)
411396
}
412397

413398

414-
void NetworkEvents::updatePort(uint16 port)
399+
void NetworkEvents::updatePortString(uint16 port)
415400
{
416-
urlport = port;
417401
auto ed = static_cast<NetworkEventsEditor*>(getEditor());
418402
if (ed)
419403
{
420404
const MessageManagerLock mmLock;
421-
ed->setPortText(getPortString());
405+
ed->setPortText(getPortString(port));
422406
}
423407
}
424408

@@ -429,6 +413,21 @@ String NetworkEvents::getEndpoint(uint16 port)
429413
}
430414

431415

416+
String NetworkEvents::getPortString(uint16 port)
417+
{
418+
#ifdef ZEROMQ
419+
if (port == 0)
420+
{
421+
return "<no cxn>";
422+
}
423+
424+
return String(port);
425+
#else
426+
return "<no zeromq>";
427+
#endif
428+
}
429+
430+
432431
/*** ZMQContext ***/
433432

434433
NetworkEvents::ZMQContext::ZMQContext(const ScopedLock& lock)
@@ -467,9 +466,10 @@ void* NetworkEvents::ZMQContext::createSocket()
467466
const int NetworkEvents::Responder::RECV_TIMEOUT_MS = 100;
468467

469468
NetworkEvents::Responder::Responder(uint16 port)
470-
: socket(nullptr)
471-
, boundPort(0)
472-
, lastErrno(0)
469+
: socket (nullptr)
470+
, valid (false)
471+
, boundPort (0)
472+
, lastErrno (0)
473473
{
474474
{
475475
ScopedLock lock(sharedContextLock);
@@ -510,7 +510,6 @@ NetworkEvents::Responder::Responder(uint16 port)
510510
// if requested port was 0, find out which port was actually used
511511
if (port == 0)
512512
{
513-
CoreServices::sendStatusMessage("NetworkEvents: Selecting port automatically");
514513
const size_t BUF_LEN = 32;
515514
size_t len = BUF_LEN;
516515
char endpoint[BUF_LEN];
@@ -521,34 +520,15 @@ NetworkEvents::Responder::Responder(uint16 port)
521520
}
522521

523522
port = String(endpoint).getTrailingIntValue();
524-
jassert(port > 0);
525523
}
526524

525+
jassert(port > 0);
526+
valid = true;
527527
boundPort = port;
528528
#endif
529529
}
530530

531531

532-
ScopedPointer<NetworkEvents::Responder> NetworkEvents::Responder::makeResponder(uint16 port)
533-
{
534-
ScopedPointer<Responder> socketPtr = new Responder(port);
535-
uint16 boundPort = socketPtr->getBoundPort();
536-
if (boundPort == 0)
537-
{
538-
socketPtr->reportErr("Failed to connect to port " + String(port));
539-
return nullptr;
540-
}
541-
542-
if (port != 0 && boundPort != port)
543-
{
544-
jassertfalse; // huh?
545-
return nullptr;
546-
}
547-
return socketPtr;
548-
}
549-
550-
551-
552532
NetworkEvents::Responder::~Responder()
553533
{
554534
#ifdef ZEROMQ
@@ -584,6 +564,12 @@ void NetworkEvents::Responder::reportErr(const String& message) const
584564
};
585565

586566

567+
bool NetworkEvents::Responder::isValid() const
568+
{
569+
return valid;
570+
}
571+
572+
587573
uint16 NetworkEvents::Responder::getBoundPort() const
588574
{
589575
return boundPort;

Source/Plugins/NetworkEvents/NetworkEvents.h

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040
#include <list>
4141
#include <queue>
42-
42+
#include <atomic>
4343

4444
/**
4545
Sends incoming TCP/IP messages from 0MQ to the events buffer
@@ -76,11 +76,10 @@ class NetworkEvents : public GenericProcessor
7676
void run() override;
7777

7878
// passing 0 corresponds to wildcard ("*") and picks any available port
79-
// returns true on success, false on failure
80-
bool setNewListeningPort (uint16 port);
79+
void setNewListeningPort (uint16 port);
8180

8281
// gets a string for the editor's port input to reflect current urlport
83-
String getPortString() const;
82+
String getCurrPortString() const;
8483

8584
void restartConnection();
8685

@@ -110,8 +109,9 @@ class NetworkEvents : public GenericProcessor
110109
class Responder
111110
{
112111
public:
113-
// tries to create a responder and bind to given port; returns nullptr on failure.
114-
static ScopedPointer<Responder> makeResponder(uint16 port);
112+
// creates socket from given context and tries to bind to port.
113+
// if port is 0, chooses an available ephemeral port.
114+
Responder(uint16 port);
115115
~Responder();
116116

117117
// returns the latest errno value
@@ -120,7 +120,9 @@ class NetworkEvents : public GenericProcessor
120120
// output last error on stdout and status bar, including the passed message
121121
void reportErr(const String& message) const;
122122

123-
// returns the port if the socket was successfully bound to one.
123+
bool isValid() const;
124+
125+
// returns the port if the socket was successfully bound to one, else 0
124126
// if not, or if the socket is invalid, returns 0.
125127
uint16 getBoundPort() const;
126128

@@ -132,13 +134,10 @@ class NetworkEvents : public GenericProcessor
132134
int send(StringRef response);
133135

134136
private:
135-
// creates socket from given context and tries to bind to port.
136-
// if port is 0, chooses an available ephemeral port.
137-
Responder(uint16 port);
138-
139137
ZMQContext::Ptr context;
140138
void* socket;
141-
uint16 boundPort; // 0 indicates not bound
139+
bool valid;
140+
uint16 boundPort;
142141
int lastErrno;
143142

144143
static const int RECV_TIMEOUT_MS;
@@ -153,28 +152,24 @@ class NetworkEvents : public GenericProcessor
153152
//* Split network message into name/value pairs (name1=val1 name2=val2 etc) */
154153
StringPairArray parseNetworkMessage(StringRef msg);
155154

156-
// updates urlport and the port input on the editor (0 indicates not connected)
157-
void updatePort(uint16 port);
155+
// updates urlport and the port input on the editor (< 0 indicates not connected)
156+
void updatePortString(uint16 port);
158157

159158
// get an endpoint url for the given port (using 0 to represent *)
160159
static String getEndpoint(uint16 port);
161160

161+
// get a representation of the given port for use on the editor
162+
static String getPortString(uint16 port);
162163

163164
// share a "dumb" pointer that doesn't take part in reference counting.
164165
// want the context to be terminated by the time the static members are
165166
// destroyed (see: https://github.com/zeromq/libzmq/issues/1708)
166167
static ZMQContext* sharedContext;
167168
static CriticalSection sharedContextLock;
168169

169-
// To switch ports, a new socket is created and (if successful) assigned to this pointer,
170-
// and then the thread will switch to using this socket at the next opportunity.
171-
ScopedPointer<Responder> nextResponder;
172-
CriticalSection nextResponderLock;
173-
174-
bool restart;
175-
bool changeResponder;
176-
177-
uint16 urlport; // 0 indicates not connected
170+
std::atomic<bool> makeNewSocket; // port change or restart needed (depending on requestedPort)
171+
std::atomic<uint16> requestedPort; // never set by the thread; 0 means any free port
172+
std::atomic<uint16> boundPort; // only set by the thread; 0 means no connection
178173

179174
std::queue<StringTS> networkMessagesQueue;
180175
CriticalSection queueLock;

0 commit comments

Comments
 (0)