Skip to content

Commit aedf4ec

Browse files
authored
Merge pull request #281 from tne-lab/eb-scopedpointer
Event Broadcaster socket reorganization
2 parents 7aeec06 + 97abf91 commit aedf4ec

2 files changed

Lines changed: 108 additions & 74 deletions

File tree

Source/Plugins/EventBroadcaster/EventBroadcaster.cpp

Lines changed: 89 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
EventBroadcaster::ZMQContext::ZMQContext()
1515
#ifdef ZEROMQ
1616
: context(zmq_ctx_new())
17+
#else
18+
: context(nullptr)
1719
#endif
1820
{}
1921

@@ -31,72 +33,91 @@ void* EventBroadcaster::ZMQContext::createZMQSocket()
3133
#ifdef ZEROMQ
3234
jassert(context != nullptr);
3335
return zmq_socket(context, ZMQ_PUB);
36+
#else
37+
return nullptr;
3438
#endif
3539
}
3640

37-
EventBroadcaster::ZMQSocketPtr::ZMQSocketPtr()
38-
: std::unique_ptr<void, decltype(&closeZMQSocket)>(nullptr, &closeZMQSocket)
41+
EventBroadcaster::ZMQSocket::ZMQSocket()
42+
: socket (nullptr)
43+
, boundPort (0)
3944
{
4045
#ifdef ZEROMQ
41-
reset(context->createZMQSocket());
46+
socket = context->createZMQSocket();
4247
#endif
4348
}
4449

45-
EventBroadcaster::ZMQSocketPtr::~ZMQSocketPtr()
50+
EventBroadcaster::ZMQSocket::~ZMQSocket()
4651
{
47-
// close the socket before the context might get destroyed.
48-
reset(nullptr);
52+
#ifdef ZEROMQ
53+
unbind(); // do this explicitly to free the port immediately
54+
zmq_close(socket);
55+
#endif
4956
}
5057

51-
int EventBroadcaster::unbindZMQSocket()
58+
bool EventBroadcaster::ZMQSocket::isValid() const
59+
{
60+
return socket != nullptr;
61+
}
62+
63+
int EventBroadcaster::ZMQSocket::getBoundPort() const
64+
{
65+
return boundPort;
66+
}
67+
68+
int EventBroadcaster::ZMQSocket::send(const void* buf, size_t len, int flags)
5269
{
5370
#ifdef ZEROMQ
54-
void* socket = zmqSocket.get();
55-
if (socket != nullptr && listeningPort != 0)
56-
{
57-
return zmq_unbind(socket, getEndpoint(listeningPort).toRawUTF8());
58-
}
71+
return zmq_send(socket, buf, len, flags);
5972
#endif
6073
return 0;
6174
}
6275

63-
int EventBroadcaster::rebindZMQSocket()
76+
int EventBroadcaster::ZMQSocket::bind(int port)
6477
{
6578
#ifdef ZEROMQ
66-
void* socket = zmqSocket.get();
67-
if (socket != nullptr && listeningPort != 0)
79+
if (isValid() && port != 0)
6880
{
69-
return zmq_bind(socket, getEndpoint(listeningPort).toRawUTF8());
81+
int status = unbind();
82+
if (status == 0)
83+
{
84+
status = zmq_bind(socket, getEndpoint(port).toRawUTF8());
85+
if (status == 0)
86+
{
87+
boundPort = port;
88+
}
89+
}
90+
return status;
7091
}
7192
#endif
7293
return 0;
7394
}
7495

75-
void EventBroadcaster::closeZMQSocket(void* socket)
96+
int EventBroadcaster::ZMQSocket::unbind()
7697
{
7798
#ifdef ZEROMQ
78-
zmq_close(socket);
99+
if (isValid() && boundPort != 0)
100+
{
101+
int status = zmq_unbind(socket, getEndpoint(boundPort).toRawUTF8());
102+
if (status == 0)
103+
{
104+
boundPort = 0;
105+
}
106+
return status;
107+
}
79108
#endif
109+
return 0;
80110
}
81111

112+
82113
String EventBroadcaster::getEndpoint(int port)
83114
{
84115
return String("tcp://*:") + String(port);
85116
}
86117

87-
void EventBroadcaster::reportActualListeningPort(int port)
88-
{
89-
listeningPort = port;
90-
auto editor = static_cast<EventBroadcasterEditor*>(getEditor());
91-
if (editor)
92-
{
93-
editor->setDisplayedPort(port);
94-
}
95-
}
96118

97119
EventBroadcaster::EventBroadcaster()
98120
: GenericProcessor ("Event Broadcaster")
99-
, listeningPort (0)
100121
{
101122
setProcessorType (PROCESSOR_TYPE_SINK);
102123

@@ -118,59 +139,65 @@ AudioProcessorEditor* EventBroadcaster::createEditor()
118139

119140
int EventBroadcaster::getListeningPort() const
120141
{
121-
return listeningPort;
142+
if (zmqSocket == nullptr)
143+
{
144+
return 0;
145+
}
146+
return zmqSocket->getBoundPort();
122147
}
123148

124149

125150
int EventBroadcaster::setListeningPort(int port, bool forceRestart)
126151
{
127-
if ((listeningPort != port) || forceRestart)
152+
int status = 0;
153+
int currPort = getListeningPort();
154+
if ((currPort != port) || forceRestart)
128155
{
129156
#ifdef ZEROMQ
130157
// unbind current socket (if any) to free up port
131-
unbindZMQSocket();
132-
ZMQSocketPtr newSocket;
133-
auto editor = static_cast<EventBroadcasterEditor*>(getEditor());
134-
int status = 0;
158+
if (zmqSocket != nullptr)
159+
{
160+
zmqSocket->unbind();
161+
}
135162

136-
if (!newSocket.get())
163+
ScopedPointer<ZMQSocket> newSocket = new ZMQSocket();
164+
165+
if (!newSocket->isValid())
137166
{
138167
status = zmq_errno();
139168
std::cout << "Failed to create socket: " << zmq_strerror(status) << std::endl;
140169
}
141170
else
142171
{
143-
if (0 != zmq_bind(newSocket.get(), getEndpoint(port).toRawUTF8()))
172+
if (0 != newSocket->bind(port))
144173
{
145174
status = zmq_errno();
146-
std::cout << "Failed to open socket: " << zmq_strerror(status) << std::endl;
175+
std::cout << "Failed to bind to port " << port << ": "
176+
<< zmq_strerror(status) << std::endl;
147177
}
148178
else
149179
{
150180
// success
151-
zmqSocket.swap(newSocket);
152-
reportActualListeningPort(port);
153-
return status;
181+
zmqSocket = newSocket;
154182
}
155183
}
156184

157-
// failure, try to rebind current socket to previous port
158-
if (0 == rebindZMQSocket())
185+
if (status != 0 && zmqSocket != nullptr)
159186
{
160-
reportActualListeningPort(listeningPort);
187+
// try to rebind current socket to previous port
188+
zmqSocket->bind(currPort);
161189
}
162-
else
163-
{
164-
reportActualListeningPort(0);
165-
}
166-
return status;
167190

168-
#else
169-
reportActualListeningPort(port);
170-
return 0;
171191
#endif
172192
}
173-
return -1;
193+
194+
// update editor
195+
auto editor = static_cast<EventBroadcasterEditor*>(getEditor());
196+
if (editor != nullptr)
197+
{
198+
editor->setDisplayedPort(getListeningPort());
199+
}
200+
return status;
174201
}
175202

176203

@@ -183,13 +210,17 @@ void EventBroadcaster::process(AudioSampleBuffer& continuousBuffer)
183210
//IMPORTANT: The structure of the event buffers has changed drastically, so we need to find a better way of doing this
184211
void EventBroadcaster::sendEvent(const MidiMessage& event, float eventSampleRate) const
185212
{
213+
#ifdef ZEROMQ
186214
double timestampSeconds = double(Event::getTimestamp(event)) / eventSampleRate;
187215
uint16 type = Event::getBaseType(event);
188216

189-
#ifdef ZEROMQ
190-
if (-1 == zmq_send(zmqSocket.get(), &type, sizeof(type), ZMQ_SNDMORE) ||
191-
-1 == zmq_send(zmqSocket.get(), &timestampSeconds, sizeof(timestampSeconds), ZMQ_SNDMORE) ||
192-
-1 == zmq_send(zmqSocket.get(), event.getRawData(), event.getRawDataSize(), 0))
217+
if (zmqSocket == nullptr)
218+
{
219+
std::cout << "Failed to send message: no socket" << std::endl;
220+
}
221+
else if (-1 == zmqSocket->send(&type, sizeof(type), ZMQ_SNDMORE) ||
222+
-1 == zmqSocket->send(&timestampSeconds, sizeof(timestampSeconds), ZMQ_SNDMORE) ||
223+
-1 == zmqSocket->send(event.getRawData(), event.getRawDataSize(), 0))
193224
{
194225
std::cout << "Failed to send message: " << zmq_strerror(zmq_errno()) << std::endl;
195226
}
@@ -209,7 +240,7 @@ void EventBroadcaster::handleSpike(const SpikeChannel* channelInfo, const MidiMe
209240
void EventBroadcaster::saveCustomParametersToXml(XmlElement* parentElement)
210241
{
211242
XmlElement* mainNode = parentElement->createNewChildElement("EVENTBROADCASTER");
212-
mainNode->setAttribute("port", listeningPort);
243+
mainNode->setAttribute("port", getListeningPort());
213244
}
214245

215246

@@ -221,7 +252,7 @@ void EventBroadcaster::loadCustomParametersFromXml()
221252
{
222253
if (mainNode->hasTagName("EVENTBROADCASTER"))
223254
{
224-
setListeningPort(mainNode->getIntAttribute("port"));
255+
setListeningPort(mainNode->getIntAttribute("port", getListeningPort()));
225256
}
226257
}
227258
}

Source/Plugins/EventBroadcaster/EventBroadcaster.h

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
#endif
2323
#endif
2424

25-
#include <memory>
26-
2725
class EventBroadcaster : public GenericProcessor
2826
{
2927
public:
@@ -33,7 +31,7 @@ class EventBroadcaster : public GenericProcessor
3331

3432
int getListeningPort() const;
3533
// returns 0 on success, else the errno value for the error that occurred.
36-
int setListeningPort (int port, bool forceRestart = false);
34+
int setListeningPort(int port, bool forceRestart = false);
3735

3836
void process (AudioSampleBuffer& continuousBuffer) override;
3937
void handleEvent (const EventChannel* channelInfo, const MidiMessage& event, int samplePosition = 0) override;
@@ -54,27 +52,32 @@ class EventBroadcaster : public GenericProcessor
5452
void* context;
5553
};
5654

57-
static void closeZMQSocket(void* socket);
58-
59-
class ZMQSocketPtr : public std::unique_ptr<void, decltype(&closeZMQSocket)>
55+
class ZMQSocket
6056
{
6157
public:
62-
ZMQSocketPtr();
63-
~ZMQSocketPtr();
58+
ZMQSocket();
59+
~ZMQSocket();
60+
61+
bool isValid() const;
62+
int getBoundPort() const;
63+
64+
int send(const void* buf, size_t len, int flags);
65+
int bind(int port);
66+
int unbind();
6467
private:
68+
int boundPort;
69+
void* socket;
70+
71+
// see here for why the context can't just be static:
72+
// https://github.com/zeromq/libzmq/issues/1708
6573
SharedResourcePointer<ZMQContext> context;
6674
};
67-
68-
int unbindZMQSocket();
69-
int rebindZMQSocket();
7075

7176
void sendEvent(const MidiMessage& event, float eventSampleRate) const;
72-
static String getEndpoint(int port);
73-
// called from getListeningPort() depending on success/failure of ZMQ operations
74-
void reportActualListeningPort(int port);
7577

76-
ZMQSocketPtr zmqSocket;
77-
int listeningPort;
78+
static String getEndpoint(int port);
79+
80+
ScopedPointer<ZMQSocket> zmqSocket;
7881
};
7982

8083

0 commit comments

Comments
 (0)