@@ -34,6 +34,7 @@ void* EventBroadcaster::ZMQContext::createZMQSocket()
3434 jassert (context != nullptr );
3535 return zmq_socket (context, ZMQ_PUB);
3636#else
37+ jassertfalse; // should never be called in this case
3738 return nullptr ;
3839#endif
3940}
@@ -118,6 +119,7 @@ String EventBroadcaster::getEndpoint(int port)
118119
119120EventBroadcaster::EventBroadcaster ()
120121 : GenericProcessor (" Event Broadcaster" )
122+ , outputFormat (RAW_BINARY)
121123{
122124 setProcessorType (PROCESSOR_TYPE_SINK);
123125
@@ -165,7 +167,7 @@ int EventBroadcaster::setListeningPort(int port, bool forceRestart)
165167 if (!newSocket->isValid ())
166168 {
167169 status = zmq_errno ();
168- std::cout << " Failed to create socket: " << zmq_strerror (status) << std::endl;
170+ std::cout << " Failed to open socket: " << zmq_strerror (status) << std::endl;
169171 }
170172 else
171173 {
@@ -201,46 +203,278 @@ int EventBroadcaster::setListeningPort(int port, bool forceRestart)
201203}
202204
203205
206+ int EventBroadcaster::getOutputFormat () const
207+ {
208+ return outputFormat;
209+ }
210+
211+
212+ void EventBroadcaster::setOutputFormat (int format)
213+ {
214+ outputFormat = format;
215+ }
216+
217+
204218void EventBroadcaster::process (AudioSampleBuffer& continuousBuffer)
205219{
206220 checkForEvents (true );
207221}
208222
209-
210- // IMPORTANT: The structure of the event buffers has changed drastically, so we need to find a better way of doing this
211- void EventBroadcaster::sendEvent (const MidiMessage& event, float eventSampleRate) const
223+ void EventBroadcaster::sendEvent (const InfoObjectCommon* channel, const MidiMessage& msg) const
212224{
213225#ifdef ZEROMQ
214- double timestampSeconds = double (Event::getTimestamp (event)) / eventSampleRate;
215- uint16 type = Event::getBaseType (event);
226+ // TODO Create a procotol that has outline for every type of event
227+ int currFormat = outputFormat;
228+ Array<MsgPart> message;
229+
230+ // common info that isn't type-specific
231+ EventType baseType = Event::getBaseType (msg);
232+ const String& identifier = channel->getIdentifier ();
233+ float sampleRate = channel->getSampleRate ();
234+ int64 timestamp = Event::getTimestamp (msg);
235+
236+ if (currFormat == RAW_BINARY)
237+ {
238+ uint16 baseType16 = static_cast <uint16>(baseType); // for backward compatability
239+ double timestampSeconds = double (timestamp) / sampleRate;
240+ const void * rawData = msg.getRawData ();
241+ size_t rawDataSize = msg.getRawDataSize ();
242+
243+ message.add ({ " base type" , { &baseType16, sizeof (baseType16) } });
244+ message.add ({ " timestamp" , { ×tampSeconds, sizeof (timestampSeconds) } });
245+ message.add ({ " raw data" , { rawData, rawDataSize } });
246+ }
247+ else // deserialize the data, get metadata, etc.
248+ {
249+ // info to be assigned depending on the event type
250+ String header;
251+ DynamicObject::Ptr jsonObj = new DynamicObject ();
252+ EventBasePtr baseEvent;
253+ const MetaDataEventObject* metaDataChannel;
254+
255+ // deserialize event and get type-specific information
256+ switch (baseType)
257+ {
258+ case SPIKE_EVENT:
259+ {
260+ auto spikeChannel = static_cast <const SpikeChannel*>(channel);
261+ metaDataChannel = static_cast <const MetaDataEventObject*>(spikeChannel);
262+
263+ baseEvent = SpikeEvent::deserializeFromMessage (msg, spikeChannel).release ();
264+ auto spike = static_cast <SpikeEvent*>(baseEvent.get ());
265+
266+ // create header
267+ uint16 sortedID = spike->getSortedID ();
268+ header = " spike/sortedid:" + String (sortedID) + " /id:" + identifier + " /ts:" + String (timestamp);
269+
270+ if (currFormat == HEADER_AND_JSON)
271+ {
272+ // add info to JSON
273+ jsonObj->setProperty (" type" , " spike" );
274+ jsonObj->setProperty (" sortedID" , sortedID);
275+
276+ int spikeChannels = spikeChannel->getNumChannels ();
277+ jsonObj->setProperty (" numChannels" , spikeChannels);
278+
279+ Array<var> thresholds;
280+ for (int i = 0 ; i < spikeChannels; ++i)
281+ {
282+ thresholds.add (spike->getThreshold (i));
283+ }
284+ jsonObj->setProperty (" threshold" , thresholds);
285+ }
286+
287+ break ; // case SPIKE_EVENT
288+ }
289+
290+ case PROCESSOR_EVENT:
291+ {
292+ auto eventChannel = static_cast <const EventChannel*>(channel);
293+ metaDataChannel = static_cast <const MetaDataEventObject*>(eventChannel);
294+
295+ baseEvent = Event::deserializeFromMessage (msg, eventChannel).release ();
296+ auto event = static_cast <Event*>(baseEvent.get ());
216297
298+ uint16 channel = event->getChannel ();
299+
300+ // for json
301+ var type;
302+ var data;
303+
304+ auto eventType = event->getEventType ();
305+ switch (eventType)
306+ {
307+ case EventChannel::EventChannelTypes::TTL:
308+ {
309+ bool state = static_cast <TTLEvent*>(event)->getState ();
310+
311+ header = " ttl/channel:" + String (channel) + " /state:" + (state ? " 1" : " 0" ) +
312+ " /id:" + identifier + " /ts:" + String (timestamp);
313+
314+ type = " ttl" ;
315+ data = state;
316+ break ;
317+ }
318+
319+ case EventChannel::EventChannelTypes::TEXT:
320+ {
321+ const String& text = static_cast <TextEvent*>(event)->getText ();
322+
323+ header = " text/channel:" + String (channel) + " /id:" + identifier +
324+ " /text:" + text + " /ts:" + String (timestamp);
325+
326+ type = " text" ;
327+ data = text;
328+ break ;
329+ }
330+
331+ default :
332+ {
333+ if (eventType < EventChannel::EventChannelTypes::BINARY_BASE_VALUE ||
334+ eventType >= EventChannel::EventChannelTypes::INVALID)
335+ {
336+ jassertfalse;
337+ return ;
338+ }
339+
340+ // must have binary event
341+
342+ BaseType dataType = eventChannel->getEquivalentMetaDataType ();
343+ auto dataReader = getDataReader (dataType);
344+ const void * rawData = static_cast <BinaryEvent*>(event)->getBinaryDataPointer ();
345+ unsigned int length = eventChannel->getLength ();
346+
347+ type = " binary" ;
348+ data = dataReader (rawData, length);
349+
350+ String dataString;
351+ if (data.isArray ()) // make comma-separated list of values
352+ {
353+ int length = data.size ();
354+ for (int i = 0 ; i < length; ++i)
355+ {
356+ if (i > 0 ) { dataString += " ," ; }
357+ dataString += data[i].toString ();
358+ }
359+ }
360+ else
361+ {
362+ dataString = data.toString ();
363+ }
364+
365+ header = " binary/channel:" + String (channel) + " /id:" + identifier +
366+ " /data:" + dataString + " /ts:" + String (timestamp);
367+
368+ break ;
369+ }
370+ } // end switch(eventType)
371+
372+ if (currFormat == HEADER_AND_JSON)
373+ {
374+ jsonObj->setProperty (" channel" , channel);
375+ jsonObj->setProperty (" type" , type);
376+ jsonObj->setProperty (" data" , data);
377+ }
378+
379+ break ; // case PROCESSOR_EVENT
380+ }
381+
382+ default :
383+ jassertfalse; // should never happen
384+ return ;
385+
386+ } // end switch(baseType)
387+
388+ message.add ({ " header" , { header.toRawUTF8 (), header.getNumBytesAsUTF8 () } });
389+
390+ String jsonString; // must be outside the if-statement so it remains in scope when we send the message
391+ if (currFormat == HEADER_AND_JSON)
392+ {
393+ // Add common info to JSON
394+ // Still sending these guys as float/doubles for now. Might change in future.
395+ DynamicObject::Ptr timing = new DynamicObject ();
396+ timing->setProperty (" sampleRate" , sampleRate);
397+ timing->setProperty (" timestamp" , timestamp);
398+ jsonObj->setProperty (" timing" , timing.get ());
399+
400+ jsonObj->setProperty (" identifier" , identifier);
401+ jsonObj->setProperty (" name" , channel->getName ());
402+
403+ // Add metadata
404+ DynamicObject::Ptr metaDataObj = new DynamicObject ();
405+ populateMetaData (metaDataChannel, baseEvent, metaDataObj);
406+ jsonObj->setProperty (" metaData" , metaDataObj.get ());
407+
408+ String jsonString = JSON::toString (var (jsonObj));
409+ message.add ({ " json" , { jsonString.toRawUTF8 (), jsonString.getNumBytesAsUTF8 () } });
410+ }
411+ }
412+
413+ sendMessage (message);
414+ #endif
415+ }
416+
417+ int EventBroadcaster::sendMessage (const Array<MsgPart>& parts) const
418+ {
419+ #ifdef ZEROMQ
217420 if (zmqSocket == nullptr )
218421 {
219422 std::cout << " Failed to send message: no socket" << std::endl;
423+ return -1 ;
424+ }
425+
426+ int numParts = parts.size ();
427+ for (int i = 0 ; i < numParts; ++i)
428+ {
429+ const MsgPart& part = parts.getUnchecked (i);
430+ int flags = (i < numParts - 1 ) ? ZMQ_SNDMORE : 0 ;
431+ if (-1 == zmqSocket->send (part.data .getData (), part.data .getSize (), flags))
432+ {
433+ std::cout << " Error sending " << part.name << " : " << zmq_strerror (zmq_errno ()) << std::endl;
434+ return -1 ;
435+ }
220436 }
221- else if (-1 == zmqSocket->send (&type, sizeof (type), ZMQ_SNDMORE) ||
222- -1 == zmqSocket->send (×tampSeconds, sizeof (timestampSeconds), ZMQ_SNDMORE) ||
223- -1 == zmqSocket->send (event.getRawData (), event.getRawDataSize (), 0 ))
224- {
225- std::cout << " Failed to send message: " << zmq_strerror (zmq_errno ()) << std::endl;
226- }
227437#endif
438+ return 0 ;
228439}
229440
441+ void EventBroadcaster::populateMetaData (const MetaDataEventObject* channel,
442+ const EventBasePtr event, DynamicObject::Ptr dest)
443+ {
444+ // Iterate through all event data and add to metadata object
445+ int numMetaData = event->getMetadataValueCount ();
446+ for (int i = 0 ; i < numMetaData; i++)
447+ {
448+ // Get metadata name
449+ const MetaDataDescriptor* metaDescPtr = channel->getEventMetaDataDescriptor (i);
450+ const String& metaDataName = metaDescPtr->getName ();
451+
452+ // Get metadata value
453+ const MetaDataValue* valuePtr = event->getMetaDataValue (i);
454+ const void * rawPtr = valuePtr->getRawValuePointer ();
455+ unsigned int length = valuePtr->getDataLength ();
456+
457+ auto dataReader = getDataReader (valuePtr->getDataType ());
458+ dest->setProperty (metaDataName, dataReader (rawPtr, length));
459+ }
460+ }
461+
462+
230463void EventBroadcaster::handleEvent (const EventChannel* channelInfo, const MidiMessage& event, int samplePosition)
231464{
232- sendEvent (event, channelInfo-> getSampleRate () );
465+ sendEvent (channelInfo, event );
233466}
234467
235468void EventBroadcaster::handleSpike (const SpikeChannel* channelInfo, const MidiMessage& event, int samplePosition)
236469{
237- sendEvent (event, channelInfo-> getSampleRate () );
470+ sendEvent (channelInfo, event );
238471}
239472
240473void EventBroadcaster::saveCustomParametersToXml (XmlElement* parentElement)
241474{
242475 XmlElement* mainNode = parentElement->createNewChildElement (" EVENTBROADCASTER" );
243476 mainNode->setAttribute (" port" , getListeningPort ());
477+ mainNode->setAttribute (" format" , outputFormat);
244478}
245479
246480
@@ -253,7 +487,80 @@ void EventBroadcaster::loadCustomParametersFromXml()
253487 if (mainNode->hasTagName (" EVENTBROADCASTER" ))
254488 {
255489 setListeningPort (mainNode->getIntAttribute (" port" , getListeningPort ()));
490+
491+ outputFormat = mainNode->getIntAttribute (" format" , outputFormat);
492+ auto ed = static_cast <EventBroadcasterEditor*>(getEditor ());
493+ if (ed)
494+ {
495+ ed->setDisplayedFormat (outputFormat);
496+ }
256497 }
257498 }
258499 }
259500}
501+
502+ template <typename T>
503+ var EventBroadcaster::binaryValueToVar (const void * value, unsigned int dataLength)
504+ {
505+ auto typedValue = reinterpret_cast <const T*>(value);
506+
507+ if (dataLength == 1 )
508+ {
509+ return String (*typedValue);
510+ }
511+ else
512+ {
513+ Array<var> metaDataArray;
514+ for (unsigned int i = 0 ; i < dataLength; ++i)
515+ {
516+ metaDataArray.add (String (typedValue[i]));
517+ }
518+ return metaDataArray;
519+ }
520+ }
521+
522+ var EventBroadcaster::stringValueToVar (const void * value, unsigned int dataLength)
523+ {
524+ return String::createStringFromData (value, dataLength);
525+ }
526+
527+ EventBroadcaster::DataToVarFcn EventBroadcaster::getDataReader (BaseType dataType)
528+ {
529+ switch (dataType)
530+ {
531+ case BaseType::CHAR:
532+ return &stringValueToVar;
533+
534+ case BaseType::INT8:
535+ return &binaryValueToVar<int8>;
536+
537+ case BaseType::UINT8:
538+ return &binaryValueToVar<uint8>;
539+
540+ case BaseType::INT16:
541+ return &binaryValueToVar<int16>;
542+
543+ case BaseType::UINT16:
544+ return &binaryValueToVar<uint16>;
545+
546+ case BaseType::INT32:
547+ return &binaryValueToVar<int32>;
548+
549+ case BaseType::UINT32:
550+ return &binaryValueToVar<uint32>;
551+
552+ case BaseType::INT64:
553+ return &binaryValueToVar<int64>;
554+
555+ case BaseType::UINT64:
556+ return &binaryValueToVar<uint64>;
557+
558+ case BaseType::FLOAT:
559+ return &binaryValueToVar<float >;
560+
561+ case BaseType::DOUBLE:
562+ return &binaryValueToVar<double >;
563+ }
564+ jassertfalse;
565+ return nullptr ;
566+ }
0 commit comments