File: ng_client.c

package info (click to toggle)
rtpengine 13.5.1.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 13,676 kB
  • sloc: ansic: 86,775; perl: 59,422; python: 3,193; sh: 1,037; makefile: 687; asm: 211
file content (192 lines) | stat: -rw-r--r-- 4,754 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#include "ng_client.h"
#include "media_socket.h"

struct endpoint_sockets {
	endpoint_t dst;
	socket_slist *sockets;
};

static void socket_free(socket_t *s) {
	close_socket(s);
	g_free(s);
}

static void endpoint_socket_free(struct endpoint_sockets *es) {
	t_slist_free_full(es->sockets, socket_free);
	g_free(es);
}

TYPED_GHASHTABLE(endpoint_socket_ht, endpoint_t, struct endpoint_sockets,
		endpoint_hash, endpoint_eq,
		NULL, endpoint_socket_free)


static endpoint_socket_ht ng_client_endpoints;
static rwlock_t ng_client_endpoints_lock;


void ng_client_init(void) {
	ng_client_endpoints = endpoint_socket_ht_new();
}

void ng_client_cleanup(void) {
	t_hash_table_destroy(ng_client_endpoints);
}


static struct endpoint_sockets *ng_client_get_entry(const endpoint_t *dst) {
	struct endpoint_sockets *es;

	// quick check for existing entry
	{
		RWLOCK_R(&ng_client_endpoints_lock);
		es = t_hash_table_lookup(ng_client_endpoints, dst);
	}

	if (es)
		return es;

	// we have to create one
	es = g_new0(__typeof(*es), 1);
	es->dst = *dst;

	RWLOCK_W(&ng_client_endpoints_lock);
	// ... but someone may have beaten us to it
	__auto_type es2 = t_hash_table_lookup(ng_client_endpoints, dst);
	if (es2) {
		// lost the race
		g_free(es);
		es = es2;
	}
	else
		t_hash_table_insert(ng_client_endpoints, &es->dst, es);

	return es;
}

static socket_slist *ng_client_get_socket(struct endpoint_sockets *es) {
	// see if we can grab a socket
	socket_slist *link = __atomic_load_n(&es->sockets, __ATOMIC_SEQ_CST);
	bool success = false;
	while (link && !success)
		success = atomic_compare_exchange(&es->sockets, &link, link->next);

	if (link) {
		link->next = NULL;
		return link;
	}

	// no socket available, we need to create one
	link = g_new0(socket_slist, 1);
	__auto_type sock = link->data = g_new0(socket_t, 1);
	if (!connect_socket(sock, SOCK_DGRAM, &es->dst)) {
		// oops...
		ilog(LOG_ERR, "Failed to create or connect socket to remote NG peer (%s): %s",
				endpoint_print_buf(&es->dst),
				strerror(errno));
		g_free(link->data);
		g_free(link);
		return NULL;
	}
	socket_getsockname(sock);
	interfaces_exclude_port(&sock->local);
	socket_rcvtimeout(sock, rtpe_config.ng_client_timeout);

	return link;
}

static void ng_client_put_socket(struct endpoint_sockets *es, socket_slist *link) {
	link->next = __atomic_load_n(&es->sockets, __ATOMIC_SEQ_CST);
	bool success;
	do
		success = atomic_compare_exchange(&es->sockets, &link->next, link);
	while (!success);
}

bencode_item_t *ng_client_request(const endpoint_t *dst, const str *req, bencode_buffer_t *rbuf) {
	__auto_type es = ng_client_get_entry(dst);
	__auto_type link = ng_client_get_socket(es);
	if (!link)
		return NULL;

	__auto_type sock = link->data;

	// construct message
	char cookie[17];
	rand_hex_str(cookie, 8);
	cookie[16] = ' ';

	struct iovec iov[2] = {
		{
			.iov_base = cookie,
			.iov_len = sizeof(cookie),
		},
		{
			.iov_base = req->s,
			.iov_len = req->len,
		},
	};

	ilog(LOG_DEBUG, "Sending NG request to %s: '" STR_FORMAT "'",
			endpoint_print_buf(dst),
			STR_FMT(req));

	static const size_t buflen = 4096;
	char *buf = bencode_buffer_alloc(rbuf, buflen);
	ssize_t len = 0;

	for (unsigned int try = 0; try < rtpe_config.ng_client_retries; try++) {
		ilog(LOG_DEBUG, "Attempt #%u sending NG request", try + 1);

		ssize_t ret = socket_sendiov(sock, iov, G_N_ELEMENTS(iov), NULL, NULL);
		if (ret <= 0)
			goto err;

		// receive the response
		len = socket_recvfrom(sock, buf, buflen, NULL);
		if (len > 0)
			ilog(LOG_DEBUG, "Received response from NG peer (%s): '%.*s'",
					endpoint_print_buf(dst),
					(int) len, buf);
		if (len == buflen)
			ilog(LOG_WARN, "Possibly truncated response from remote NG peer (%s)",
					endpoint_print_buf(dst));
		if (len > sizeof(cookie) && memcmp(buf, cookie, sizeof(cookie)) == 0)
			break;

		if (len < 0)
			ilog(LOG_WARN, "Error reading from socket from remote NG peer (%s): %s",
					endpoint_print_buf(dst),
					strerror(errno));
		else if (len == 0)
			ilog(LOG_WARN, "EOF while reading from socket from remote NG peer (%s)",
					endpoint_print_buf(dst));
		else
			ilog(LOG_WARN, "Short packet or mismatched cookie while reading from socket "
					"from remote NG peer (%s)",
					endpoint_print_buf(dst));
	}

	if (len <= 0)
		goto err;

	ilog(LOG_DEBUG, "Received valid NG response: '%.*s'", (int) len, buf);

	bencode_item_t *ret = bencode_decode_expect(rbuf, buf + sizeof(cookie), len - sizeof(cookie),
			BENCODE_DICTIONARY);
	if (!ret) {
		errno = EIO;
		goto err;
	}

	ng_client_put_socket(es, link);
	return ret;

err:
	ilog(LOG_ERR, "Error communicating with remote NG peer (%s): %s",
			endpoint_print_buf(&sock->remote),
			strerror(errno));

	ng_client_put_socket(es, link);
	return NULL;
}