Skip to content

Commit 705f411

Browse files
committed
Problem:
When a ZMQ_STREAM socket reaches its SNDHWM, it becomes impossible to disconnect a peer. Disconnecting requires sending the routing ID followed by a 0-byte frame. Currently, xsend returns EAGAIN on the first frame (the ID) if the pipe is full, preventing the second disconnect frame from ever being processed. Solution: Modified xsend in stream.cpp to allow the routing ID frame to pass even when the pipe is full. If the subsequent payload frame is 0-bytes, the connection is terminated immediately via terminate(). To prevent state-machine desync, _more_out is reset to false if a data-bearing payload frame is sent on a full pipe, forcing an EAGAIN and requiring a clean retry from the user.
1 parent c5f0e8c commit 705f411

File tree

6 files changed

+8
-165
lines changed

6 files changed

+8
-165
lines changed

Makefile.am

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,6 @@ test_apps = \
460460
tests/test_probe_router \
461461
tests/test_stream \
462462
tests/test_stream_empty \
463-
tests/test_stream_disconnect_peer \
464463
tests/test_stream_disconnect \
465464
tests/test_stream_timeout \
466465
tests/test_disconnect_inproc \
@@ -636,10 +635,6 @@ tests_test_stream_timeout_SOURCES = tests/test_stream_timeout.cpp
636635
tests_test_stream_timeout_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
637636
tests_test_stream_timeout_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
638637

639-
tests_test_stream_disconnect_peer_SOURCES = tests/test_stream_disconnect_peer.cpp
640-
tests_test_stream_disconnect_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
641-
tests_test_stream_disconnect_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
642-
643638
tests_test_stream_disconnect_SOURCES = tests/test_stream_disconnect.cpp
644639
tests_test_stream_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
645640
tests_test_stream_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

doc/zmq_disconnect_peer.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ to send messages addressed with that 'routing_id' will fail with 'EHOSTUNREACH'
1616
until a new connection with a different 'routing_id' is established.
1717

1818
This function is supported on socket types that manage per-peer routing ids:
19-
'ZMQ_SERVER', 'ZMQ_PEER' and `ZMQ_STREAM`. Calling it on other socket types will fail with
19+
'ZMQ_SERVER' and 'ZMQ_PEER'. Calling it on other socket types will fail with
2020
'ENOTSUP'.
2121

2222

src/stream.cpp

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,6 @@ int zmq::stream_t::xsend (msg_t *msg_)
7979
_current_out = out_pipe->pipe;
8080
if (!_current_out->check_write ()) {
8181
out_pipe->active = false;
82-
_current_out = NULL;
83-
errno = EAGAIN;
84-
return -1;
8582
}
8683
} else {
8784
errno = EHOSTUNREACH;
@@ -119,6 +116,13 @@ int zmq::stream_t::xsend (msg_t *msg_)
119116
_current_out = NULL;
120117
return 0;
121118
}
119+
if (!_current_out->check_write ()) {
120+
// Because we set _more_out to false above, the user is forced
121+
// to resend the Identity frame on their next attempt.
122+
_current_out = NULL;
123+
errno = EAGAIN;
124+
return -1;
125+
}
122126
const bool ok = _current_out->write (msg_);
123127
if (likely (ok))
124128
_current_out->flush ();
@@ -239,32 +243,6 @@ bool zmq::stream_t::xhas_out ()
239243
return true;
240244
}
241245

242-
int zmq::stream_t::xdisconnect_peer (uint32_t routing_id_)
243-
{
244-
unsigned char buffer[5];
245-
buffer[0] = 0;
246-
put_uint32 (buffer + 1, routing_id_);
247-
248-
blob_t routing_id;
249-
routing_id.set (buffer, sizeof buffer);
250-
251-
out_pipe_t *out_pipe = lookup_out_pipe (routing_id);
252-
if (!out_pipe) {
253-
errno = EHOSTUNREACH;
254-
return -1;
255-
}
256-
257-
out_pipe->pipe->terminate (false);
258-
259-
// if currently writing to this pipe at same time, reset _current_out and _more_out
260-
if (out_pipe->pipe == _current_out) {
261-
_current_out = NULL;
262-
_more_out = false;
263-
}
264-
265-
return 0;
266-
}
267-
268246
void zmq::stream_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
269247
{
270248
// Always assign routing id for raw-socket

src/stream.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ class stream_t ZMQ_FINAL : public routing_socket_base_t
2929
void xread_activated (zmq::pipe_t *pipe_);
3030
void xpipe_terminated (zmq::pipe_t *pipe_);
3131
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
32-
int xdisconnect_peer (uint32_t routing_id_) ZMQ_OVERRIDE;
3332

3433
private:
3534
// Generate peer's id and update lookup map

tests/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ set(tests
2626
test_probe_router
2727
test_stream
2828
test_stream_empty
29-
test_stream_disconnect_peer
3029
test_stream_disconnect
3130
test_disconnect_inproc
3231
test_unbind_wildcard

tests/test_stream_disconnect_peer.cpp

Lines changed: 0 additions & 128 deletions
This file was deleted.

0 commit comments

Comments
 (0)