1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
|
// MessagePack for C++ example
//
// Copyright (C) 2017 KONDO Takatoshi
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//
#include <iostream>
#include <sstream>
#include <cassert>
#include <thread>
// MSGPACK_USE_X3_PARSE should be defined before including msgpack.hpp
// It usually defined as a compiler option as -DMSGPACK_USE_X3_PARSE.
//#define MSGPACK_USE_X3_PARSE
#include <msgpack.hpp>
#include <boost/asio.hpp>
#include <boost/coroutine2/all.hpp>
#if defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif // defined(__clang__)
#include <boost/spirit/home/support/multi_pass.hpp>
#if defined(__clang__)
#pragma GCC diagnostic pop
#endif // defined(__clang__)
namespace as = boost::asio;
namespace x3 = boost::spirit::x3;
namespace coro2 = boost::coroutines2;
using pull_type = coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::pull_type;
// iterator fetching data from coroutine2.
class buffered_iterator : public std::iterator<std::input_iterator_tag, char> {
public:
using pointer_t = typename iterator::pointer;
using reference_t = typename iterator::reference;
explicit buffered_iterator(pull_type& source) noexcept
: source_{ &source } {
fetch_();
}
buffered_iterator() = default;
bool operator==(buffered_iterator const& other) const noexcept {
if (!other.source_ && !source_ && !other.buf_ && !buf_) return true;
return other.it_ == it_;
}
bool operator!=(buffered_iterator const& other) const noexcept {
return !(other == *this);
}
buffered_iterator & operator++() {
increment_();
return * this;
}
buffered_iterator operator++(int) = delete;
reference_t operator*() noexcept {
return *it_;
}
pointer_t operator->() noexcept {
return std::addressof(*it_);
}
private:
void fetch_() noexcept {
BOOST_ASSERT( nullptr != source_);
if (*source_) {
buf_ = source_->get();
it_ = buf_->begin();
}
else {
source_ = nullptr;
buf_.reset();
}
}
void increment_() {
BOOST_ASSERT( nullptr != source_);
BOOST_ASSERT(*source_);
if (++it_ == buf_->end()) {
(*source_)();
fetch_();
}
}
private:
pull_type* source_{ nullptr };
std::shared_ptr<std::vector<char>> buf_;
std::vector<char>::iterator it_;
};
// session class that corresponding to each client
class session : public std::enable_shared_from_this<session> {
public:
session(as::ip::tcp::socket socket)
: socket_(std::move(socket)) {
}
void start() {
sink_ = std::make_shared<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type>(
[&, this](pull_type& source) {
// *1 is started when the first sink is called.
std::cout << "session started" << std::endl;
do_read();
source();
// use buffered_iterator here
// b is incremented in msgpack::unpack() and fetch data from sink
// via coroutine2 mechanism
auto b = boost::spirit::make_default_multi_pass(buffered_iterator(source));
auto e = boost::spirit::make_default_multi_pass(buffered_iterator());
// This is usually an infinity look, but for test, loop is finished when
// two message pack data is processed.
for (int i = 0; i != 2; ++i) {
auto oh = msgpack::unpack(b, e);
std::cout << oh.get() << std::endl;
}
}
);
// send dummy data to start *1
(*sink_)({});
}
private:
void do_read() {
std::cout << "session do_read() is called" << std::endl;
auto self(shared_from_this());
auto data = std::make_shared<std::vector<char>>(static_cast<std::size_t>(max_length));
socket_.async_read_some(
boost::asio::buffer(*data),
[this, self, data]
(boost::system::error_code ec, std::size_t length) {
if (!ec) {
data->resize(length);
(*sink_)(data);
do_read();
}
}
);
}
as::ip::tcp::socket socket_;
static constexpr std::size_t const max_length = 1024;
std::shared_ptr<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type> sink_;
};
class server {
public:
server(
as::io_service& ios,
std::uint16_t port)
: acceptor_(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port)),
socket_(ios) {
do_accept();
std::cout << "server start accept" << std::endl;
ios.run();
}
private:
void do_accept() {
acceptor_.async_accept(
socket_,
[this](boost::system::error_code ec) {
if (!ec) {
std::make_shared<session>(std::move(socket_))->start();
}
// for test, only one session is accepted.
// do_accept();
}
);
}
as::ip::tcp::acceptor acceptor_;
as::ip::tcp::socket socket_;
};
int main() {
std::thread srv(
[]{
boost::asio::io_service ios;
server s(ios, 12345);
}
);
std::thread cli(
[]{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "client start" << std::endl;
std::stringstream ss;
std::map<std::string, std::vector<int>> v1 {
{ "ABC", { 1, 2, 3 } },
{ "DEFG", { 4, 5 } }
};
std::vector<std::string> v2 {
"HIJ", "KLM", "NOP"
};
msgpack::pack(ss, v1);
msgpack::pack(ss, v2);
auto send_data = ss.str();
boost::asio::io_service ios;
as::ip::tcp::resolver::query q("127.0.0.1", "12345");
as::ip::tcp::resolver r(ios);
auto it = r.resolve(q);
std::cout << "client connect" << std::endl;
as::ip::tcp::socket s(ios);
as::connect(s, it);
std::size_t const size = 5;
std::size_t rest = send_data.size();
std::size_t index = 0;
while (rest != 0) {
std::cout << "client send data" << std::endl;
auto send_size = size < rest ? size : rest;
as::write(s, as::buffer(&send_data[index], send_size));
rest -= send_size;
index += send_size;
std::cout << "client wait" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
);
cli.join();
std::cout << "client joinded" << std::endl;
srv.join();
std::cout << "server joinded" << std::endl;
}
|