File: client_connection.hpp

package info (click to toggle)
snapcast 0.34.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,252 kB
  • sloc: cpp: 40,067; python: 3,260; sh: 455; makefile: 16
file content (294 lines) | stat: -rw-r--r-- 9,731 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
/***
    This file is part of snapcast
    Copyright (C) 2014-2025 Johannes Pohl

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
***/

#pragma once

// local headers
#include "client_settings.hpp"
#include "common/message/factory.hpp"
#include "common/message/message.hpp"
#include "common/time_defs.hpp"

// 3rd party headers
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/beast/core.hpp>
#ifdef HAS_OPENSSL
#include <boost/beast/ssl.hpp>
#endif
#include <boost/beast/websocket.hpp>

// standard headers
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>


// using boost::asio::ip::tcp;
// Namespace shorthands for Boost.Beast.
// NOTE(review): declared at global scope in a header, so they are visible in every file including this header.
namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
/// Plain TCP socket
using tcp_socket = boost::asio::ip::tcp::socket;
/// Websocket stream layered on a plain TCP socket
using tcp_websocket = websocket::stream<tcp_socket>;
#ifdef HAS_OPENSSL
/// SSL stream layered on a TCP socket (only available if built with HAS_OPENSSL)
using ssl_socket = boost::asio::ssl::stream<tcp_socket>;
/// Websocket stream layered on an SSL stream (only available if built with HAS_OPENSSL)
using ssl_websocket = websocket::stream<ssl_socket>;
#endif

/// Forward declaration, the class is defined further down in this header
class ClientConnection;

/// Callback taking an error code and an owning pointer to a message of type @p Message
/// @tparam Message type of the message that is handed to the callback
template <typename Message>
using MessageHandler = std::function<void(const boost::system::error_code&, std::unique_ptr<Message>)>;

/// Used to synchronize server requests (wait for server response)
/// ClientConnection keeps track of instances through std::weak_ptr (see ClientConnection::pending_requests_)
class PendingRequest : public std::enable_shared_from_this<PendingRequest>
{
public:
    /// c'tor
    /// @param strand strand of the request; NOTE(review): presumably used to serialize timer and handler calls - confirm in the .cpp
    /// @param reqId the id of the request, presumably the value returned by id()
    /// @param handler callback that receives the response message, see setValue()
    PendingRequest(const boost::asio::strand<boost::asio::any_io_executor>& strand, uint16_t reqId, const MessageHandler<msg::BaseMessage>& handler);
    /// d'tor
    virtual ~PendingRequest();

    /// Set the response for the pending request and pass it to the handler
    /// @param value the response message
    void setValue(std::unique_ptr<msg::BaseMessage> value);

    /// @return the id of the request
    uint16_t id() const;

    /// Start the timer for the request
    /// @param timeout the timeout to wait for the reception of the response
    void startTimer(const chronos::usec& timeout);

    /// Needed to put the requests in a container
    /// @param other the request to compare with
    /// @return result of the comparison; NOTE(review): presumably orders by request id - confirm in the .cpp
    bool operator<(const PendingRequest& other) const;

    /// Cancel the request
    void cancel();

private:
    /// Id of the request
    uint16_t id_;
    /// Timer of the request, see startTimer()
    boost::asio::steady_timer timer_;
    /// Strand of the request (presumably the one passed to the c'tor)
    boost::asio::strand<boost::asio::any_io_executor> strand_;
    /// Handler that receives the response, see setValue()
    MessageHandler<msg::BaseMessage> handler_;
};



/// Endpoint of the server connection
/**
 * Abstract base class of the connections to the server.
 * Messages are sent asynchronously to the server with the "send" method.
 * Requests are sent with the "sendRequest" method, the response of the
 * server (or an error) is passed to the handler given to "sendRequest".
 */
class ClientConnection
{
public:
    /// Callback reporting the result of an operation as boost::error_code
    using ResultHandler = std::function<void(const boost::system::error_code&)>;
    /// Callback reporting the result of a write operation (error code and length)
    using WriteHandler = std::function<void(boost::system::error_code ec, std::size_t length)>;

    /// c'tor
    ClientConnection(boost::asio::io_context& io_context, ClientSettings::Server server);
    /// d'tor
    virtual ~ClientConnection() = default;

    /// Connect asynchronously
    /// @param handler receives the result of the connect
    void connect(const ResultHandler& handler);
    /// Disconnect the socket
    virtual void disconnect() = 0;

    /// Send a message asynchronously
    /// @param message the message to send
    /// @param handler receives the result
    void send(const msg::message_ptr& message, const ResultHandler& handler);

    /// Send a request to the server and wait for the answer
    /// @param message the request message
    /// @param timeout the send timeout
    /// @param handler receives the response message or an error
    void sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<msg::BaseMessage>& handler);

    /// @sa sendRequest, with the response converted to @p Message
    /// @param message the request message
    /// @param timeout the send timeout
    /// @param handler receives the response as @p Message or an error; errc::bad_message if msg::message_cast yields no message
    template <typename Message>
    void sendRequest(const msg::message_ptr& message, const chronos::usec& timeout, const MessageHandler<Message>& handler)
    {
        sendRequest(message, timeout, [handler](const boost::system::error_code& ec, std::unique_ptr<msg::BaseMessage> response)
        {
            // The request failed: pass the error on, there is no message to deliver
            if (ec)
            {
                handler(ec, nullptr);
                return;
            }

            // The response could not be converted to the requested message type
            auto typed_response = msg::message_cast<Message>(std::move(response));
            if (!typed_response)
            {
                handler(boost::system::errc::make_error_code(boost::system::errc::bad_message), nullptr);
                return;
            }

            handler(ec, std::move(typed_response));
        });
    }

    /// @return the MAC address of the client
    virtual std::string getMacAddress() = 0;

    /// Get the next message asynchronously
    /// @param handler receives the next message or an error
    virtual void getNextMessage(const MessageHandler<msg::BaseMessage>& handler) = 0;

protected:
    /// Send @p buffer and report the result to @p write_handler
    virtual void write(boost::asio::streambuf& buffer, WriteHandler&& write_handler) = 0;

    /// Connect to @p endpoint
    /// @return the error code of the connect
    virtual boost::system::error_code doConnect(boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> endpoint) = 0;

    /// Handle a received message, check if it is the response of a pending request
    void messageReceived(std::unique_ptr<msg::BaseMessage> message, const MessageHandler<msg::BaseMessage>& handler);

    /// Send the next pending message from messages_
    void sendNext();

    /// Cancel all pending requests and timers
    void cancelRequests();

    /// Base message holding the received message
    msg::BaseMessage base_message_;

    /// Strand to serialize send/receive
    boost::asio::strand<boost::asio::any_io_executor> strand_;

    /// TCP resolver
    boost::asio::ip::tcp::resolver resolver_;

    /// Pending requests that are waiting for a response (Message::refersTo)
    std::vector<std::weak_ptr<PendingRequest>> pending_requests_;
    /// Unique request id, used to match a response
    uint16_t reqId_;
    /// Server settings (host and port)
    ClientSettings::Server server_;
    /// Size of a base message (= message header)
    const size_t base_msg_size_;
    /// Stream buffer for sending
    boost::asio::streambuf streambuf_;

    /// A message waiting to be sent, stored together with its result handler
    struct PendingMessage
    {
        /// c'tor, takes over @p msg and @p handler
        PendingMessage(msg::message_ptr msg, ResultHandler handler)
            : msg(std::move(msg)),
              handler(std::move(handler))
        {
        }

        /// Pointer to the message
        msg::message_ptr msg;
        /// Result handler belonging to the message
        ResultHandler handler;
    };

    /// Queue of the messages to be sent
    std::deque<PendingMessage> messages_;
};


/// Plain TCP connection
class ClientConnectionTcp : public ClientConnection
{
public:
    /// c'tor
    ClientConnectionTcp(boost::asio::io_context& io_context, ClientSettings::Server server);
    /// d'tor
    virtual ~ClientConnectionTcp();

    void disconnect() override;
    std::string getMacAddress() override;
    void getNextMessage(const MessageHandler<msg::BaseMessage>& handler) override;

private:
    boost::system::error_code doConnect(boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> endpoint) override;
    void write(boost::asio::streambuf& buffer, WriteHandler&& write_handler) override;

    /// TCP socket
    tcp_socket socket_;
    /// Receive buffer
    std::vector<char> buffer_;
};


/// Websocket connection
class ClientConnectionWs : public ClientConnection
{
public:
    /// c'tor
    ClientConnectionWs(boost::asio::io_context& io_context, ClientSettings::Server server);
    /// d'tor
    virtual ~ClientConnectionWs();

    void disconnect() override;
    std::string getMacAddress() override;
    void getNextMessage(const MessageHandler<msg::BaseMessage>& handler) override;

private:
    boost::system::error_code doConnect(boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> endpoint) override;
    void write(boost::asio::streambuf& buffer, WriteHandler&& write_handler) override;

    /// @return the websocket
    tcp_websocket& getWs();

    /// TCP web socket
    std::optional<tcp_websocket> tcp_ws_;
    /// Receive buffer
    boost::beast::flat_buffer buffer_;
    /// protect tcp_ws_
    std::mutex ws_mutex_;
};


#ifdef HAS_OPENSSL

/// Secure websocket connection (websocket over SSL), only available if built with HAS_OPENSSL
class ClientConnectionWss : public ClientConnection
{
public:
    /// c'tor
    /// @param io_context the io context
    /// @param ssl_context the SSL context, kept as reference: it must outlive this object
    /// @param server the server settings
    ClientConnectionWss(boost::asio::io_context& io_context, boost::asio::ssl::context& ssl_context, ClientSettings::Server server);
    /// d'tor, overrides the virtual d'tor of ClientConnection
    ~ClientConnectionWss() override;

    /// @sa ClientConnection::disconnect
    void disconnect() override;
    /// @sa ClientConnection::getMacAddress
    std::string getMacAddress() override;
    /// @sa ClientConnection::getNextMessage
    void getNextMessage(const MessageHandler<msg::BaseMessage>& handler) override;

private:
    /// @sa ClientConnection::doConnect
    boost::system::error_code doConnect(boost::asio::ip::basic_endpoint<boost::asio::ip::tcp> endpoint) override;
    /// @sa ClientConnection::write
    void write(boost::asio::streambuf& buffer, WriteHandler&& write_handler) override;

    /// @return the websocket
    ssl_websocket& getWs();

    /// SSL context (reference, not owned)
    boost::asio::ssl::context& ssl_context_;
    /// SSL web socket, may be empty (std::optional)
    std::optional<ssl_websocket> ssl_ws_;
    /// Receive buffer
    boost::beast::flat_buffer buffer_;
    /// protect ssl_ws_
    std::mutex ws_mutex_;
};

#endif // HAS_OPENSSL