1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
|
// ---------------------------------------------------------------------
// pion: a Boost C++ framework for building lightweight HTTP interfaces
// ---------------------------------------------------------------------
// Copyright (C) 2007-2014 Splunk Inc. (https://github.com/splunk/pion)
//
// Distributed under the Boost Software License, Version 1.0.
// See http://www.boost.org/LICENSE_1_0.txt
//
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <boost/function.hpp>
#include <boost/test/unit_test.hpp>
#include <pion/config.hpp>
#include <pion/scheduler.hpp>
#include <pion/tcp/stream.hpp>
using namespace std;
using namespace pion;
///
/// tcp_stream_tests_F: fixture used for performing tcp::stream tests
///
class tcp_stream_tests_F {
public:
/// data type for a function that handles tcp::stream connections
typedef boost::function1<void,tcp::stream&> connection_handler;
// default constructor and destructor
tcp_stream_tests_F() {
}
virtual ~tcp_stream_tests_F() {}
/**
* listen for a TCP connection and call the connection handler when connected
*
* @param conn_handler function to call after a connection is established
*/
void acceptConnection(connection_handler conn_handler) {
// configure the acceptor service
boost::asio::ip::tcp::acceptor tcp_acceptor(m_scheduler.get_io_service());
boost::asio::ip::tcp::endpoint tcp_endpoint(boost::asio::ip::tcp::v4(), 0);
tcp_acceptor.open(tcp_endpoint.protocol());
// allow the acceptor to reuse the address (i.e. SO_REUSEADDR)
tcp_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
tcp_acceptor.bind(tcp_endpoint);
tcp_acceptor.listen();
// notify test thread that we are ready to accept a connection
{
// wait for test thread to be waiting on the signal
boost::unique_lock<boost::mutex> accept_lock(m_accept_mutex);
// trigger signal to wake test thread
m_port = tcp_acceptor.local_endpoint().port();
m_accept_ready.notify_one();
}
// schedule another thread to listen for a TCP connection
tcp::stream listener_stream(m_scheduler.get_io_service());
boost::system::error_code ec = listener_stream.accept(tcp_acceptor);
tcp_acceptor.close();
BOOST_REQUIRE(! ec);
// call the connection handler
conn_handler(listener_stream);
}
/// sends a "Hello" to a tcp::stream
static void sendHello(tcp::stream& str) {
str << "Hello" << std::endl;
str.flush();
}
/// port where acceptor listens
int m_port;
/// used to schedule work across multiple threads
single_service_scheduler m_scheduler;
/// used to notify test thread when acceptConnection() is ready
boost::condition m_accept_ready;
/// used to sync test thread with acceptConnection()
boost::mutex m_accept_mutex;
};
// tcp::stream Test Cases
BOOST_FIXTURE_TEST_SUITE(tcp_stream_tests_S, tcp_stream_tests_F)
BOOST_AUTO_TEST_CASE(checkTCPConnectToAnotherStream) {
boost::unique_lock<boost::mutex> accept_lock(m_accept_mutex);
// schedule another thread to listen for a TCP connection
connection_handler conn_handler(boost::bind(&tcp_stream_tests_F::sendHello, _1));
boost::thread listener_thread(boost::bind(&tcp_stream_tests_F::acceptConnection,
this, conn_handler) );
m_scheduler.add_active_user();
m_accept_ready.wait(accept_lock);
// connect to the listener
tcp::stream client_str(m_scheduler.get_io_service());
boost::system::error_code ec;
ec = client_str.connect(boost::asio::ip::address::from_string("127.0.0.1"), m_port);
BOOST_REQUIRE(! ec);
// get the hello message
std::string response_msg;
client_str >> response_msg;
BOOST_CHECK_EQUAL(response_msg, "Hello");
client_str.close();
listener_thread.join();
m_scheduler.remove_active_user();
}
BOOST_AUTO_TEST_SUITE_END()
#define BIG_BUF_SIZE (12 * 1024)
///
/// tcp_stream_buffer_tests_F: fixture that includes a big data buffer used for tests
///
class tcp_stream_buffer_tests_F
: public tcp_stream_tests_F
{
public:
// default constructor and destructor
tcp_stream_buffer_tests_F() {
// fill the buffer with non-random characters
for (unsigned long n = 0; n < BIG_BUF_SIZE; ++n) {
m_big_buf[n] = char(n);
}
}
virtual ~tcp_stream_buffer_tests_F() {}
/// sends the big buffer contents to a tcp::stream
void sendBigBuffer(tcp::stream& str) {
str.write(m_big_buf, BIG_BUF_SIZE);
str.flush();
}
/// big data buffer used for the tests
char m_big_buf[BIG_BUF_SIZE];
};
BOOST_FIXTURE_TEST_SUITE(tcp_stream_buffer_tests_S, tcp_stream_buffer_tests_F)
BOOST_AUTO_TEST_CASE(checkSendAndReceiveBiggerThanBuffers) {
boost::unique_lock<boost::mutex> accept_lock(m_accept_mutex);
// schedule another thread to listen for a TCP connection
connection_handler conn_handler(boost::bind(&tcp_stream_buffer_tests_F::sendBigBuffer, this, _1));
boost::thread listener_thread(boost::bind(&tcp_stream_buffer_tests_F::acceptConnection,
this, conn_handler) );
m_scheduler.add_active_user();
m_accept_ready.wait(accept_lock);
// connect to the listener
tcp::stream client_str(m_scheduler.get_io_service());
boost::system::error_code ec;
ec = client_str.connect(boost::asio::ip::address::from_string("127.0.0.1"), m_port);
BOOST_REQUIRE(! ec);
// read the big buffer contents
char another_buf[BIG_BUF_SIZE];
BOOST_REQUIRE(client_str.read(another_buf, BIG_BUF_SIZE));
BOOST_CHECK_EQUAL(memcmp(m_big_buf, another_buf, BIG_BUF_SIZE), 0);
client_str.close();
listener_thread.join();
m_scheduler.remove_active_user();
}
BOOST_AUTO_TEST_SUITE_END()
|