File: udpinput.cpp

package info (click to toggle)
cubemap 1.3.2-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 428 kB
  • sloc: cpp: 4,431; sh: 114; perl: 102; makefile: 60
file content (241 lines) | stat: -rw-r--r-- 5,937 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stddef.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>
#include <math.h>
#include <string>

#include "acceptor.h"
#include "log.h"
#include "mutexlock.h"
#include "serverpool.h"
#include "state.pb.h"
#include "stream.h"
#include "udpinput.h"
#include "util.h"
#include "version.h"

using namespace std;

extern ServerPool *servers;

namespace {

// Similar to parse_hostport(), but only parses the IP address,
// and does not use mapped-v4 addresses, since multicast seems
// to not like that too much.
bool parse_ip_address(const string &ip, sockaddr_storage *addr)
{
	memset(addr, 0, sizeof(*addr));

	assert(!ip.empty());
	if (ip[0] == '[') {
		sockaddr_in6 *addr6 = (sockaddr_in6 *)addr;
		addr6->sin6_family = AF_INET6;
		if (ip[ip.size() - 1] != ']') {
			log(ERROR, "address '%s' is malformed; must be either [ipv6addr] or ipv4addr",
				ip.c_str());
			return false;
		}
		string raw_ip(ip.begin() + 1, ip.end() - 1);
		if (inet_pton(AF_INET6, raw_ip.c_str(), &addr6->sin6_addr) != 1) {
			log(ERROR, "'%s' is not a valid IPv6 address", raw_ip.c_str());
			return false;
		}
	} else {
		sockaddr_in *addr4 = (sockaddr_in *)addr;
		addr4->sin_family = AF_INET;
		if (inet_pton(AF_INET, ip.c_str(), &addr4->sin_addr) != 1) {
			log(ERROR, "'%s' is not a valid IPv4 address");
			return false;
		}
	}

	return true;
}

bool maybe_join_multicast_group(int sock, const string &group, const string &source)
{
	if (group.empty()) {
		// Not multicast.
		return true;
	}

	// Join the given multicast group (ASM or SSM).
	// TODO: Also support sources apart from multicast groups,
	// e.g. udp://[::1]:1234 for only receiving from localhost.
	if (!source.empty()) {
		// Single-Source Multicast (SSM).
		group_source_req gsr;
		memset(&gsr, 0, sizeof(gsr));
		if (!parse_ip_address(group, &gsr.gsr_group)) {
			return false;
		}
		if (!parse_ip_address(source, &gsr.gsr_source)) {
			return false;
		}
		int level = (gsr.gsr_group.ss_family == AF_INET) ? SOL_IP : SOL_IPV6;
		if (setsockopt(sock, level, MCAST_JOIN_SOURCE_GROUP, &gsr, sizeof(gsr)) == -1) {
			log_perror("setsockopt(MCAST_JOIN_SOURCE_GROUP)");
			return false;
		}
	} else {
		// Any-Source Multicast (ASM).
		group_req gr;
		memset(&gr, 0, sizeof(gr));
		if (!parse_ip_address(group, &gr.gr_group)) {
			return false;
		}
		int level = (gr.gr_group.ss_family == AF_INET) ? SOL_IP : SOL_IPV6;
		if (setsockopt(sock, level, MCAST_JOIN_GROUP, &gr, sizeof(gr)) == -1) {
			log_perror("setsockopt(MCAST_JOIN_GROUP)");
			return false;
		}
	}

	return true;
}

}  // namespace

UDPInput::UDPInput(const string &url)
	: url(url),
	  sock(-1)
{
	// Should be verified by the caller.
	string protocol;
	bool ok = parse_url(url, &protocol, &user, &host, &port, &path);
	assert(ok);

	construct_header();

	pthread_mutex_init(&stats_mutex, NULL);
	stats.url = url;
	stats.bytes_received = 0;
	stats.data_bytes_received = 0;
	stats.metadata_bytes_received = 0;
	stats.connect_time = time(NULL);
	stats.latency_sec = HUGE_VAL;
}

UDPInput::UDPInput(const InputProto &serialized)
	: url(serialized.url()),
	  sock(serialized.sock())
{
	// Should be verified by the caller.
	string protocol;
	bool ok = parse_url(url, &protocol, &user, &host, &port, &path);
	assert(ok);

	construct_header();

	pthread_mutex_init(&stats_mutex, NULL);
	stats.url = url;
	stats.bytes_received = serialized.bytes_received();
	stats.data_bytes_received = serialized.data_bytes_received();
	if (serialized.has_connect_time()) {
		stats.connect_time = serialized.connect_time();
	} else {
		stats.connect_time = time(NULL);
	}
}

InputProto UDPInput::serialize() const
{
	InputProto serialized;
	serialized.set_url(url);
	serialized.set_sock(sock);
	serialized.set_bytes_received(stats.bytes_received);
	serialized.set_data_bytes_received(stats.data_bytes_received);
	serialized.set_connect_time(stats.connect_time);
	return serialized;
}

void UDPInput::close_socket()
{
	safe_close(sock);
	sock = -1;
}
	
void UDPInput::construct_header()
{
	http_header =
		"HTTP/1.0 200 OK\r\n"
		"Content-type: application/octet-stream\r\n"
		"Cache-control: no-cache\r\n"
		"Server: " SERVER_IDENTIFICATION "\r\n"
	        "Connection: close\r\n";
}
	
void UDPInput::add_destination(int stream_index)
{
	stream_indices.push_back(stream_index);
	servers->set_header(stream_index, http_header, "");
}

void UDPInput::do_work()
{
	while (!should_stop()) {
		if (sock == -1) {
			int port_num = atoi(port.c_str());
			sockaddr_in6 addr = create_any_address(port_num);
			sock = create_server_socket(addr, UDP_SOCKET);
			if (sock == -1) {
				log(WARNING, "[%s] UDP socket creation failed. Waiting 0.2 seconds and trying again...",
				             url.c_str());
				usleep(200000);
				continue;
			}

			// The syntax udp://source@group (abusing the username field
			// to store the sender in SSM) seems to be a VLC invention.
			// We mimic it.
			if (!maybe_join_multicast_group(sock, host, user)) {
				log(WARNING, "[%s] Multicast join failed. Waiting 0.2 seconds and trying again...",
				             url.c_str());
				safe_close(sock);
				sock = -1;
				usleep(200000);
				continue;
			}
		}

		// Wait for a packet, or a wakeup.
		bool activity = wait_for_activity(sock, POLLIN, NULL);
		if (!activity) {
			// Most likely, should_stop was set.
			continue;
		}

		int ret;
		do {
			ret = recv(sock, packet_buf, sizeof(packet_buf), 0);
		} while (ret == -1 && errno == EINTR);

		if (ret <= 0) {
			log_perror("recv");
			close_socket();
			continue;
		}

		{
			MutexLock lock(&stats_mutex);
			stats.bytes_received += ret;
			stats.data_bytes_received += ret;
		}
		
		for (size_t i = 0; i < stream_indices.size(); ++i) {
			servers->add_data(stream_indices[i], packet_buf, ret, /*metacube_flags=*/0);
		}
	}
}

InputStats UDPInput::get_stats() const
{
	MutexLock lock(&stats_mutex);
	return stats;
}