File: test_spec_router.cpp

package info (click to toggle)
zeromq3 4.3.5-1
  • links: PTS
  • area: main
  • in suites: forky, sid, trixie
  • size: 6,548 kB
  • sloc: cpp: 56,475; ansic: 4,968; makefile: 1,607; sh: 1,400; xml: 196; python: 40
file content (167 lines) | stat: -rw-r--r-- 5,514 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/* SPDX-License-Identifier: MPL-2.0 */

#include "testutil.hpp"
#include "testutil_unity.hpp"

#include <stdlib.h>
#include <string.h>

SETUP_TEARDOWN_TESTCONTEXT

// ROUTER spec: SHALL receive incoming messages from its peers using a
// fair-queuing strategy.
//
// Binds a ROUTER socket to bind_address_, connects five DEALER peers whose
// routing ids start with 'A'..'E', lets every peer send one message and then
// verifies that the first bytes of the routing ids received by the ROUTER add
// up to the same total as the ids of the peers that sent.
void test_fair_queue_in (const char *bind_address_)
{
    char connect_address[MAX_SOCKET_STRING];
    void *receiver = test_context_socket (ZMQ_ROUTER);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (receiver, bind_address_));
    // Read back the endpoint actually bound and connect the peers to it.
    size_t len = MAX_SOCKET_STRING;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (receiver, ZMQ_LAST_ENDPOINT, connect_address, &len));

    const unsigned char services = 5;
    void *senders[services];
    for (unsigned char peer = 0; peer < services; ++peer) {
        senders[peer] = test_context_socket (ZMQ_DEALER);

        // The routing id is the peer's letter plus the terminating NUL
        // (2 bytes). A stack buffer is used so that no heap allocation can
        // fail and be dereferenced unchecked.
        char routing_id[] = "A";
        routing_id[0] += peer;
        TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
          senders[peer], ZMQ_ROUTING_ID, routing_id, sizeof (routing_id)));

        TEST_ASSERT_SUCCESS_ERRNO (
          zmq_connect (senders[peer], connect_address));
    }

    msleep (SETTLE_TIME);

    zmq_msg_t msg;
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));

    // Two round trips from the first peer before the actual check.
    s_send_seq (senders[0], "M", SEQ_END);
    s_recv_seq (receiver, "A", "M", SEQ_END);

    s_send_seq (senders[0], "M", SEQ_END);
    s_recv_seq (receiver, "A", "M", SEQ_END);

    int sum = 0;

    // send N requests
    for (unsigned char peer = 0; peer < services; ++peer) {
        s_send_seq (senders[peer], "M", SEQ_END);
        sum += 'A' + peer;
    }

    // Sanity check of the accumulated id sum: N * 'A' + (0 + 1 + ... + N-1).
    TEST_ASSERT_EQUAL_INT (services * 'A' + services * (services - 1) / 2, sum);

    // handle N requests
    for (unsigned char peer = 0; peer < services; ++peer) {
        // First frame is the 2-byte routing id of the sending peer.
        TEST_ASSERT_EQUAL_INT (
          2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, receiver, 0)));
        const char *id = static_cast<const char *> (zmq_msg_data (&msg));
        sum -= id[0];

        // Second frame is the payload.
        s_recv_seq (receiver, "M", SEQ_END);
    }

    // Every id that was added while sending has been subtracted again.
    TEST_ASSERT_EQUAL_INT (0, sum);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

    test_context_socket_close_zero_linger (receiver);

    for (size_t peer = 0; peer < services; ++peer) {
        test_context_socket_close_zero_linger (senders[peer]);
    }

    // Wait for disconnects.
    msleep (SETTLE_TIME);
}

// ROUTER spec: SHALL create a double queue when a peer connects to it. If this
// peer disconnects, the ROUTER socket SHALL destroy its double queue and SHALL
// discard any messages it contains.
//
// Binds a ROUTER (with ZMQ_ROUTER_MANDATORY enabled) to bind_address_,
// connects a DEALER with routing id "B", sends one message in each direction
// and disconnects the DEALER. Afterwards sending to "B" is expected to fail
// with EHOSTUNREACH and neither socket is expected to have a message to
// receive, also after the DEALER has connected again.
void test_destroy_queue_on_disconnect (const char *bind_address_)
{
    void *a = test_context_socket (ZMQ_ROUTER);

    int enabled = 1;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (a, ZMQ_ROUTER_MANDATORY, &enabled, sizeof (enabled)));

    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (a, bind_address_));
    // Read back the endpoint actually bound so the peer can connect to it.
    size_t len = MAX_SOCKET_STRING;
    char connect_address[MAX_SOCKET_STRING];
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (a, ZMQ_LAST_ENDPOINT, connect_address, &len));

    void *b = test_context_socket (ZMQ_DEALER);

    // Routing id "B" including its terminating NUL (2 bytes).
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (b, ZMQ_ROUTING_ID, "B", 2));

    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));

    // Wait for connection.
    msleep (SETTLE_TIME);

    // Send a message in both directions
    s_send_seq (a, "B", "ABC", SEQ_END);
    s_send_seq (b, "DEF", SEQ_END);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (b, connect_address));

    // Disconnect may take time and need command processing.
    zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}};
    TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));

    // No messages should be available, sending should fail.
    zmq_msg_t msg;
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));

    TEST_ASSERT_FAILURE_ERRNO (
      EHOSTUNREACH, zmq_send (a, "B", 2, ZMQ_SNDMORE | ZMQ_DONTWAIT));

    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, a, ZMQ_DONTWAIT));

    // After a reconnect of B, the messages should still be gone
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));

    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, a, ZMQ_DONTWAIT));

    TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));

    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));

    test_context_socket_close_zero_linger (a);
    test_context_socket_close_zero_linger (b);

    // Wait for disconnects.
    msleep (SETTLE_TIME);
}

// Generates the argument-less wrappers test_fair_queue_in_<suffix_> and
// test_destroy_queue_on_disconnect_<suffix_>. Each wrapper forwards endpoint_
// as the bind address of the corresponding parameterised test, which gives
// the test a signature that can be passed to RUN_TEST.
#define TEST_SUITE(suffix_, endpoint_)                                         \
    void test_fair_queue_in_##suffix_ ()                                       \
    {                                                                          \
        test_fair_queue_in (endpoint_);                                        \
    }                                                                          \
    void test_destroy_queue_on_disconnect_##suffix_ ()                         \
    {                                                                          \
        test_destroy_queue_on_disconnect (endpoint_);                          \
    }

// Instantiate the wrappers once per transport. The TCP endpoint uses a
// wildcard port; the tests read the endpoint back via ZMQ_LAST_ENDPOINT
// before connecting their peers.
TEST_SUITE (inproc, "inproc://a")
TEST_SUITE (tcp, "tcp://127.0.0.1:*")

// Test runner entry point: prepares the test environment, runs the enabled
// Unity test cases and hands the value produced by UNITY_END back as the
// process exit status.
int main ()
{
    setup_test_environment ();

    UNITY_BEGIN ();

    // Fair-queuing test, once per transport.
    RUN_TEST (test_fair_queue_in_tcp);
    RUN_TEST (test_fair_queue_in_inproc);

    // TODO commented out until libzmq implements this properly
    // RUN_TEST (test_destroy_queue_on_disconnect_tcp);
    // RUN_TEST (test_destroy_queue_on_disconnect_inproc);

    const int unity_result = UNITY_END ();
    return unity_result;
}