File: server.h

package info (click to toggle)
cubemap 1.3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 428 kB
  • sloc: cpp: 4,431; sh: 114; perl: 102; makefile: 60
file content (158 lines) | stat: -rw-r--r-- 5,815 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#ifndef _SERVER_H
#define _SERVER_H 1

#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <time.h>
#include <map>
#include <queue>
#include <string>
#include <vector>

#include "client.h"
#include "stream.h"
#include "thread.h"

class ClientProto;
struct Stream;

#define EPOLL_MAX_EVENTS 8192
#define EPOLL_TIMEOUT_MS 20
#define MAX_CLIENT_REQUEST 16384
#define REQUEST_READ_TIMEOUT_SEC 60

class CubemapStateProto;
class StreamProto;

class Server : public Thread {
public:
	Server();
	~Server();

	// Get the list of all currently connected clients.	
	std::vector<ClientStats> get_client_stats() const;

	// Set header (both HTTP header and any stream headers) for the given stream.
	void set_header(int stream_index,
	                const std::string &http_header,
	                const std::string &stream_header);

	// Set that the given stream should use the given max pacing rate from now on.
	// NOTE: This should be set before any clients are connected!
	void set_pacing_rate(int stream_index, uint32_t pacing_rate);

	// These will be deferred until the next time an iteration in do_work() happens,
	// and the order between them are undefined.
	// XXX: header should ideally be ordered with respect to data.
	void add_client_deferred(int sock);
	void add_data_deferred(int stream_index, const char *data, size_t bytes, uint16_t metacube_flags);

	// These should not be called while running, since that would violate
	// threading assumptions (ie., that epoll is only called from one thread
	// at the same time).
	CubemapStateProto serialize();
	void add_client_from_serialized(const ClientProto &client);
	int add_stream(const std::string &url, size_t bytes_received, size_t prebuffering_bytes, Stream::Encoding encoding, Stream::Encoding src_encoding);
	int add_stream_from_serialized(const StreamProto &stream, int data_fd);
	int lookup_stream_by_url(const std::string &url) const;
	void set_backlog_size(int stream_index, size_t new_size);
	void set_prebuffering_bytes(int stream_index, size_t new_amount);
	void set_encoding(int stream_index, Stream::Encoding encoding);
	void set_src_encoding(int stream_index, Stream::Encoding encoding);
	void add_gen204(const std::string &url, const std::string &allow_origin);

private:
	// Mutex protecting queued_add_clients.
	// Note that if you want to hold both this and <mutex> below,
	// you will need to take <mutex> before this one.
	mutable pthread_mutex_t queued_clients_mutex;

	// Deferred commands that should be run from the do_work() thread as soon as possible.
	// We defer these for two reasons:
	//
	//  - We only want to fiddle with epoll from one thread at any given time,
	//    and doing add_client() from the acceptor thread would violate that.
	//  - We don't want the input thread(s) hanging on <mutex> when doing
	//    add_data(), since they want to do add_data() rather often, and <mutex>
	//    can be taken a lot of the time.
	//	
	// Protected by <queued_clients_mutex>.
	std::vector<int> queued_add_clients;

	// All variables below this line are protected by the mutex.
	mutable pthread_mutex_t mutex;

	// All streams.
	std::vector<Stream *> streams;

	// Map from URL to index into <streams>.
	std::map<std::string, int> stream_url_map;

	// Map from URL to CORS Allow-Origin header (or empty string).
	std::map<std::string, std::string> ping_url_map;

	// Map from file descriptor to client.
	std::map<int, Client> clients;

	// A list of all clients, ordered by the time they connected (first element),
	// and their file descriptor (second element). It is ordered by connection time
	// (and thus also by read timeout time) so that we can read clients from the
	// start and stop processing once we get to one that isn't ready to be
	// timed out yet (which means we only have to look at each client exactly once,
	// save for the first element of the queue, which is always checked).
	//
	// Note that when we delete a client, we don't update this queue.
	// This means that when reading it, we need to check if the client it
	// describes is still exists (ie., that the fd still exists, and that
	// the timespec matches).
	std::queue<std::pair<timespec, int> > clients_ordered_by_connect_time;

	// Used for epoll implementation (obviously).
	int epoll_fd;
	epoll_event events[EPOLL_MAX_EVENTS];

	// The actual worker thread.
	virtual void do_work();

	// Process a client; read and write data as far as we can.
	// After this call, one of these four is true:
	//
	//  1. The socket is closed, and the client deleted.
	//  2. We are still waiting for more data from the client.
	//  3. We've sent all the data we have to the client,
	//     and put it in <sleeping_clients>.
	//  4. The socket buffer is full (which means we still have
	//     data outstanding).
	//
	// For #2, we listen for EPOLLIN events. For #3 and #4, we listen
	// for EPOLLOUT in edge-triggered mode; it will never fire for #3,
	// but it's cheaper than taking it in and out all the time.
	void process_client(Client *client);

	// Close a given client socket, and clean up after it.
	void close_client(Client *client);

	// Parse the HTTP request. Returns a HTTP status code (200/204/400/404).
	int parse_request(Client *client);

	// Construct the HTTP header, and set the client into
	// the SENDING_HEADER state.
	void construct_header(Client *client);

	// Construct a generic error with the given line, and set the client into
	// the SENDING_SHORT_RESPONSE state.
	void construct_error(Client *client, int error_code);

	// Construct a 204, and set the client into the SENDING_SHORT_RESPONSE state.
	void construct_204(Client *client);

	void process_queued_data();
	void skip_lost_data(Client *client);

	void add_client(int sock);
};

#endif  // !defined(_SERVER_H)