Skip to content

Commit c193bec

Browse files
committed
Simplify and fix for Linux and with no zeromq
1 parent 4d38f45 commit c193bec

2 files changed

Lines changed: 44 additions & 24 deletions

File tree

Source/Plugins/NetworkEvents/NetworkEvents.cpp

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,20 @@ NetworkEvents::NetworkEvents()
4646
, state (false)
4747
, urlport (0)
4848
, firstTime (true)
49-
, restart (0)
50-
, portString (getPortString())
49+
, restart (false)
50+
, changeResponder (false)
5151
{
5252
setProcessorType (PROCESSOR_TYPE_SOURCE);
5353

54+
portString = getPortString();
55+
portString.addListener(this);
56+
5457
if (!setNewListeningPort(5556))
5558
{
5659
// resort to choosing a port automatically
5760
setNewListeningPort(0);
5861
}
5962

60-
portString.addListener(this);
61-
6263
sendSampleCount = false; // disable updating the continuous buffer sample counts,
6364
// since this processor only sends events
6465

@@ -78,11 +79,6 @@ NetworkEvents::~NetworkEvents()
7879

7980
bool NetworkEvents::setNewListeningPort(uint16 port)
8081
{
81-
if (port == 0)
82-
{
83-
CoreServices::sendStatusMessage("NetworkEvents: Selecting port automatically");
84-
}
85-
8682
ScopedPointer<Responder> newResponder = Responder::makeResponder(port);
8783
if (newResponder == nullptr)
8884
{
@@ -91,25 +87,30 @@ bool NetworkEvents::setNewListeningPort(uint16 port)
9187

9288
ScopedLock nrLock(nextResponderLock);
9389
nextResponder = newResponder;
90+
changeResponder = true;
9491
return true;
9592
}
9693

9794

9895
String NetworkEvents::getPortString() const
9996
{
97+
#ifdef ZEROMQ
10098
uint16 port = urlport;
10199
if (port == 0)
102100
{
103101
return "<no cxn>";
104102
}
105103

106104
return String(port);
105+
#else
106+
return "<no zeromq>";
107+
#endif
107108
}
108109

109110

110111
void NetworkEvents::restartConnection()
111112
{
112-
restart = 1;
113+
restart = true;
113114
}
114115

115116

@@ -412,8 +413,9 @@ void NetworkEvents::run()
412413
while (!threadShouldExit())
413414
{
414415
// reopen connection if necessary
415-
if (restart.compareAndSetBool(0, 1))
416+
if (restart)
416417
{
418+
restart = false; // the other thread only sets restart to true, so it's not a race condition
417419
responder = nullptr; // destroy old one, which frees the port
418420
responder = Responder::makeResponder(urlport);
419421

@@ -424,16 +426,21 @@ void NetworkEvents::run()
424426
}
425427
}
426428

427-
// switch to responder with new port if necessary
428-
// unfortunately, atomic operations aren't available for smart pointers
429+
// switch to new responder if necessary
430+
if (changeResponder)
429431
{
430-
const ScopedTryLock nrLock(nextResponderLock);
431-
if (nrLock.isLocked() && nextResponder != nullptr)
432+
const ScopedLock nrLock(nextResponderLock);
433+
if (nextResponder != nullptr)
432434
{
435+
changeResponder = false; // this is also controlled by the nextResponderLock
433436
responder = nextResponder;
434437
connected = true;
435438
updatePort(responder->getBoundPort());
436439
}
440+
else
441+
{
442+
jassertfalse; // huh? a new responder should be available
443+
}
437444
}
438445

439446
// if we don't have a vaild (connected) socket, keep looping until we do
@@ -650,6 +657,7 @@ NetworkEvents::ZMQContext::ZMQContext(const ScopedLock& lock)
650657
: context(zmq_ctx_new())
651658
#endif
652659
{
660+
// sharedContextLock should already be held here
653661
sharedContext = this;
654662
}
655663

@@ -669,12 +677,16 @@ void* NetworkEvents::ZMQContext::createSocket()
669677
#ifdef ZEROMQ
670678
jassert(context != nullptr);
671679
return zmq_socket(context, ZMQ_REP);
680+
#else
681+
return nullptr;
672682
#endif
673683
}
674684

675685

676686
/*** Responder ***/
677687

688+
const int NetworkEvents::Responder::RECV_TIMEOUT_MS = 100;
689+
678690
NetworkEvents::Responder::Responder(uint16 port)
679691
: socket(nullptr)
680692
, boundPort(0)
@@ -719,6 +731,7 @@ NetworkEvents::Responder::Responder(uint16 port)
719731
// if requested port was 0, find out which port was actually used
720732
if (port == 0)
721733
{
734+
CoreServices::sendStatusMessage("NetworkEvents: Selecting port automatically");
722735
const size_t BUF_LEN = 32;
723736
size_t len = BUF_LEN;
724737
char endpoint[BUF_LEN];
@@ -737,7 +750,7 @@ NetworkEvents::Responder::Responder(uint16 port)
737750
}
738751

739752

740-
NetworkEvents::Responder* NetworkEvents::Responder::makeResponder(uint16 port)
753+
ScopedPointer<NetworkEvents::Responder> NetworkEvents::Responder::makeResponder(uint16 port)
741754
{
742755
ScopedPointer<Responder> socketPtr = new Responder(port);
743756
uint16 boundPort = socketPtr->getBoundPort();
@@ -752,7 +765,7 @@ NetworkEvents::Responder* NetworkEvents::Responder::makeResponder(uint16 port)
752765
jassertfalse; // huh?
753766
return nullptr;
754767
}
755-
return socketPtr.release();
768+
return socketPtr;
756769
}
757770

758771

Source/Plugins/NetworkEvents/NetworkEvents.h

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ class NetworkEvents : public GenericProcessor
115115

116116
String str;
117117
juce::int64 timestamp;
118+
119+
JUCE_LEAK_DETECTOR(StringTS);
118120
};
119121

120122
class ZMQContext : public ReferenceCountedObject
@@ -128,15 +130,16 @@ class NetworkEvents : public GenericProcessor
128130

129131
private:
130132
void* context;
133+
134+
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(ZMQContext);
131135
};
132136

133137
// RAII wrapper for REP socket
134138
class Responder
135139
{
136140
public:
137141
// tries to create a responder and bind to given port; returns nullptr on failure.
138-
// caller must own and destroy the returned responder if it succeeds.
139-
static Responder* makeResponder(uint16 port);
142+
static ScopedPointer<Responder> makeResponder(uint16 port);
140143
~Responder();
141144

142145
// returns the latest errno value
@@ -166,7 +169,9 @@ class NetworkEvents : public GenericProcessor
166169
uint16 boundPort; // 0 indicates not bound
167170
int lastErrno;
168171

169-
static const int RECV_TIMEOUT_MS = 100;
172+
static const int RECV_TIMEOUT_MS;
173+
174+
JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR(Responder);
170175
};
171176

172177
void postTimestamppedStringToMidiBuffer(StringTS s);
@@ -183,8 +188,7 @@ class NetworkEvents : public GenericProcessor
183188
// get an endpoint url for the given port (using 0 to represent *)
184189
static String getEndpoint(uint16 port);
185190

186-
Value portString; // underlying value of the editor's port input
187-
191+
188192
// share a "dumb" pointer that doesn't take part in reference counting.
189193
// want the context to be terminated by the time the static members are
190194
// destroyed (see: https://github.com/zeromq/libzmq/issues/1708)
@@ -196,9 +200,12 @@ class NetworkEvents : public GenericProcessor
196200
ScopedPointer<Responder> nextResponder;
197201
CriticalSection nextResponderLock;
198202

199-
uint16 urlport; // 0 indicates not connected
203+
bool restart;
204+
bool changeResponder;
200205

201-
Atomic<int> restart;
206+
uint16 urlport; // 0 indicates not connected
207+
// thread can modify this to update editor's port text without acquiring MessageManagerLock
208+
Value portString;
202209

203210
float threshold;
204211
float bufferZone;

0 commit comments

Comments
 (0)