File: test_conn_echo_stress.cpp

package info (click to toggle)
boost1.90 1.90.0-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 593,120 kB
  • sloc: cpp: 4,190,908; xml: 196,648; python: 34,618; ansic: 23,145; asm: 5,468; sh: 3,774; makefile: 1,161; perl: 1,020; sql: 728; ruby: 676; yacc: 478; java: 77; lisp: 24; csh: 6
file content (154 lines) | stat: -rw-r--r-- 4,169 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
 *
 * Distributed under the Boost Software License, Version 1.0. (See
 * accompanying file LICENSE.txt)
 */

#include <boost/redis/connection.hpp>
#include <boost/redis/logger.hpp>

#include <boost/asio/co_spawn.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>

#include <cstddef>
#include <exception>
#define BOOST_TEST_MODULE echo_stress
#include <boost/test/included/unit_test.hpp>

#include "common.hpp"

#include <iostream>

#ifdef BOOST_ASIO_HAS_CO_AWAIT

namespace net = boost::asio;
using error_code = boost::system::error_code;
using boost::redis::operation;
using boost::redis::request;
using boost::redis::response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::logger;
using boost::redis::connection;
using boost::redis::usage;
using boost::redis::error;
using namespace std::chrono_literals;

namespace boost::redis {

std::ostream& operator<<(std::ostream& os, usage const& u)
{
   os << "Commands sent: " << u.commands_sent << "\n"
      << "Bytes sent: " << u.bytes_sent << "\n"
      << "Responses received: " << u.responses_received << "\n"
      << "Pushes received: " << u.pushes_received << "\n"
      << "Bytes received (response): " << u.response_bytes_received << "\n"
      << "Bytes received (push): " << u.push_bytes_received << "\n"
      << "Bytes rotated: " << u.bytes_rotated;

   return os;
}

}  // namespace boost::redis

namespace {

auto push_consumer(connection& conn, int expected) -> net::awaitable<void>
{
   int c = 0;
   for (error_code ec;;) {
      conn.receive(ec);
      if (ec == error::sync_receive_push_failed) {
         ec = {};
         co_await conn.async_receive(net::redirect_error(ec));
      } else if (!ec) {
         //std::cout << "Skipping suspension." << std::endl;
      }

      if (ec) {
         BOOST_TEST(false, "push_consumer error: " << ec.message());
         co_return;
      }
      if (++c == expected)
         break;
   }

   conn.cancel();
}

auto echo_session(connection& conn, const request& pubs, int n) -> net::awaitable<void>
{
   for (auto i = 0; i < n; ++i)
      co_await conn.async_exec(pubs);
}

void rethrow_on_error(std::exception_ptr exc)
{
   if (exc)
      std::rethrow_exception(exc);
}

BOOST_AUTO_TEST_CASE(echo_stress)
{
   // Setup
   net::io_context ctx;
   connection conn{ctx};
   auto cfg = make_test_config();

   // Number of coroutines that will send pings sharing the same
   // connection to redis.
   constexpr int sessions = 150;

   // The number of pings that will be sent by each session.
   constexpr int msgs = 200;

   // The number of publishes that will be sent by each session with
   // each message.
   constexpr int n_pubs = 25;

   // This is the total number of pushes we will receive.
   constexpr int total_pushes = sessions * msgs * n_pubs + 1;

   request pubs;
   pubs.push("PING");
   for (int i = 0; i < n_pubs; ++i)
      pubs.push("PUBLISH", "channel", "payload");

   // Run the connection
   bool run_finished = false, subscribe_finished = false;
   conn.async_run(cfg, logger{logger::level::crit}, [&run_finished](error_code ec) {
      run_finished = true;
      BOOST_TEST(ec == net::error::operation_aborted);
      std::clog << "async_run finished" << std::endl;
   });

   // Subscribe, then launch the coroutines
   request req;
   req.push("SUBSCRIBE", "channel");
   conn.async_exec(req, ignore, [&](error_code ec, std::size_t) {
      subscribe_finished = true;
      BOOST_TEST(ec == error_code());

      // Op that will consume the pushes counting down until all expected
      // pushes have been received.
      net::co_spawn(ctx, push_consumer(conn, total_pushes), rethrow_on_error);

      for (int i = 0; i < sessions; ++i)
         net::co_spawn(ctx, echo_session(conn, pubs, msgs), rethrow_on_error);
   });

   // Run the test
   ctx.run_for(2 * test_timeout);
   BOOST_TEST(run_finished);
   BOOST_TEST(subscribe_finished);

   // Print statistics
   std::cout << "-------------------\n" << conn.get_usage() << std::endl;
}

}  // namespace

#else
BOOST_AUTO_TEST_CASE(dummy) { }
#endif