1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
|
/* Copyright (c) 2018-2022 Marcelo Zimbres Silva (mzimbres@gmail.com)
*
* Distributed under the Boost Software License, Version 1.0. (See
* accompanying file LICENSE.txt)
*/
#include <boost/redis/connection.hpp>
#include <boost/redis/ignore.hpp>
#include <boost/redis/request.hpp>
#include <boost/redis/response.hpp>
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/core/lightweight_test.hpp>
#include <boost/system/errc.hpp>
#include "common.hpp"
#include <cstddef>
#include <iostream>
using namespace std::chrono_literals;
namespace net = boost::asio;
using error_code = boost::system::error_code;
using boost::redis::operation;
using boost::redis::error;
using boost::redis::request;
using boost::redis::response;
using boost::redis::generic_response;
using boost::redis::ignore;
using boost::redis::ignore_t;
using boost::redis::logger;
using boost::redis::connection;
using namespace std::chrono_literals;
namespace {
// We can cancel requests that haven't been written yet.
// All cancellation types are supported here.
void test_cancel_pending()
{
struct {
const char* name;
net::cancellation_type_t cancel_type;
} test_cases[] = {
{"terminal", net::cancellation_type_t::terminal},
{"partial", net::cancellation_type_t::partial },
{"total", net::cancellation_type_t::total },
};
for (const auto& tc : test_cases) {
std::cerr << "Running test case: " << tc.name << std::endl;
// Setup
net::io_context ctx;
connection conn(ctx);
request req;
req.push("get", "mykey");
// Issue a request without calling async_run(), so the request stays waiting forever
net::cancellation_signal sig;
bool called = false;
conn.async_exec(
req,
ignore,
net::bind_cancellation_slot(sig.slot(), [&](error_code ec, std::size_t sz) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
BOOST_TEST_EQ(sz, 0u);
called = true;
}));
// Issue a cancellation
sig.emit(tc.cancel_type);
// Prevent the test for deadlocking in case of failure
ctx.run_for(test_timeout);
BOOST_TEST(called);
}
}
// We can cancel requests that have been written but which
// responses haven't been received yet.
// Terminal and partial cancellation types are supported here.
void test_cancel_written()
{
// Setup
net::io_context ctx;
connection conn{ctx};
auto cfg = make_test_config();
cfg.health_check_interval = std::chrono::seconds::zero();
bool run_finished = false, exec1_finished = false, exec2_finished = false,
exec3_finished = false;
// Will be cancelled after it has been written but before the
// response arrives. Create everything in dynamic memory to verify
// we don't try to access things after completion.
auto req1 = std::make_unique<request>();
req1->push("BLPOP", "any", 1);
auto r1 = std::make_unique<response<std::string>>();
// Will be cancelled too because it's sent after BLPOP.
// Tests that partial cancellation is supported, too.
request req2;
req2.push("PING", "partial_cancellation");
// Will finish successfully once the response to the BLPOP arrives
request req3;
req3.push("PING", "after_blpop");
response<std::string> r3;
// Run the connection
conn.async_run(cfg, [&](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
run_finished = true;
});
// The request will be cancelled before it receives a response.
// Our BLPOP will wait for longer than the timeout we're using.
// Clear allocated memory to check we don't access the request or
// response when the server response arrives.
auto blpop_cb = [&](error_code ec, std::size_t) {
req1.reset();
r1.reset();
BOOST_TEST_EQ(ec, net::error::operation_aborted);
exec1_finished = true;
};
conn.async_exec(*req1, *r1, net::cancel_after(500ms, blpop_cb));
// The first PING will be cancelled, too. Use partial cancellation here.
auto req2_cb = [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
exec2_finished = true;
};
conn.async_exec(
req2,
ignore,
net::cancel_after(500ms, net::cancellation_type_t::partial, req2_cb));
// The second PING's response will be received after the BLPOP's response,
// but it will be processed successfully.
conn.async_exec(req3, r3, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error_code());
BOOST_TEST_EQ(std::get<0>(r3).value(), "after_blpop");
conn.cancel();
exec3_finished = true;
});
ctx.run_for(test_timeout);
BOOST_TEST(run_finished);
BOOST_TEST(exec1_finished);
BOOST_TEST(exec2_finished);
BOOST_TEST(exec3_finished);
}
// Requests configured to do so are cancelled if the connection
// hasn't been established when they are executed
void test_cancel_if_not_connected()
{
net::io_context ioc;
connection conn{ioc};
request req;
req.get_config().cancel_if_not_connected = true;
req.push("PING");
bool exec_finished = false;
conn.async_exec(req, ignore, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error::not_connected);
exec_finished = true;
});
ioc.run_for(test_timeout);
BOOST_TEST(exec_finished);
}
// Requests configured to do so are cancelled when the connection is lost.
// Tests with a written request that hasn't been responded yet
void test_cancel_on_connection_lost_written()
{
// Setup
net::io_context ioc;
connection conn{ioc};
// req0 and req1 will be coalesced together. When req0
// completes, we know that req1 will be waiting for a response.
// req1 will block forever.
request req0;
req0.push("PING");
request req1;
req1.get_config().cancel_on_connection_lost = true;
req1.get_config().cancel_if_unresponded = true;
req1.push("BLPOP", "any", 0);
bool run_finished = false, exec0_finished = false, exec1_finished = false;
// Run the connection
auto cfg = make_test_config();
conn.async_run(cfg, [&](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
run_finished = true;
});
// Execute both requests
conn.async_exec(req0, ignore, [&](error_code ec, std::size_t) {
// The request finished successfully
BOOST_TEST_EQ(ec, error_code());
exec0_finished = true;
// We know that req1 has been written to the server, too. Trigger a cancellation
conn.cancel(operation::run);
conn.cancel(operation::reconnection);
});
conn.async_exec(req1, ignore, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
exec1_finished = true;
});
ioc.run_for(test_timeout);
BOOST_TEST(run_finished);
BOOST_TEST(exec0_finished);
BOOST_TEST(exec1_finished);
}
// connection::cancel(operation::exec) works. Pending requests are cancelled,
// but written requests are not
void test_cancel_operation_exec()
{
// Setup
net::io_context ctx;
connection conn{ctx};
bool run_finished = false, exec0_finished = false, exec1_finished = false,
exec2_finished = false;
request req0;
req0.push("PING", "before_blpop");
request req1;
req1.push("BLPOP", "any", 1);
generic_response r1;
request req2;
req2.push("PING", "after_blpop");
// Run the connection
conn.async_run(make_test_config(), [&](error_code ec) {
BOOST_TEST_EQ(ec, net::error::operation_aborted);
run_finished = true;
});
// Execute req0 and req1. They will be coalesced together.
// When req0 completes, we know that req1 will be waiting its response
conn.async_exec(req0, ignore, [&](error_code ec, std::size_t) {
BOOST_TEST_EQ(ec, error_code());
exec0_finished = true;
conn.cancel(operation::exec);
});
// By default, ignore will issue an error when a NULL is received.
// ATM, this causes the connection to be torn down. Using a generic_response avoids this.
// See https://github.com/boostorg/redis/issues/314
conn.async_exec(req1, r1, [&](error_code ec, std::size_t) {
// No error should occur since the cancellation should be ignored
std::cout << "async_exec (1): " << ec.message() << std::endl;
BOOST_TEST_EQ(ec, error_code());
exec1_finished = true;
// The connection remains usable
conn.async_exec(req2, ignore, [&](error_code ec2, std::size_t) {
BOOST_TEST_EQ(ec2, error_code());
exec2_finished = true;
conn.cancel();
});
});
ctx.run_for(test_timeout);
BOOST_TEST(run_finished);
BOOST_TEST(exec0_finished);
BOOST_TEST(exec1_finished);
BOOST_TEST(exec2_finished);
}
} // namespace
// Test driver: runs every test case in order and reports the
// accumulated lightweight_test failures through the exit code.
int main()
{
   using test_fn = void (*)();

   // Kept in the order in which the tests must execute
   const test_fn all_tests[] = {
      &test_cancel_pending,
      &test_cancel_written,
      &test_cancel_if_not_connected,
      &test_cancel_on_connection_lost_written,
      &test_cancel_operation_exec,
   };

   for (const test_fn run_test : all_tests) {
      run_test();
   }

   return boost::report_errors();
}
|