File: acceptor.cpp

package info (click to toggle)
cubemap 1.5.2-3
  • links: PTS
  • area: main
  • in suites: forky, sid, trixie
  • size: 908 kB
  • sloc: ansic: 8,411; cpp: 5,730; sh: 114; perl: 112; makefile: 76
file content (166 lines) | stat: -rw-r--r-- 4,321 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <unistd.h>

#include "acceptor.h"
#include "log.h"
#include "serverpool.h"
#include "state.pb.h"
#include "util.h"

using namespace std;

extern ServerPool *servers;

int create_server_socket(const sockaddr_in6 &addr, SocketType socket_type)
{
	// NOTE: We set as non-blocking, so the acceptor thread can notice that we want to shut it down.
	int server_sock;
	if (socket_type == TCP_SOCKET) {
		server_sock = socket(PF_INET6, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
	} else {
		assert(socket_type == UDP_SOCKET);
		server_sock = socket(PF_INET6, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
	}
	if (server_sock == -1) {
		log_perror("socket");
		exit(1);
	}

	int one = 1;
	if (setsockopt(server_sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) == -1) {
		log_perror("setsockopt(SO_REUSEADDR)");
		exit(1);
	}

	// We want dual-stack sockets. (Sorry, OpenBSD and Windows XP...)
	int zero = 0;
	if (setsockopt(server_sock, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(zero)) == -1) {
		log_perror("setsockopt(IPV6_V6ONLY)");
		exit(1);
	}

	if (::bind(server_sock, reinterpret_cast<const sockaddr *>(&addr), sizeof(addr)) == -1) {
		log_perror("bind");
		exit(1);
	}

	if (socket_type == TCP_SOCKET) {
		if (listen(server_sock, 128) == -1) {
			log_perror("listen");
			exit(1);
		}
	}

	return server_sock;
}

sockaddr_in6 create_any_address(int port)
{
	sockaddr_in6 sin6;
	memset(&sin6, 0, sizeof(sin6));
	sin6.sin6_family = AF_INET6;
	sin6.sin6_port = htons(port);
	return sin6;
}

sockaddr_in6 extract_address_from_acceptor_proto(const AcceptorProto &proto)
{
	sockaddr_in6 sin6;
	memset(&sin6, 0, sizeof(sin6));
	sin6.sin6_family = AF_INET6;

	if (!proto.addr().empty()) {
		int ret = inet_pton(AF_INET6, proto.addr().c_str(), &sin6.sin6_addr);
		assert(ret == 1);
	}

	sin6.sin6_port = htons(proto.port());
	return sin6;
}
	
Acceptor::Acceptor(int server_sock, const sockaddr_in6 &addr,
                   const string &certificate_chain, const string &private_key)
	: server_sock(server_sock),
	  addr(addr),
	  certificate_chain(certificate_chain),
	  private_key(private_key)
{
}

Acceptor::Acceptor(const AcceptorProto &serialized)
	: server_sock(serialized.server_sock()),
	  addr(extract_address_from_acceptor_proto(serialized)),
	  certificate_chain(serialized.certificate_chain()),
	  private_key(serialized.private_key())
{
	// Set back the close-on-exec flag for the socket.
	// (This can't leak into a child, since we haven't been started yet.)
	fcntl(server_sock, F_SETFD, FD_CLOEXEC);
}

AcceptorProto Acceptor::serialize() const
{
	// Unset the close-on-exec flag for the socket.
	// (This can't leak into a child, since there's only one thread left.)
	fcntl(server_sock, F_SETFD, 0);

	char buf[INET6_ADDRSTRLEN];
	inet_ntop(addr.sin6_family, &addr.sin6_addr, buf, sizeof(buf));

	AcceptorProto serialized;
	serialized.set_server_sock(server_sock);
	serialized.set_addr(buf);
	serialized.set_port(ntohs(addr.sin6_port));
	serialized.set_certificate_chain(certificate_chain);
	serialized.set_private_key(private_key);
	return serialized;
}

void Acceptor::close_socket()
{
	safe_close(server_sock);
}

void Acceptor::do_work()
{
	while (!should_stop()) {
		if (!wait_for_activity(server_sock, POLLIN, nullptr)) {
			continue;
		}

		sockaddr_in6 addr;
		socklen_t addrlen = sizeof(addr);

		// Get a new socket, and set it as nonblocking.
		int sock = accept4(server_sock, reinterpret_cast<sockaddr *>(&addr), &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
		if (sock == -1 && errno == EINTR) {
			continue;
		}
		if (sock == -1) {
			log_perror("accept");
			usleep(100000);
			continue;
		}

		// Enable TCP_CORK for maximum throughput. In the rare case that the
		// stream stops entirely, this will cause a small delay (~200 ms)
		// before the last part is sent out, but that should be fine.
		int one = 1;
		if (setsockopt(sock, SOL_TCP, TCP_CORK, &one, sizeof(one)) == -1) {
			log_perror("setsockopt(TCP_CORK)");
			// Can still continue.
		}

		// Pick a server, round-robin, and hand over the socket to it.
		servers->add_client(sock, this);
	}
}