Skip to content

Commit 0c9b1ca

Browse files
committed
Problem: ZMQ_STREAM cannot disconnect when SNDHWM is reached
In ZMQ_STREAM, sending the routing ID frame followed by an empty payload signals a disconnect. If the SNDHWM is reached, the routing ID frame is blocked by check_write(), returning EAGAIN and preventing the disconnect signal from being processed. Solution: defer HWM check to the payload frame in stream_t::xsend. Also added a regression test in tests/test_stream_hwm_disconnect.cpp. Fixes #4828
1 parent 90fbd61 commit 0c9b1ca

File tree

3 files changed

+93
-0
lines changed

3 files changed

+93
-0
lines changed

Makefile.am

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ test_apps = \
461461
tests/test_stream \
462462
tests/test_stream_empty \
463463
tests/test_stream_disconnect \
464+
tests/test_stream_hwm_disconnect \
464465
tests/test_stream_timeout \
465466
tests/test_disconnect_inproc \
466467
tests/test_unbind_wildcard \
@@ -639,6 +640,10 @@ tests_test_stream_disconnect_SOURCES = tests/test_stream_disconnect.cpp
639640
tests_test_stream_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
640641
tests_test_stream_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
641642

643+
tests_test_stream_hwm_disconnect_SOURCES = tests/test_stream_hwm_disconnect.cpp
644+
tests_test_stream_hwm_disconnect_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
645+
tests_test_stream_hwm_disconnect_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
646+
642647
tests_test_disconnect_inproc_SOURCES = tests/test_disconnect_inproc.cpp
643648
tests_test_disconnect_inproc_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
644649
tests_test_disconnect_inproc_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ set(tests
2727
test_stream
2828
test_stream_empty
2929
test_stream_disconnect
30+
test_stream_hwm_disconnect
3031
test_disconnect_inproc
3132
test_unbind_wildcard
3233
test_ctx_options
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/* SPDX-License-Identifier: MPL-2.0 */
2+
3+
#include "testutil.hpp"
4+
#include "testutil_unity.hpp"
5+
6+
#include <string.h>
7+
8+
SETUP_TEARDOWN_TESTCONTEXT
9+
10+
void test_stream_hwm_disconnect ()
11+
{
12+
void *stream = test_context_socket (ZMQ_STREAM);
13+
char endpoint[MAX_SOCKET_STRING];
14+
15+
// Set a low Send High Water Mark to trigger the issue quickly
16+
int sndhwm = 3;
17+
TEST_ASSERT_SUCCESS_ERRNO (
18+
zmq_setsockopt (stream, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm)));
19+
20+
// Bind the STREAM socket to a loopback address
21+
bind_loopback_ipv4 (stream, endpoint, sizeof (endpoint));
22+
23+
// Connect a raw TCP socket to the ZMQ_STREAM socket
24+
fd_t fd = connect_socket (endpoint);
25+
26+
// STREAM socket receives two frames on connection:
27+
// 1. The routing ID of the new peer
28+
zmq_msg_t routing_id;
29+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id));
30+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
31+
32+
// Store routing ID for later use in disconnection
33+
size_t id_size = zmq_msg_size (&routing_id);
34+
void *id_data = zmq_msg_data (&routing_id);
35+
36+
// 2. An empty frame (connection notification)
37+
TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
38+
zmq_msg_t empty;
39+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&empty));
40+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&empty, stream, 0));
41+
TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&empty));
42+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&empty));
43+
44+
// Fill the outgoing pipe until it hits the High Water Mark.
45+
// In ZMQ_STREAM, we send [Routing ID][Data].
46+
while (true) {
47+
// Send Routing ID frame
48+
int rc = zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE);
49+
if (rc == -1)
50+
break;
51+
52+
// Send a large data frame to fill the buffer
53+
zmq_msg_t data;
54+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&data, 262144));
55+
rc = zmq_msg_send (&data, stream, ZMQ_DONTWAIT);
56+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data));
57+
if (rc == -1)
58+
break;
59+
}
60+
61+
// Verify that we actually reached the HWM
62+
TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
63+
64+
// TEST: Attempt to disconnect the client by sending the Routing ID
65+
// followed by a 0-byte payload.
66+
// Before the fix, the first frame (Routing ID) would fail with EAGAIN.
67+
int rc = zmq_send (stream, id_data, id_size, ZMQ_DONTWAIT | ZMQ_SNDMORE);
68+
TEST_ASSERT_EQUAL_INT ((int) id_size, rc);
69+
70+
// The second frame (0-byte) should trigger the termination logic
71+
rc = zmq_send (stream, NULL, 0, ZMQ_DONTWAIT);
72+
TEST_ASSERT_EQUAL_INT (0, rc);
73+
74+
// Cleanup resources
75+
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&routing_id));
76+
close (fd); // Standard POSIX close as seen in other test files
77+
test_context_socket_close (stream);
78+
}
79+
80+
int main (int, char **)
81+
{
82+
setup_test_environment ();
83+
84+
UNITY_BEGIN ();
85+
RUN_TEST (test_stream_hwm_disconnect);
86+
return UNITY_END ();
87+
}

0 commit comments

Comments
 (0)