Skip to content

Commit 7ba2eaa

Browse files
committed
zeroMQrr wrapper for Matlab: allow to specify an url in GetResponses command
1 parent 27cfccd commit 7ba2eaa

14 files changed

Lines changed: 235 additions & 52 deletions

Resources/Matlab/compile_matlab_zeroMQrr.m

100755100644
File mode changed.

Resources/Matlab/compile_matlab_zeroMQwrapper.m

100755100644
File mode changed.

Resources/Matlab/libzmq-v110-mt-3_2_2.dll

100755100644
File mode changed.

Resources/Matlab/zeroMQrr.m

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
%
21
% zeroMQrr provides a simple wrapper around the zeromq library with a
32
% request->response scenario in mind
43
%
@@ -12,9 +11,11 @@
1211
% When blocking is 0 (default) zeroMQrr adds the request to a queue and returns the time the request was added to that queue. See 'GetResponses'
1312
% [timeRequestAdded]=zeroMQrr('Send',url, request, 0);
1413
%
15-
% dialogue = zeroMQrr('GetResponses', [wairForEmptyQueue=1])
16-
% Retirieves responses collected for requests send in non blocking mode. If wairForEmptyQueue>0, the function waits for the last queued request to get a response.
17-
% dialogue is a struct with the fields: request, response, timeRequestAdded, timeRequestSent, timeResponeRecieved
14+
% dialogue = zeroMQrr('GetResponses' [, url] [,wairForEmptyQueue=1])
15+
% Retirieves responses collected for requests send in non blocking mode. If an url is provided
16+
% only responses for that url will be returend. If wairForEmptyQueue>0, the function waits for
17+
% the last queued request to get a response.
18+
% Dialogue is a struct with the fields: request, response, timeRequestAdded, timeRequestSent, timeResponeRecieved
1819
% At the moment this cannot get filtered by the url of the request yet.
1920
%
2021
% zeroMQrr('CloseAll')
@@ -36,4 +37,4 @@
3637
% it under the terms of the GNU General Public License as published by
3738
% the Free Software Foundation (see GPL.txt)
3839
%
39-
% Changes to allow receiving responses by Jonas Kn?ll, 2016
40+
% Changes to allow receiving responses by Jonas Knoell, 2016
4.59 KB
Binary file not shown.

Resources/Matlab/zeroMQrr.mexw64

1.5 KB
Binary file not shown.

Resources/Matlab/zeroMQrr/zeroMQrr.cpp

100755100644
Lines changed: 99 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ std::mutex socketMap_mutex;
8686
std::queue<dialogue> InDialogues;
8787
std::mutex InDialogues_mutex;
8888

89-
std::queue<dialogue> OutDialogues;
89+
typedef std::list<dialogue> dlist;
90+
dlist OutDialogues;
9091
std::mutex OutDialogues_mutex;
9192

9293
typedef std::list<ThreadData*> lst;
@@ -111,7 +112,7 @@ void CloseContext(void)
111112
(*i)->thread_running=false;
112113
}
113114
threadList_mutex.unlock();
114-
115+
115116
bool threadListEmpty=false;
116117
while(!threadListEmpty){
117118
threadList_mutex.lock();
@@ -124,7 +125,7 @@ void CloseContext(void)
124125
do_sleep(ms10);
125126
}
126127
}
127-
128+
128129
for (mp::iterator i= socketMap.begin(); i != socketMap.end(); i++) {
129130
int toms=0;
130131
zmq_setsockopt(i->second.zmqSocket,ZMQ_LINGER,&toms, sizeof(toms));
@@ -136,6 +137,25 @@ void CloseContext(void)
136137
}
137138
}
138139

140+
bool close_socket(std::string url){
141+
142+
bool success=false;
143+
socketMap_mutex.lock();
144+
mp::iterator it=socketMap.find(url);
145+
socketData sock;
146+
int toms=0;
147+
if (it != socketMap.end()){
148+
sock = it->second;
149+
zmq_setsockopt(sock.zmqSocket,ZMQ_LINGER,&toms, sizeof(toms));
150+
zmq_close (sock.zmqSocket);
151+
socketMap.erase(it);
152+
success = true;
153+
}
154+
socketMap_mutex.unlock();
155+
156+
return success;
157+
}
158+
139159
void initialize_zmq()
140160
{
141161
mexAtExit(CloseContext);
@@ -172,9 +192,9 @@ void SendAndReceive( dialogue* d )
172192
zmq_msg_t request_z;
173193
zmq_msg_init_size (&request_z, d->request.length());
174194
memcpy (zmq_msg_data (&request_z), d->request.c_str() , d->request.length());
175-
195+
176196
zmq_msg_send (&request_z, sock.zmqSocket, 0);
177-
197+
178198
auto time = std::chrono::high_resolution_clock::now().time_since_epoch();
179199
d->timeRequestSent = (std::chrono::duration_cast<std::chrono::microseconds>(time)).count();
180200
zmq_msg_close (&request_z);
@@ -235,7 +255,7 @@ void SendAndReceive( dialogue* d )
235255
SendAndReceive( &d );
236256

237257
OutDialogues_mutex.lock();
238-
OutDialogues.push(d);
258+
OutDialogues.push_back(d);
239259
OutDialogues_mutex.unlock();
240260
}else{
241261
InDialogues_mutex.unlock();
@@ -283,7 +303,7 @@ void mexFunction( int nlhs, mxArray* plhs[],
283303
if (nrhs < 1) {
284304
return;
285305
}
286-
306+
287307
char* Command = mxArrayToString(prhs[0]);
288308

289309
if (strcmp(Command, "StartConnectThread") == 0) {
@@ -313,7 +333,6 @@ void mexFunction( int nlhs, mxArray* plhs[],
313333

314334
if(blocking){
315335
SendAndReceive(&d);
316-
317336
if (d.timeResponseReceived==-1) {
318337
// Thread was killed and now we try to send things again?
319338
mexPrintf("ZMQ: Failed to get a response.\n");
@@ -341,6 +360,9 @@ void mexFunction( int nlhs, mxArray* plhs[],
341360
plhs[1]=mxOutStruct;
342361
}
343362
}else{ //not blocking
363+
if( lastDialogueAdded >= d.timeRequestAdded){ //make sure we get a new timestamp
364+
d.timeRequestAdded = lastDialogueAdded+1;
365+
}
344366
lastDialogueAdded = d.timeRequestAdded;
345367
InDialogues_mutex.lock();
346368
InDialogues.push(d);
@@ -359,7 +381,7 @@ void mexFunction( int nlhs, mxArray* plhs[],
359381
{
360382
mxArray * mxOutStruct;
361383
mxOutStruct = mxCreateDoubleMatrix(1,1, mxREAL);
362-
memcpy(mxOutStruct ,&(d.timeRequestAdded),sizeof(double));
384+
memcpy(mxGetData(mxOutStruct) ,&(d.timeRequestAdded),sizeof(double));
363385
plhs[0]=mxOutStruct;
364386
}
365387

@@ -372,26 +394,50 @@ void mexFunction( int nlhs, mxArray* plhs[],
372394
}
373395

374396
if (strcmp(Command, "CloseThread") == 0){
375-
threadList_mutex.lock();
376-
for (lst::iterator i= threadList.begin(); i != threadList.end(); i++) {
377-
(*i)->thread_running=false;
397+
398+
if(nrhs<2){
399+
return;
400+
}
401+
close_socket(std::string(mxArrayToString(prhs[1])));
402+
403+
//close thread only if socketMap empty
404+
socketMap_mutex.lock();
405+
if(!socketMap.empty()){
406+
socketMap_mutex.unlock();
407+
threadList_mutex.lock();
408+
for (lst::iterator i= threadList.begin(); i != threadList.end(); i++) {
409+
(*i)->thread_running=false;
410+
}
411+
threadList_mutex.unlock();
412+
}else{
413+
socketMap_mutex.unlock();
378414
}
379-
threadList_mutex.unlock();
380415
}
381416

382-
if (strcmp(Command, "GetResponses") == 0)
383-
{
384-
//do we want to waint untill the current Sending Queue is emptied and all replies are in?
417+
if (strcmp(Command, "GetResponses") == 0){
418+
385419
bool wairForEmptyQueue;
386-
if(nrhs<3)
387-
{
388-
wairForEmptyQueue = false;
389-
}else
390-
{
391-
double Tmp = mxGetScalar(prhs[2]);
420+
std::string url;
421+
//if no second input or second input is no string
422+
if ( nrhs<2 || !mxIsChar(prhs[1]) ){
423+
url = "*";
424+
}else{
425+
url = (mxArrayToString(prhs[1]));
426+
}
427+
428+
//do we want to waint untill the current Sending Queue is emptied and all replies are in?
429+
if(nrhs<2 || (nrhs<3 && mxIsChar(prhs[1])) ){//if only one input or 2 and that is an url
430+
wairForEmptyQueue = true;
431+
}else{
432+
double Tmp;
433+
if (mxIsChar(prhs[1])){ //if second is the url, take the third
434+
Tmp = mxGetScalar(prhs[2]);
435+
}else{
436+
Tmp = mxGetScalar(prhs[1]);
437+
}
392438
wairForEmptyQueue = Tmp>0;
393439
}
394-
440+
395441
double lastAddedTime = lastDialogueAdded;
396442
double lastFetchedTime = lastDialogueFetched;
397443
if(lastFetchedTime ==lastAddedTime){ //nothing to fetch
@@ -400,44 +446,57 @@ void mexFunction( int nlhs, mxArray* plhs[],
400446
//if so, do it now
401447
while(wairForEmptyQueue)
402448
{
403-
OutDialogues_mutex.lock(); //we might still be one respone behind...
404-
if(!OutDialogues.empty() && OutDialogues.back().timeRequestAdded>=lastAddedTime)
449+
OutDialogues_mutex.lock();
450+
if(!OutDialogues.empty() && OutDialogues.back().timeRequestAdded>=lastAddedTime)
405451
{
406-
wairForEmptyQueue=false;
452+
wairForEmptyQueue=false;
407453
}
408454
OutDialogues_mutex.unlock();
409455
do_sleep(ms10);
410456
}
411-
412-
//now we build a struct with all replies
457+
458+
//find the ones we want to return
459+
std::queue<dialogue> returnDialogues;
413460
OutDialogues_mutex.lock();
414461
if(!OutDialogues.empty()){
415462
lastDialogueFetched=OutDialogues.back().timeRequestAdded;
463+
464+
dlist::iterator it=OutDialogues.begin();
465+
while(it!=OutDialogues.end()){
466+
if(!url.compare("*") || !it->url.compare(url)){
467+
dialogue d= (*it);
468+
returnDialogues.push(d);
469+
it=OutDialogues.erase(it);
470+
}else{
471+
it++;
472+
}
473+
}
416474
}
417-
418-
mxArray * mxOutStruct = mxCreateStructMatrix(1,OutDialogues.size(),5,dialogueFieldnames);
419-
475+
OutDialogues_mutex.unlock();
476+
477+
//now we build a struct with all replies
478+
mxArray * mxOutStruct = mxCreateStructMatrix(1,returnDialogues.size(),5,dialogueFieldnames);
479+
420480
int j=0;
421-
while(!OutDialogues.empty())
481+
while(!returnDialogues.empty())
422482
{
423-
dialogue d = OutDialogues.front();
424-
OutDialogues.pop();
425-
483+
dialogue d = returnDialogues.front();
484+
returnDialogues.pop();
485+
426486
mxSetField(mxOutStruct, j, "request", mxCreateString( d.request.c_str() ) );
427487
mxSetField(mxOutStruct, j, "response", mxCreateString( d.response.c_str() ) );
428-
488+
429489
mxSetField(mxOutStruct,j, "timeRequestAdded", mxCreateDoubleMatrix(1,1, mxREAL));
430490
memcpy(mxGetData(mxGetField(mxOutStruct,j, "timeRequestAdded")) ,&(d.timeRequestAdded),sizeof(double));
431-
491+
432492
mxSetField(mxOutStruct,j, "timeRequestSent", mxCreateDoubleMatrix(1,1, mxREAL));
433493
memcpy(mxGetData(mxGetField(mxOutStruct,j, "timeRequestSent")) ,&(d.timeRequestSent),sizeof(double));
434-
494+
435495
mxSetField(mxOutStruct,j, "timeResponseReceived", mxCreateDoubleMatrix(1,1, mxREAL));
436496
memcpy(mxGetData(mxGetField(mxOutStruct,j, "timeResponseReceived")) ,&(d.timeResponseReceived),sizeof(double));
437497
j++;
438498
}
439-
OutDialogues_mutex.unlock();
440-
499+
441500
plhs[0] = mxOutStruct;
442501
}
443502
}

Resources/Matlab/zeroMQrr/zeroMQrr_example.m

100755100644
Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,56 @@
1-
url = 'localhost:5556'; % or, e.g., //'tcp://10.71.212.19:5556 if GUI runs on another machine...
1+
function zeroMQrr_example(url)
22

3-
% handle = zeroMQrr('StartConnectThread',url); not necessary anymore
3+
if nargin<1
4+
url = 'tcp://localhost:5556'; % or, e.g., //'tcp://10.71.212.19:5556 if GUI runs on another machine...
5+
end
6+
7+
%blocking request to get the current recording status
8+
[isacquiring, details]=zeroMQrr('Send',url, 'IsAcquiring', 1);
9+
%also get addiional info. this has a struct as returned by getResponses for
10+
%that one response
11+
isrecording=zeroMQrr('Send',url, 'IsRecording', 1);
12+
13+
createNewDir=true;
14+
RecDir=[];
15+
PrependText=[];
16+
AppendText=[];
17+
if (strcmp(isacquiring, '1'))
18+
message='StartAcquisition';
19+
% zeroMQrr('Send',url, 'StartAcquisition', 1);
20+
%options:
21+
if createNewDir
22+
message=[message ' CreateNewDir=1'];
23+
end
24+
if ~ismepty(RecDir)
25+
message=[message ' RecDir=' RecDir];
26+
end
27+
if ~ismepty(PrependText)
28+
message=[message ' PrependText=' PrependText];
29+
end
30+
if ~ismepty(AppendText)
31+
message=[message ' AppendText=' AppendText];
32+
end
33+
34+
% message=sprintf('StartAcquisition CreateNewDir=%i RecDir=%s PrependText=%s AppendText=%s');
35+
zeroMQrr('Send',url, message);
36+
end
37+
if (strcmp(isrecording,'1'))
38+
zeroMQrr('Send',url, 'StartRecord');
39+
end
40+
41+
%wait for responsess
42+
responses=zeroMQrr('GetResponses', url);
43+
44+
recordingNumber=zeroMQrr('Send',url, 'getRecordingNumber',1);
45+
experimentNumber=zeroMQrr('Send',url, 'getExperimentNumber',1);
446

547
zeroMQrr('Send',url ,'ClearDesign');
648
zeroMQrr('Send',url ,'NewDesign nGo_Left_Right');
749
zeroMQrr('Send',url ,'AddCondition Name GoRight TrialTypes 1 2 3');
850
zeroMQrr('Send',url ,'AddCondition Name GoLeft TrialTypes 4 5 6');
951

10-
%blocking request to get the current recording status
11-
isrecording=zeroMQrr('Send',url, 'IsRecording', 1);
12-
13-
tic; while toc < 2; end;
52+
%wait for responsess
53+
responses=zeroMQrr('GetResponses', url);
1454

1555
for k = 1:10
1656
% indicate trial type number #2 has started
@@ -22,8 +62,29 @@
2262
tic; while toc < 1; end;
2363
end
2464

65+
zeroMQrr('Send',url, 'StopRecord', 0);
66+
67+
zeroMQrr('Send',url, 'StopAcquisition', 0);
68+
2569
%get all responses to the 'Send' requests since the last call to 'GetResponses'
70+
%default: block untill all responses are in
2671
blockTillQueueEmpty=1;%default
72+
responses=zeroMQrr('GetResponses');
73+
responses=zeroMQrr('GetResponses',blockTillQueueEmpty);
74+
%caviat: will also block if only responses in queue are for another url
75+
responses=zeroMQrr('GetResponses',url);
76+
responses=zeroMQrr('GetResponses',url,blockTillQueueEmpty);
77+
78+
%%or choose only to get responses already obtained
79+
blockTillQueueEmpty=0;
2780
responses=zeroMQrr('GetResponses',blockTillQueueEmpty);
81+
%caviat: will also block if only responses in queue are for another url
82+
responses=zeroMQrr('GetResponses',url,blockTillQueueEmpty);
83+
84+
zeroMQrr('CloseThread', url);
85+
86+
zeroMQrr('CloseAll');
87+
88+
89+
end
2890

29-
zeroMQrr('CloseThread');

0 commit comments

Comments
 (0)