1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
/* SPDX-License-Identifier: MPL-2.0 */
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <stdlib.h>
//  NOTE(review): presumably expands to the Unity setUp/tearDown hooks that
//  create and destroy the shared test context - confirm in testutil_unity.hpp.
SETUP_TEARDOWN_TESTCONTEXT
//  Receives the endpoint string queried from the bound REP socket with
//  ZMQ_LAST_ENDPOINT; each test fills it in and then connects its peers to it.
char connect_address[MAX_SOCKET_STRING];
//  Checks that a REP socket serves requests coming from REQ peers.
//
//  bind_address_: endpoint the REP socket binds to. The endpoint the socket
//  reports through ZMQ_LAST_ENDPOINT is stored in connect_address, and all REQ
//  peers connect to that string.
//
//  Only two round trips with the first peer are exercised by default. The
//  multi-peer part is compiled only when SOMEONE_FIXES_THIS is defined, because
//  it is known to fail randomly on some machines.
void test_fair_queue_in (const char *bind_address_)
{
    void *rep = test_context_socket (ZMQ_REP);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (rep, bind_address_));
    size_t len = MAX_SOCKET_STRING;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len));

    const size_t services = 5;
    void *reqs[services];
    for (size_t peer = 0; peer < services; ++peer) {
        reqs[peer] = test_context_socket (ZMQ_REQ);

        TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (reqs[peer], connect_address));
    }

    //  NOTE(review): presumably gives the connections time to be established
    //  before the first request is sent - confirm SETTLE_TIME in testutil.
    msleep (SETTLE_TIME);

    //  First request/reply round trip with peer 0.
    s_send_seq (reqs[0], "A", SEQ_END);
    s_recv_seq (rep, "A", SEQ_END);
    s_send_seq (rep, "A", SEQ_END);
    s_recv_seq (reqs[0], "A", SEQ_END);

    //  Second round trip with the same peer.
    s_send_seq (reqs[0], "A", SEQ_END);
    s_recv_seq (rep, "A", SEQ_END);
    s_send_seq (rep, "A", SEQ_END);
    s_recv_seq (reqs[0], "A", SEQ_END);

    // TODO: following test fails randomly on some boxes
#ifdef SOMEONE_FIXES_THIS
    // send N requests
    for (size_t peer = 0; peer < services; ++peer) {
        //  One-character payload "A", "B", ... identifying the peer. A stack
        //  buffer replaces strdup so there is no allocation that can fail.
        const char str[] = {static_cast<char> ('A' + peer), '\0'};
        s_send_seq (reqs[peer], str, SEQ_END);
    }

    // handle N requests
    for (size_t peer = 0; peer < services; ++peer) {
        const char str[] = {static_cast<char> ('A' + peer), '\0'};
        // Test fails here
        s_recv_seq (rep, str, SEQ_END);
        s_send_seq (rep, str, SEQ_END);
        s_recv_seq (reqs[peer], str, SEQ_END);
    }
#endif

    test_context_socket_close_zero_linger (rep);
    for (size_t peer = 0; peer < services; ++peer)
        test_context_socket_close_zero_linger (reqs[peer]);
}
//  Checks how a REP socket treats the address envelope of a request sent by a
//  DEALER peer: the application on the REP side sees only the data frame "A",
//  and the DEALER receives the reply with the same envelope frames in front.
//
//  bind_address_: endpoint the REP socket binds to. The endpoint the socket
//  reports through ZMQ_LAST_ENDPOINT is stored in connect_address, and the
//  DEALER connects to that string.
void test_envelope (const char *bind_address_)
{
    void *rep = test_context_socket (ZMQ_REP);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (rep, bind_address_));
    size_t len = MAX_SOCKET_STRING;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (rep, ZMQ_LAST_ENDPOINT, connect_address, &len));

    void *dealer = test_context_socket (ZMQ_DEALER);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, connect_address));

    //  Typed null pointer used as the delimiter-frame marker. A bare 0 is an
    //  int, and passing an int through "..." where a pointer is read back is
    //  undefined behaviour.
    //  NOTE(review): assumes s_send_seq/s_recv_seq read every variadic
    //  argument as const char * and treat null as an empty frame - confirm in
    //  testutil.
    const char *const delimiter = 0;

    // minimal envelope
    s_send_seq (dealer, delimiter, "A", SEQ_END);
    s_recv_seq (rep, "A", SEQ_END);
    s_send_seq (rep, "A", SEQ_END);
    s_recv_seq (dealer, delimiter, "A", SEQ_END);

    // big envelope
    s_send_seq (dealer, "X", "Y", delimiter, "A", SEQ_END);
    s_recv_seq (rep, "A", SEQ_END);
    s_send_seq (rep, "A", SEQ_END);
    s_recv_seq (dealer, "X", "Y", delimiter, "A", SEQ_END);

    test_context_socket_close_zero_linger (rep);
    test_context_socket_close_zero_linger (dealer);
}
//  Endpoints handed to the parameterised tests as their bind address.
//  In-process transport.
const char bind_inproc[] = "inproc://a";
//  TCP on the loopback interface with a wildcard port; the tests read the
//  endpoint back with ZMQ_LAST_ENDPOINT before connecting.
const char bind_tcp[] = "tcp://127.0.0.1:*";
//  Unity entry point: runs the fair-queue test bound to the inproc endpoint.
void test_fair_queue_in_inproc ()
{
    const char *const endpoint = bind_inproc;
    test_fair_queue_in (endpoint);
}
//  Unity entry point: runs the fair-queue test bound to the TCP endpoint.
void test_fair_queue_in_tcp ()
{
    const char *const endpoint = bind_tcp;
    test_fair_queue_in (endpoint);
}
//  Unity entry point: runs the envelope test bound to the inproc endpoint.
void test_envelope_inproc ()
{
    const char *const endpoint = bind_inproc;
    test_envelope (endpoint);
}
//  Unity entry point: runs the envelope test bound to the TCP endpoint.
void test_envelope_tcp ()
{
    const char *const endpoint = bind_tcp;
    test_envelope (endpoint);
}
//  Test driver: prepares the test environment, runs every REP test case over
//  both transports and returns the value produced by UNITY_END.
int main ()
{
    setup_test_environment ();

    UNITY_BEGIN ();

    //  Requirement under test: the socket SHALL receive incoming messages
    //  from its peers using a fair-queuing strategy.
    RUN_TEST (test_fair_queue_in_inproc);
    RUN_TEST (test_fair_queue_in_tcp);

    //  Requirements under test, for an incoming message the socket:
    //   - SHALL remove and store the address envelope, including the
    //     delimiter;
    //   - SHALL pass the remaining data frames to its calling application;
    //   - SHALL wait for a single reply message from its calling application;
    //   - SHALL prepend the address envelope and delimiter;
    //   - SHALL deliver this message back to the originating peer.
    RUN_TEST (test_envelope_inproc);
    RUN_TEST (test_envelope_tcp);

    const int result = UNITY_END ();
    return result;
}
|