1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <cmath>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <set>
#include "protocol.hpp"
using boost::asio::ip::tcp;
using boost::asio::ip::udp;
typedef boost::shared_ptr<tcp::socket> tcp_socket_ptr;
typedef boost::shared_ptr<boost::asio::steady_timer> timer_ptr;
typedef boost::shared_ptr<control_request> control_request_ptr;
class server
{
public:
// Construct the server to wait for incoming control connections.
server(boost::asio::io_context& io_context, unsigned short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
timer_(io_context),
udp_socket_(io_context, udp::endpoint(udp::v4(), 0)),
next_frame_number_(1)
{
// Start waiting for a new control connection.
tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
acceptor_.async_accept(*new_socket,
boost::bind(&server::handle_accept, this,
boost::asio::placeholders::error, new_socket));
// Start the timer used to generate outgoing frames.
timer_.expires_after(boost::asio::chrono::milliseconds(100));
timer_.async_wait(boost::bind(&server::handle_timer, this));
}
// Handle a new control connection.
void handle_accept(const boost::system::error_code& ec, tcp_socket_ptr socket)
{
if (!ec)
{
// Start receiving control requests on the connection.
control_request_ptr request(new control_request);
boost::asio::async_read(*socket, request->to_buffers(),
boost::bind(&server::handle_control_request, this,
boost::asio::placeholders::error, socket, request));
}
// Start waiting for a new control connection.
tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
acceptor_.async_accept(*new_socket,
boost::bind(&server::handle_accept, this,
boost::asio::placeholders::error, new_socket));
}
// Handle a new control request.
void handle_control_request(const boost::system::error_code& ec,
tcp_socket_ptr socket, control_request_ptr request)
{
if (!ec)
{
// Delay handling of the control request to simulate network latency.
timer_ptr delay_timer(
new boost::asio::steady_timer(acceptor_.get_executor()));
delay_timer->expires_after(boost::asio::chrono::seconds(2));
delay_timer->async_wait(
boost::bind(&server::handle_control_request_timer, this,
socket, request, delay_timer));
}
}
void handle_control_request_timer(tcp_socket_ptr socket,
control_request_ptr request, timer_ptr /*delay_timer*/)
{
// Determine what address this client is connected from, since
// subscriptions must be stored on the server as a complete endpoint, not
// just a port. We use the non-throwing overload of remote_endpoint() since
// it may fail if the socket is no longer connected.
boost::system::error_code ec;
tcp::endpoint remote_endpoint = socket->remote_endpoint(ec);
if (!ec)
{
// Remove old port subscription, if any.
if (unsigned short old_port = request->old_port())
{
udp::endpoint old_endpoint(remote_endpoint.address(), old_port);
subscribers_.erase(old_endpoint);
std::cout << "Removing subscription " << old_endpoint << std::endl;
}
// Add new port subscription, if any.
if (unsigned short new_port = request->new_port())
{
udp::endpoint new_endpoint(remote_endpoint.address(), new_port);
subscribers_.insert(new_endpoint);
std::cout << "Adding subscription " << new_endpoint << std::endl;
}
}
// Wait for next control request on this connection.
boost::asio::async_read(*socket, request->to_buffers(),
boost::bind(&server::handle_control_request, this,
boost::asio::placeholders::error, socket, request));
}
// Every time the timer fires we will generate a new frame and send it to all
// subscribers.
void handle_timer()
{
// Generate payload.
double x = next_frame_number_ * 0.2;
double y = std::sin(x);
int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2));
std::string payload;
for (int i = 0; i < frame::payload_size; ++i)
payload += (i == char_index ? '*' : '.');
// Create the frame to be sent to all subscribers.
frame f(next_frame_number_++, payload);
// Send frame to all subscribers. We can use synchronous calls here since
// UDP send operations typically do not block.
std::set<udp::endpoint>::iterator j;
for (j = subscribers_.begin(); j != subscribers_.end(); ++j)
{
boost::system::error_code ec;
udp_socket_.send_to(f.to_buffers(), *j, 0, ec);
}
// Wait for next timeout.
timer_.expires_after(boost::asio::chrono::milliseconds(100));
timer_.async_wait(boost::bind(&server::handle_timer, this));
}
private:
// The acceptor used to accept incoming control connections.
tcp::acceptor acceptor_;
// The timer used for generating data.
boost::asio::steady_timer timer_;
// The socket used to send data to subscribers.
udp::socket udp_socket_;
// The next frame number.
unsigned long next_frame_number_;
// The set of endpoints that are subscribed.
std::set<udp::endpoint> subscribers_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: server <port>\n";
return 1;
}
boost::asio::io_context io_context;
using namespace std; // For atoi.
server s(io_context, atoi(argv[1]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << std::endl;
}
return 0;
}
|