File: stream_unpack.cpp

package info (click to toggle)
msgpack-cxx 7.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,520 kB
  • sloc: cpp: 87,413; ansic: 3,571; sh: 56; makefile: 39
file content (248 lines) | stat: -rw-r--r-- 7,184 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// MessagePack for C++ example
//
// Copyright (C) 2017 KONDO Takatoshi
//
//    Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//    http://www.boost.org/LICENSE_1_0.txt)
//

#include <iostream>
#include <sstream>
#include <cassert>
#include <thread>

// MSGPACK_USE_X3_PARSE should be defined before including msgpack.hpp
// It usually defined as a compiler option as -DMSGPACK_USE_X3_PARSE.

//#define MSGPACK_USE_X3_PARSE

#include <msgpack.hpp>

#include <boost/asio.hpp>
#include <boost/coroutine2/all.hpp>

#if defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif // defined(__clang__)

#include <boost/spirit/home/support/multi_pass.hpp>

#if defined(__clang__)
#pragma GCC diagnostic pop
#endif // defined(__clang__)

namespace as = boost::asio;
namespace x3 = boost::spirit::x3;
namespace coro2 = boost::coroutines2;

using pull_type = coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::pull_type;

// iterator fetching data from coroutine2.
class buffered_iterator : public std::iterator<std::input_iterator_tag, char> {
public:
    using pointer_t = typename iterator::pointer;
    using reference_t = typename iterator::reference;

    explicit buffered_iterator(pull_type& source) noexcept
        : source_{ &source } {
        fetch_();
    }
    buffered_iterator() = default;

    bool operator==(buffered_iterator const& other) const noexcept {
        if (!other.source_ && !source_ && !other.buf_ && !buf_) return true;
        return other.it_ == it_;
    }

    bool operator!=(buffered_iterator const& other) const noexcept {
        return !(other == *this);
    }

    buffered_iterator & operator++() {
        increment_();
        return * this;
    }

    buffered_iterator operator++(int) = delete;

    reference_t operator*() noexcept {
        return *it_;
    }

    pointer_t operator->() noexcept {
        return std::addressof(*it_);
    }

private:
    void fetch_() noexcept {
        BOOST_ASSERT( nullptr != source_);
        if (*source_) {
            buf_ = source_->get();
            it_ = buf_->begin();
        }
        else {
            source_ = nullptr;
            buf_.reset();
        }
    }

    void increment_() {
        BOOST_ASSERT( nullptr != source_);
        BOOST_ASSERT(*source_);
        if (++it_ == buf_->end()) {
            (*source_)();
            fetch_();
        }
    }

private:
    pull_type* source_{ nullptr };
    std::shared_ptr<std::vector<char>> buf_;
    std::vector<char>::iterator it_;
};

// session class that corresponding to each client
class session : public std::enable_shared_from_this<session> {
public:
    session(as::ip::tcp::socket socket)
        : socket_(std::move(socket)) {
    }

    void start() {
        sink_ = std::make_shared<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type>(
            [&, this](pull_type& source) {
                // *1 is started when the first sink is called.

                std::cout << "session started" << std::endl;
                do_read();
                source();

                // use buffered_iterator here
                // b is incremented in msgpack::unpack() and fetch data from sink
                // via coroutine2 mechanism
                auto b = boost::spirit::make_default_multi_pass(buffered_iterator(source));
                auto e = boost::spirit::make_default_multi_pass(buffered_iterator());

                // This is usually an infinity look, but for test, loop is finished when
                // two message pack data is processed.
                for (int i = 0; i != 2; ++i) {
                    auto oh = msgpack::unpack(b, e);
                    std::cout << oh.get() << std::endl;
                }
            }
        );
        // send dummy data to start *1
        (*sink_)({});
    }

private:
    void do_read() {
        std::cout << "session do_read() is called" << std::endl;
        auto self(shared_from_this());
        auto data = std::make_shared<std::vector<char>>(static_cast<std::size_t>(max_length));
        socket_.async_read_some(
            boost::asio::buffer(*data),
            [this, self, data]
            (boost::system::error_code ec, std::size_t length) {
                if (!ec) {
                    data->resize(length);
                    (*sink_)(data);
                    do_read();
                }
            }
        );
    }

    as::ip::tcp::socket socket_;
    static constexpr std::size_t const max_length = 1024;
    std::shared_ptr<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type> sink_;
};

class server {
public:
    server(
        as::io_service& ios,
        std::uint16_t port)
        : acceptor_(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port)),
          socket_(ios) {
        do_accept();
        std::cout << "server start accept" << std::endl;
        ios.run();
    }

private:
    void do_accept() {
        acceptor_.async_accept(
            socket_,
            [this](boost::system::error_code ec) {
                if (!ec) {
                    std::make_shared<session>(std::move(socket_))->start();
                }
                // for test, only one session is accepted.
                // do_accept();
            }
        );
    }

    as::ip::tcp::acceptor acceptor_;
    as::ip::tcp::socket socket_;
};

int main() {
    std::thread srv(
        []{
            boost::asio::io_service ios;
            server s(ios, 12345);
        }
    );

    std::thread cli(
        []{
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "client start" << std::endl;

            std::stringstream ss;
            std::map<std::string, std::vector<int>> v1 {
                { "ABC", { 1, 2, 3 } },
                { "DEFG", { 4, 5 } }
            };
            std::vector<std::string> v2 {
                "HIJ", "KLM", "NOP"
                    };
            msgpack::pack(ss, v1);
            msgpack::pack(ss, v2);

            auto send_data = ss.str();

            boost::asio::io_service ios;
            as::ip::tcp::resolver::query q("127.0.0.1", "12345");
            as::ip::tcp::resolver r(ios);
            auto it = r.resolve(q);

            std::cout << "client connect" << std::endl;
            as::ip::tcp::socket s(ios);
            as::connect(s, it);


            std::size_t const size = 5;
            std::size_t rest = send_data.size();
            std::size_t index = 0;
            while (rest != 0) {
                std::cout << "client send data" << std::endl;
                auto send_size = size < rest ? size : rest;
                as::write(s, as::buffer(&send_data[index], send_size));
                rest -= send_size;
                index += send_size;
                std::cout << "client wait" << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    );

    cli.join();
    std::cout << "client joinded" << std::endl;
    srv.join();
    std::cout << "server joinded" << std::endl;
}