#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <algorithm>
#include <string>
#include <queue>
#include <vector>

#include "log.h"
#include "metacube2.h"
#include "state.pb.h"
#include "stream.h"
#include "util.h"

using namespace std;
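
// The stream's backlog is kept in a circular buffer backed by a temporary
// file (data_fd); it holds the most recent <backlog_size> bytes received,
// so that newly connected clients can be started a bit back in the stream.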
Stream::Stream(const string &url,
               size_t backlog_size,
               uint64_t prebuffering_bytes,
               Encoding encoding,
               Encoding src_encoding,
               unsigned hls_frag_duration,
               size_t hls_backlog_margin,
               const std::string &allow_origin)
	: url(url),
	  encoding(encoding),
	  src_encoding(src_encoding),
	  allow_origin(allow_origin),
	  data_fd(make_tempfile("")),
	  backlog_size(backlog_size),
	  prebuffering_bytes(prebuffering_bytes),
	  hls_frag_duration(hls_frag_duration),
	  hls_backlog_margin(hls_backlog_margin)
{
	if (data_fd == -1) {
		exit(1);
	}
}

Stream::~Stream()
{
	if (data_fd != -1) {
		safe_close(data_fd);
	}
}
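
// Deserialization constructor, for when stream state is handed over from an
// old process (the backlog file descriptor itself stays open across the
// exec()); the inverse of serialize() below.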
Stream::Stream(const StreamProto &serialized, int data_fd)
	: url(serialized.url()),
	  unavailable(serialized.unavailable()),
	  http_header(serialized.http_header()),
	  stream_header(serialized.stream_header()),
	  encoding(Stream::STREAM_ENCODING_RAW),  // Will be changed later.
	  data_fd(data_fd),
	  backlog_size(serialized.backlog_size()),
	  bytes_received(serialized.bytes_received()),
	  first_fragment_index(serialized.first_fragment_index()),
	  discontinuity_counter(serialized.discontinuity_counter())
{
	if (data_fd == -1) {
		exit(1);
	}

	// Set the close-on-exec flag back on the backlog fd.
	fcntl(data_fd, F_SETFD, FD_CLOEXEC);

	for (ssize_t point : serialized.suitable_starting_point()) {
		if (point == -1) {
			// Can happen when upgrading from before 1.1.3,
			// where this was an optional field with -1 signifying
			// "no such point".
			continue;
		}
		suitable_starting_points.push_back(point);
	}

	for (const FragmentStartProto &fragment : serialized.fragment()) {
		fragments.push_back(FragmentStart { size_t(fragment.byte_position()), fragment.pts(), fragment.begins_header() });
	}
}
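
// Serialize the stream state for handover to a new process. The backlog fd
// is kept open (its close-on-exec flag is cleared below) and its number is
// recorded in the proto, so ownership effectively passes to the new process.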
StreamProto Stream::serialize()
{
	StreamProto serialized;
	serialized.set_unavailable(unavailable);
	serialized.set_http_header(http_header);
	serialized.set_stream_header(stream_header);
	serialized.add_data_fds(data_fd);
	serialized.set_backlog_size(backlog_size);
	serialized.set_bytes_received(bytes_received);
	for (size_t point : suitable_starting_points) {
		serialized.add_suitable_starting_point(point);
	}
	for (const FragmentStart &fragment : fragments) {
		FragmentStartProto *proto = serialized.add_fragment();
		proto->set_byte_position(fragment.byte_position);
		proto->set_pts(fragment.pts);
		proto->set_begins_header(fragment.begins_header);
	}
	serialized.set_first_fragment_index(first_fragment_index);
	serialized.set_discontinuity_counter(discontinuity_counter);

	// Unset the close-on-exec flag for the backlog fd.
	// (This can't leak into a child, since there's only one thread left.)
	fcntl(data_fd, F_SETFD, 0);

	serialized.set_url(url);
	data_fd = -1;
	return serialized;
}
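
// Resize the backlog. Strategy: read out and unwrap the old circular buffer,
// chop it down to the new size if needed, and then write the remaining data
// into a fresh, empty backlog file.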
void Stream::set_backlog_size(size_t new_size)
{
	if (backlog_size == new_size) {
		return;
	}

	string existing_data;
	if (!read_tempfile_and_close(data_fd, &existing_data)) {
		exit(1);
	}

	// Unwrap the data so it's no longer circular.
	if (bytes_received <= backlog_size) {
		existing_data.resize(bytes_received);
	} else {
		size_t pos = bytes_received % backlog_size;
		existing_data = existing_data.substr(pos, string::npos) +
			existing_data.substr(0, pos);
	}

	// See if we need to discard data.
	if (new_size < existing_data.size()) {
		size_t to_discard = existing_data.size() - new_size;
		existing_data = existing_data.substr(to_discard, string::npos);
	}

	// Create a new, empty data file.
	data_fd = make_tempfile("");
	if (data_fd == -1) {
		exit(1);
	}
	backlog_size = new_size;

	// Now cheat a bit by rewinding, and adding all the old data back.
	bytes_received -= existing_data.size();
	DataElement data_element;
	data_element.data.iov_base = const_cast<char *>(existing_data.data());
	data_element.data.iov_len = existing_data.size();
	data_element.metacube_flags = 0;  // Ignored by add_data_raw().

	vector<DataElement> data_elements;
	data_elements.push_back(data_element);
	add_data_raw(data_elements);
	remove_obsolete_starting_points();
}
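
// Install new HTTP and stream headers, typically because the stream (or its
// encoder) restarted. Old starting points become useless then, but old HLS
// fragments can still be served if the old header is kept around; see the
// comments below.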
void Stream::set_header(const std::string &new_http_header, const std::string &new_stream_header)
{
	unavailable = false;
	http_header = new_http_header;

	if (new_stream_header == stream_header) {
		return;
	}

	// We cannot start at any of the older starting points anymore,
	// since they'd get the wrong header for the stream (not to mention
	// that a changed header probably means the stream restarted,
	// which means any client starting on the old one would probably
	// stop playing properly at the change point). The next block
	// should be a suitable starting point (if not, something is
	// pretty strange), so it will fill up again soon enough.
	suitable_starting_points.clear();

	// HLS, on the other hand, can deal with discontinuities and multiple
	// headers. At least in theory (client support varies wildly).
	if (!fragments.empty()) {
		// Commit the old header to the backlog, so that we can serve it
		// for all the old fragments for as long as they exist.
		if (!stream_header.empty()) {
			// End the current fragment and make a new one for the header.
			fragments.push_back(Stream::FragmentStart { bytes_received, 0.0, true });
			process_queued_data();
			Stream::DataElement elem;
			elem.data.iov_base = (char *)stream_header.data();
			elem.data.iov_len = stream_header.size();
			add_data_raw({ elem });
			remove_obsolete_starting_points();

			// The discontinuity counter will be increased when
			// this header goes out of the backlog.
		}
		clear_hls_playlist_cache();
	}
	stream_header = new_stream_header;
}
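
// The client has no more backlog data to read right now; it will be woken
// up again in process_queued_data(), when more data arrives.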
void Stream::put_client_to_sleep(Client *client)
{
	sleeping_clients.push_back(client);
}

// Return a new set of iovecs that contains only the first <bytes_wanted> bytes of <data>.
vector<iovec> collect_iovecs(const vector<Stream::DataElement> &data, size_t bytes_wanted)
{
	vector<iovec> ret;
	size_t max_iovecs = min<size_t>(data.size(), IOV_MAX);
	for (size_t i = 0; i < max_iovecs && bytes_wanted > 0; ++i) {
		if (data[i].data.iov_len <= bytes_wanted) {
			// Consume the entire iovec.
			ret.push_back(data[i].data);
			bytes_wanted -= data[i].data.iov_len;
		} else {
			// Take only parts of this iovec.
			iovec iov;
			iov.iov_base = data[i].data.iov_base;
			iov.iov_len = bytes_wanted;
			ret.push_back(iov);
			bytes_wanted = 0;
		}
	}
	return ret;
}

// Return a new set of iovecs that contains all of <data> except the first <bytes_wanted> bytes.
vector<Stream::DataElement> remove_iovecs(const vector<Stream::DataElement> &data, size_t bytes_wanted)
{
	vector<Stream::DataElement> ret;
	size_t i;
	for (i = 0; i < data.size() && bytes_wanted > 0; ++i) {
		if (data[i].data.iov_len <= bytes_wanted) {
			// Consume the entire iovec.
			bytes_wanted -= data[i].data.iov_len;
		} else {
			// Take only parts of this iovec.
			Stream::DataElement data_element;
			data_element.data.iov_base = reinterpret_cast<char *>(data[i].data.iov_base) + bytes_wanted;
			data_element.data.iov_len = data[i].data.iov_len - bytes_wanted;
			data_element.metacube_flags = METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START;
			data_element.pts = RationalPTS();
			ret.push_back(data_element);
			bytes_wanted = 0;
		}
	}

	// Add the rest of the iovecs unchanged.
	ret.insert(ret.end(), data.begin() + i, data.end());
	return ret;
}
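
// Write a set of data elements into the backlog file at the current write
// position, wrapping around the circular buffer as needed. Each pwritev()
// may write less than everything (at most up to the wraparound point, and
// possibly less), so we loop, dropping what was written from the iovec set.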
void Stream::add_data_raw(const vector<DataElement> &orig_data)
{
	vector<DataElement> data = orig_data;
	while (!data.empty()) {
		size_t pos = bytes_received % backlog_size;

		// Collect as many iovecs as we can before we hit the point
		// where the circular buffer wraps around.
		vector<iovec> to_write = collect_iovecs(data, backlog_size - pos);
		ssize_t ret;
		do {
			ret = pwritev(data_fd, to_write.data(), to_write.size(), pos);
		} while (ret == -1 && errno == EINTR);

		if (ret == -1) {
			log_perror("pwritev");
			// Dazed and confused, but trying to continue...
			return;
		}
		bytes_received += ret;

		// Remove the data that was actually written from the set of iovecs.
		data = remove_iovecs(data, ret);
	}
}
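
// Drop starting points and HLS fragments whose data has fallen out of the
// backlog. Fragments are dropped <hls_backlog_margin> bytes early, so that
// a client holding a slightly stale playlist still sees valid byte ranges
// for a while before the data is actually overwritten.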
void Stream::remove_obsolete_starting_points()
{
	// We could do a binary search here (std::lower_bound), but it seems
	// overkill for removing what's probably only a few points.
	while (!suitable_starting_points.empty() &&
	       bytes_received - suitable_starting_points[0] > backlog_size) {
		suitable_starting_points.pop_front();
	}
	assert(backlog_size >= hls_backlog_margin);
	while (!fragments.empty() &&
	       bytes_received - fragments[0].byte_position > (backlog_size - hls_backlog_margin)) {
		if (fragments[0].begins_header) {
			++discontinuity_counter;
		} else {
			++first_fragment_index;
		}
		fragments.pop_front();
		clear_hls_playlist_cache();
	}
}
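
// Take in new data from the input (called from the input thread), but only
// queue it; the actual write to the backlog happens later, in
// process_queued_data(). For Metacube output, the data is wrapped in
// Metacube2 framing (plus an optional PTS metadata block) here.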
void Stream::add_data_deferred(const char *data, size_t bytes, uint16_t metacube_flags, const RationalPTS &pts)
{
	// For regular output, we don't want to send the stream header twice
	// (it's already been sent out together with the HTTP header).
	// However, for Metacube output, we need to send it so that
	// the Cubemap instance in the other end has a chance to update it.
	// It may come twice in its stream, but Cubemap doesn't care.
	if (encoding == Stream::STREAM_ENCODING_RAW &&
	    (metacube_flags & METACUBE_FLAGS_HEADER) != 0) {
		return;
	}

	lock_guard<mutex> lock(queued_data_mutex);

	DataElement data_element;
	data_element.metacube_flags = metacube_flags;
	data_element.pts = pts;

	if (encoding == Stream::STREAM_ENCODING_METACUBE) {
		// Construct a PTS metadata block. (We'll avoid sending it out
		// if we don't have a valid PTS.)
		metacube2_pts_packet pts_packet;
		pts_packet.type = htobe64(METACUBE_METADATA_TYPE_NEXT_BLOCK_PTS);
		pts_packet.pts = htobe64(pts.pts);
		pts_packet.timebase_num = htobe64(pts.timebase_num);
		pts_packet.timebase_den = htobe64(pts.timebase_den);

		metacube2_block_header pts_hdr;
		memcpy(pts_hdr.sync, METACUBE2_SYNC, sizeof(pts_hdr.sync));
		pts_hdr.size = htonl(sizeof(pts_packet));
		pts_hdr.flags = htons(METACUBE_FLAGS_METADATA);
		pts_hdr.csum = htons(metacube2_compute_crc(&pts_hdr));

		// Add a Metacube block header before the data.
		metacube2_block_header hdr;
		memcpy(hdr.sync, METACUBE2_SYNC, sizeof(hdr.sync));
		hdr.size = htonl(bytes);
		hdr.flags = htons(metacube_flags);
		hdr.csum = htons(metacube2_compute_crc(&hdr));

		data_element.data.iov_len = bytes + sizeof(hdr);
		if (pts.timebase_num != 0) {
			data_element.data.iov_len += sizeof(pts_hdr) + sizeof(pts_packet);
		}
		data_element.data.iov_base = new char[data_element.data.iov_len];

		char *ptr = reinterpret_cast<char *>(data_element.data.iov_base);
		if (pts.timebase_num != 0) {
			memcpy(ptr, &pts_hdr, sizeof(pts_hdr));
			ptr += sizeof(pts_hdr);
			memcpy(ptr, &pts_packet, sizeof(pts_packet));
			ptr += sizeof(pts_packet);
		}

		memcpy(ptr, &hdr, sizeof(hdr));
		ptr += sizeof(hdr);
		memcpy(ptr, data, bytes);

		queued_data.push_back(data_element);
	} else if (encoding == Stream::STREAM_ENCODING_RAW) {
		// Just add the data itself.
		data_element.data.iov_base = new char[bytes];
		memcpy(data_element.data.iov_base, data, bytes);
		data_element.data.iov_len = bytes;

		queued_data.push_back(data_element);
	} else {
		assert(false);
	}
}
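
// Move the queued data into the backlog proper: register suitable starting
// points and HLS fragment boundaries, write the data out, and finally wake
// up all sleeping clients so they can be sent the new data.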
void Stream::process_queued_data()
{
	vector<DataElement> queued_data_copy;

	// Hold the lock for as short as possible, since add_data_raw() can possibly
	// write to disk, which might disturb the input thread.
	{
		lock_guard<mutex> lock(queued_data_mutex);
		if (queued_data.empty()) {
			return;
		}

		swap(queued_data, queued_data_copy);
	}

	// Add suitable starting points for the stream, if the queued data
	// contains such starting points. Note that we drop starting points
	// if they're less than 10 kB apart, so that we don't get a huge
	// amount of them for e.g. each and every MPEG-TS 188-byte cell.
	// The 10 kB value is somewhat arbitrary, but at least it should make
	// the RAM cost of saving the position ~0.1% (or less) of the actual
	// data, and 10 kB is a very fine granularity in most streams.
	static const int minimum_start_point_distance = 10240;
	size_t byte_position = bytes_received;
	bool need_hls_clear = false;
	for (const DataElement &elem : queued_data_copy) {
		if ((elem.metacube_flags & METACUBE_FLAGS_NOT_SUITABLE_FOR_STREAM_START) == 0) {
			size_t num_points = suitable_starting_points.size();
			if (num_points >= 2 &&
			    suitable_starting_points[num_points - 1] - suitable_starting_points[num_points - 2] < minimum_start_point_distance) {
				// p[n-1] - p[n-2] < 10 kB, so drop p[n-1].
				suitable_starting_points.pop_back();
			}
			suitable_starting_points.push_back(byte_position);

			if (elem.pts.timebase_num != 0) {
				need_hls_clear |= add_fragment_boundary(byte_position, elem.pts);
			}
		}
		byte_position += elem.data.iov_len;
	}
	if (need_hls_clear) {
		clear_hls_playlist_cache();
	}

	add_data_raw(queued_data_copy);
	remove_obsolete_starting_points();
	for (const DataElement &elem : queued_data_copy) {
		char *data = reinterpret_cast<char *>(elem.data.iov_base);
		delete[] data;
	}

	// We have more data, so wake up all clients.
	if (to_process.empty()) {
		swap(sleeping_clients, to_process);
	} else {
		to_process.insert(to_process.end(), sleeping_clients.begin(), sleeping_clients.end());
		sleeping_clients.clear();
	}
}
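
// Register <byte_position> as a potential HLS fragment boundary. Returns
// true if the previous in-progress fragment was finalized (rather than
// extended), in which case the caller needs to clear the playlist cache.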
bool Stream::add_fragment_boundary(size_t byte_position, const RationalPTS &pts)
{
	double pts_double = double(pts.pts) * pts.timebase_den / pts.timebase_num;

	if (fragments.size() <= 1 ||
	    fragments[fragments.size() - 1].begins_header ||
	    fragments[fragments.size() - 2].begins_header) {
		// Just starting up, so try to establish the first in-progress fragment.
		fragments.push_back(FragmentStart{ byte_position, pts_double, false });
		return false;
	}

	// Keep extending the in-progress fragment as long as we do not
	// exceed the target duration by more than half a second
	// (RFC 8216 4.3.3.1), and we get closer to the target by doing so.
	// Note that in particular, this means we'll always extend
	// as long as we don't exceed the target duration.
	double current_duration = pts_double - fragments[fragments.size() - 1].pts;
	double candidate_duration = pts_double - fragments[fragments.size() - 2].pts;
	if (lrintf(candidate_duration) <= hls_frag_duration &&
	    fabs(candidate_duration - hls_frag_duration) < fabs(current_duration - hls_frag_duration)) {
		fragments.back() = FragmentStart{ byte_position, pts_double, false };
		return false;
	} else {
		// Extending the in-progress fragment would make it too long,
		// so finalize it and start a new in-progress fragment.
		fragments.push_back(FragmentStart{ byte_position, pts_double, false });
		return true;
	}
}
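
// Invalidate all three cached playlist variants (HTTP/1.0, HTTP/1.1 with
// Connection: close, and HTTP/1.1 persistent); they will be regenerated
// on the next request, by generate_hls_playlist() below.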
void Stream::clear_hls_playlist_cache()
{
	hls_playlist_http10.reset();
	hls_playlist_http11_close.reset();
	hls_playlist_http11_persistent.reset();
}
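
// Build the HLS playlist (M3U8) for this stream, complete with HTTP response
// headers. Fragments are addressed as byte ranges into the backlog
// (?frag=<start>-<end>); the last two fragment starts are held back, since
// the final fragment is still in progress and may yet be extended
// (see add_fragment_boundary() above).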
shared_ptr<const string> Stream::generate_hls_playlist(bool http_11, bool close_after_response)
{
	char buf[256];
	snprintf(buf, sizeof(buf),
		"#EXTM3U\r\n"
		"#EXT-X-VERSION:7\r\n"
		"#EXT-X-TARGETDURATION:%u\r\n"
		"#EXT-X-MEDIA-SEQUENCE:%" PRIu64 "\r\n"
		"#EXT-X-DISCONTINUITY-SEQUENCE:%" PRIu64 "\r\n",
		hls_frag_duration,
		first_fragment_index,
		discontinuity_counter);

	string playlist = buf;

	if (fragments.size() >= 3) {
		bool printed_header_for_this_group = false;
		bool printed_first_header = false;
		for (size_t i = 0; i < fragments.size() - 2; ++i) {
			char buf[256];
			if (fragments[i].begins_header) {
				// End of this group. (We've already printed the header
				// as part of the previous group.)
				printed_header_for_this_group = false;
				continue;
			}
			if (!printed_header_for_this_group) {
				// Look forward until we find the header for this group (if any).
				for (size_t j = i + 1; j < fragments.size() - 1; ++j) {
					if (fragments[j].begins_header) {
						if (printed_first_header) {
							playlist += "#EXT-X-DISCONTINUITY\r\n";
						}
						snprintf(buf, sizeof(buf),
							"#EXT-X-MAP:URI=\"%s?frag=%" PRIu64 "-%" PRIu64 "\"\r\n",
							url.c_str(), fragments[j].byte_position,
							fragments[j + 1].byte_position);
						playlist += buf;
						printed_first_header = true;
						printed_header_for_this_group = true;
						break;
					}
				}
				if (!printed_header_for_this_group && !stream_header.empty()) {
					if (printed_first_header) {
						playlist += "#EXT-X-DISCONTINUITY\r\n";
					}
					snprintf(buf, sizeof(buf), "#EXT-X-MAP:URI=\"%s?frag=header\"\r\n", url.c_str());
					playlist += buf;
				}

				// Even if we didn't find anything, we don't want to search again for each fragment.
				printed_first_header = true;
				printed_header_for_this_group = true;
			}

			if (fragments[i + 1].begins_header) {
				// Since we only have the start pts of each block and not its
				// duration, we have no idea how long this fragment is; the
				// encoder restarted before it got to output the next pts.
				// However, it's likely to be very short, so instead of
				// trying to guess, we just skip it.
				continue;
			}

			snprintf(buf, sizeof(buf), "#EXTINF:%f,\r\n%s?frag=%" PRIu64 "-%" PRIu64 "\r\n",
				fragments[i + 1].pts - fragments[i].pts,
				url.c_str(),
				fragments[i].byte_position,
				fragments[i + 1].byte_position);
			playlist += buf;
		}
	}

	string response;
	if (http_11) {
		response = "HTTP/1.1 200 OK\r\n";
		if (close_after_response) {
			response.append("Connection: close\r\n");
		}
	} else {
		assert(close_after_response);
		response = "HTTP/1.0 200 OK\r\n";
	}
	snprintf(buf, sizeof(buf), "Content-Length: %zu\r\n", playlist.size());
	response.append(buf);
	response.append("Content-Type: application/x-mpegURL\r\n");
	if (!allow_origin.empty()) {
		response.append("Access-Control-Allow-Origin: ");
		response.append(allow_origin);
		response.append("\r\n");
	}
	response.append("\r\n");
	response.append(move(playlist));

	return shared_ptr<const string>(new string(move(response)));
}