File: rtp.c

package info (click to toggle)
libtoxcore 0.2.22-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,992 kB
  • sloc: ansic: 70,235; cpp: 14,770; sh: 1,576; python: 649; makefile: 255; perl: 39
file content (1047 lines) | stat: -rw-r--r-- 37,047 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
/* SPDX-License-Identifier: GPL-3.0-or-later
 * Copyright © 2016-2026 The TokTok team.
 * Copyright © 2013-2015 Tox project.
 */
#include "rtp.h"

#include <assert.h>
#include <stdlib.h>
#include <string.h>

#include <sodium.h>

#include "../toxcore/ccompat.h"
#include "../toxcore/logger.h"
#include "../toxcore/mono_time.h"
#include "../toxcore/net_crypto.h"
#include "../toxcore/network.h"
#include "../toxcore/util.h"

/**
 * Maximum size of a single RTP frame in bytes.
 * This limit prevents memory exhaustion attacks where a malicious peer sends
 * a header indicating a very large frame size, causing the receiver to allocate
 * excessive memory.
 */
#define MAX_RTP_FRAME_SIZE (32 * 1024 * 1024)

struct RTPHeader {
    /* Standard RTP header */
    unsigned ve: 2; /* Version has only 2 bits! */
    unsigned pe: 1; /* Padding */
    unsigned xe: 1; /* Extra header */
    unsigned cc: 4; /* Contributing sources count */

    unsigned ma: 1; /* Marker */
    unsigned pt: 7; /* Payload type */

    uint16_t sequnum;
    uint32_t timestamp;
    uint32_t ssrc;

    /* Non-standard Tox-specific fields */

    /**
     * Bit mask of `RTPFlags` setting features of the current frame.
     */
    uint64_t flags;

    /**
     * The full 32 bit data offset of the current data chunk. The
     * @ref offset_lower data member contains the lower 16 bits of this value.
     * For frames smaller than 64KiB, @ref offset_full and @ref offset_lower are
     * equal.
     */
    uint32_t offset_full;
    /**
     * The full 32 bit payload length without header and packet id.
     */
    uint32_t data_length_full;
    /**
     * Only the receiver uses this field (why do we have this?).
     */
    uint32_t received_length_full;

    /**
     * Data offset of the current part (lower bits).
     */
    uint16_t offset_lower;
    /**
     * Total message length (lower bits).
     */
    uint16_t data_length_lower;
};

struct RTPMessage {
    /**
     * This is used in the old code that doesn't deal with large frames, i.e.
     * the audio code or receiving code for old 16 bit messages. We use it to
     * record the number of bytes received so far in a multi-part message. The
     * multi-part message in the old code is stored in `RTPSession::mp`.
     */
    uint32_t len;

    struct RTPHeader header;
    uint8_t data[];
};

/**
 * One slot in the work buffer list. Represents one frame that is currently
 * being assembled.
 */
struct RTPWorkBuffer {
    /**
     * Whether this slot contains a key frame. This is true iff
     * `buf->header.flags & RTP_KEY_FRAME`.
     */
    bool is_keyframe;
    /**
     * The number of bytes received so far, regardless of which pieces. I.e. we
     * could have received the first 1000 bytes and the last 1000 bytes with
     * 4000 bytes in the middle still to come, and this number would be 2000.
     */
    uint32_t received_len;
    /**
     * The message currently being assembled.
     */
    struct RTPMessage *buf;
};

struct RTPWorkBufferList {
    int8_t next_free_entry;
    struct RTPWorkBuffer work_buffer[USED_RTP_WORKBUFFER_COUNT];
};

/**
 * RTP control session.
 */
struct RTPSession {
    uint8_t  payload_type;
    uint16_t sequnum;      /* Sending sequence number */
    uint16_t rsequnum;     /* Receiving sequence number */
    uint32_t rtimestamp;
    uint32_t ssrc; //  this seems to be unused!?
    struct RTPMessage *mp; /* Expected parted message */
    struct RTPWorkBufferList *work_buffer_list;
    uint8_t  first_packets_counter; /* dismiss first few lost video packets */
    const Logger *log;
    Mono_Time *mono_time;
    bool rtp_receive_active; /* if this is set to false then incoming rtp packets will not be processed by rtp_receive_packet() */

    rtp_send_packet_cb *send_packet;
    void *send_packet_user_data;

    rtp_add_recv_cb *add_recv;
    rtp_add_lost_cb *add_lost;
    void *bwc_user_data;

    void *cs;
    rtp_m_cb *mcb;
};

const uint8_t *rtp_message_data(const RTPMessage *msg)
{
    return msg->data;
}

uint32_t rtp_message_len(const RTPMessage *msg)
{
    return msg->len;
}

uint8_t rtp_message_pt(const RTPMessage *msg)
{
    return msg->header.pt;
}

uint16_t rtp_message_sequnum(const RTPMessage *msg)
{
    return msg->header.sequnum;
}

uint64_t rtp_message_flags(const RTPMessage *msg)
{
    return msg->header.flags;
}

uint32_t rtp_message_data_length_full(const RTPMessage *msg)
{
    return msg->header.data_length_full;
}

bool rtp_session_is_receiving_active(const RTPSession *session)
{
    if (session == nullptr) {
        return false;
    }
    return session->rtp_receive_active;
}

uint32_t rtp_session_get_ssrc(const RTPSession *session)
{
    return session->ssrc;
}

void rtp_session_set_ssrc(RTPSession *session, uint32_t ssrc)
{
    session->ssrc = ssrc;
}

/**
 * The number of milliseconds we want to keep a keyframe in the buffer for,
 * even though there are no free slots for incoming frames.
 */
#define VIDEO_KEEP_KEYFRAME_IN_BUFFER_FOR_MS 15

// allocate_len is NOT including header!
static struct RTPMessage *_Nullable new_message(const Logger *_Nonnull log, const struct RTPHeader *_Nonnull header, size_t allocate_len,
        const uint8_t *_Nonnull data, uint16_t data_length)
{
    if (allocate_len < data_length) {
        LOGGER_WARNING(log, "new_message: allocate_len (%zu) < data_length (%u)", allocate_len, data_length);
        return nullptr;
    }

    struct RTPMessage *msg = (struct RTPMessage *)calloc(1, sizeof(struct RTPMessage) + allocate_len);

    if (msg == nullptr) {
        LOGGER_WARNING(log, "Could not allocate RTPMessage buffer");
        return nullptr;
    }

    msg->len = data_length; // result without header
    msg->header = *header;
    memcpy(msg->data, data, msg->len);
    return msg;
}

/**
 * Instruct the caller to clear slot 0.
 */
#define GET_SLOT_RESULT_DROP_OLDEST_SLOT (-1)

/**
 * Instruct the caller to drop the incoming packet.
 */
#define GET_SLOT_RESULT_DROP_INCOMING (-2)

/**
 * Find the next free slot in work_buffer for the incoming data packet.
 *
 * - If the data packet belongs to a frame that's already in the work_buffer then
 *   use that slot.
 * - If there is no free slot return GET_SLOT_RESULT_DROP_OLDEST_SLOT.
 * - If the data packet is too old return GET_SLOT_RESULT_DROP_INCOMING.
 *
 * If there is a keyframe being assembled in slot 0, keep it a bit longer and
 * do not kick it out right away if all slots are full instead kick out the new
 * incoming interframe.
 */
static int8_t get_slot(const Logger *_Nonnull log, struct RTPWorkBufferList *_Nonnull wkbl, bool is_keyframe,
                       const struct RTPHeader *_Nonnull header, bool is_multipart)
{
    if (is_multipart) {
        // This RTP message is part of a multipart frame, so we try to find an
        // existing slot with the previous parts of the frame in it.
        for (uint8_t i = 0; i < wkbl->next_free_entry; ++i) {
            const struct RTPWorkBuffer *slot = &wkbl->work_buffer[i];

            if ((slot->buf->header.sequnum == header->sequnum) && (slot->buf->header.timestamp == header->timestamp)) {
                // Sequence number and timestamp match, so this slot belongs to
                // the same frame.
                //
                // In reality, these will almost certainly either both match or
                // both not match. Only if somehow there were 65535 frames
                // between, the timestamp will matter.
                return i;
            }
        }
    }

    // The message may or may not be part of a multipart frame.
    //
    // If it is part of a multipart frame, then this is an entirely new frame
    // for which we did not have a slot *or* the frame is so old that its slot
    // has been evicted by now.
    //
    //        |----------- time ----------->
    //        _________________
    // slot 0 |               |
    //        -----------------
    //                     _________________
    // slot 1              |               |
    //                     -----------------
    //                ____________
    // slot 2         |          | -> frame too old, drop
    //                ------------
    //
    //
    //
    //        |----------- time ----------->
    //        _________________
    // slot 0 |               |
    //        -----------------
    //                     _________________
    // slot 1              |               |
    //                     -----------------
    //                              ____________
    // slot 2                       |          | -> ok, start filling in a new slot
    //                              ------------

    // If there is a free slot:
    if (wkbl->next_free_entry < USED_RTP_WORKBUFFER_COUNT) {
        // If there is at least one filled slot:
        if (wkbl->next_free_entry > 0) {
            // Get the most recently filled slot.
            const struct RTPWorkBuffer *slot = &wkbl->work_buffer[wkbl->next_free_entry - 1];

            // If the incoming packet is older than our newest slot, drop it.
            // This is the first situation in the above diagram.
            if (slot->buf->header.timestamp > header->timestamp) {
                LOGGER_DEBUG(log, "workbuffer:2:timestamp too old");
                return GET_SLOT_RESULT_DROP_INCOMING;
            }
        }

        // Not all slots are filled, and the packet is newer than our most
        // recent slot, so it's a new frame we want to start assembling. This is
        // the second situation in the above diagram.
        return wkbl->next_free_entry;
    }

    // If the incoming frame is a key frame, then stop assembling the oldest
    // slot, regardless of whether there was a keyframe in that or not.
    if (is_keyframe) {
        return GET_SLOT_RESULT_DROP_OLDEST_SLOT;
    }

    // The incoming slot is not a key frame, so we look at slot 0 to see what to
    // do next.
    const struct RTPWorkBuffer *slot = &wkbl->work_buffer[0];

    // The incoming frame is not a key frame, but the existing slot 0 is also
    // not a keyframe, so we stop assembling the existing frame and make space
    // for the new one.
    if (!slot->is_keyframe) {
        return GET_SLOT_RESULT_DROP_OLDEST_SLOT;
    }

    // If this key frame is fully received, we also stop assembling and clear
    // slot 0.  This also means sending the frame to the decoder.
    if (slot->received_len == slot->buf->header.data_length_full) {
        return GET_SLOT_RESULT_DROP_OLDEST_SLOT;
    }

    // This is a key frame, not fully received yet, but it's already much older
    // than the incoming frame, so we stop assembling it and send whatever part
    // we did receive to the decoder.
    if (slot->buf->header.timestamp + VIDEO_KEEP_KEYFRAME_IN_BUFFER_FOR_MS <= header->timestamp) {
        return GET_SLOT_RESULT_DROP_OLDEST_SLOT;
    }

    // This is a key frame, it's not too old yet, so we keep it in its slot for
    // a little longer.
    LOGGER_INFO(log, "keep KEYFRAME in workbuffer");
    return GET_SLOT_RESULT_DROP_INCOMING;
}

/**
 * Returns an assembled frame (as much data as we currently have for this frame,
 * some pieces may be missing)
 *
 * If there are no frames ready, we return NULL. If this function returns
 * non-NULL, it transfers ownership of the message to the caller, i.e. the
 * caller is responsible for storing it elsewhere or calling `free()`.
 */
static struct RTPMessage *_Nullable process_frame(const Logger *_Nonnull log, struct RTPWorkBufferList *_Nonnull wkbl, uint8_t slot_id)
{
    assert(wkbl->next_free_entry >= 0);

    if (wkbl->next_free_entry == 0) {
        // There are no frames in any slot.
        return nullptr;
    }

    // Slot 0 contains a key frame, slot_id points at an interframe that is
    // relative to that key frame, so we don't use it yet.
    if (wkbl->work_buffer[0].is_keyframe && slot_id != 0) {
        LOGGER_DEBUG(log, "process_frame:KEYFRAME waiting in slot 0");
        return nullptr;
    }

    // Either slot_id is 0 and slot 0 is a key frame, or there is no key frame
    // in slot 0 (and slot_id is anything).
    struct RTPWorkBuffer *const slot = &wkbl->work_buffer[slot_id];

    // Move ownership of the frame out of the slot into m_new.
    struct RTPMessage *msg = slot->buf;
    msg->len = msg->header.data_length_full;
    slot->buf = nullptr;

    assert(wkbl->next_free_entry >= 1 && wkbl->next_free_entry <= USED_RTP_WORKBUFFER_COUNT);

    if (slot_id != wkbl->next_free_entry - 1) {
        // The slot is not the last slot, so we created a gap. We move all the
        // entries after it one step up.
        for (uint8_t i = slot_id; i < wkbl->next_free_entry - 1; ++i) {
            // Move entry (i+1) into entry (i).
            wkbl->work_buffer[i] = wkbl->work_buffer[i + 1];
        }
    }

    // We now have a free entry at the end of the array.
    --wkbl->next_free_entry;

    // Clear the newly freed entry.
    const struct RTPWorkBuffer empty = {false};
    wkbl->work_buffer[wkbl->next_free_entry] = empty;

    // Move ownership of the frame to the caller.
    return msg;
}

/**
 * @param log A pointer to the Logger object.
 * @param wkbl The list of in-progress frames, i.e. all the slots.
 * @param slot_id The slot we want to fill the data into.
 * @param is_keyframe Whether the data is part of a key frame.
 * @param header The RTP header from the incoming packet.
 * @param incoming_data The pure payload without header.
 * @param incoming_data_length The length in bytes of the incoming data payload.
 */
static bool fill_data_into_slot(const Logger *_Nonnull log, struct RTPWorkBufferList *_Nonnull wkbl, const uint8_t slot_id,
                                bool is_keyframe, const struct RTPHeader *_Nonnull header,
                                const uint8_t *_Nonnull incoming_data, uint16_t incoming_data_length)
{
    // We're either filling the data into an existing slot, or in a new one that
    // is the next free entry.
    assert(slot_id <= wkbl->next_free_entry);
    struct RTPWorkBuffer *const slot = &wkbl->work_buffer[slot_id];

    assert(header != nullptr);
    assert(is_keyframe == (bool)((header->flags & RTP_KEY_FRAME) != 0));

    if (slot->received_len == 0) {
        assert(slot->buf == nullptr);

        if (header->data_length_full > MAX_RTP_FRAME_SIZE) {
            LOGGER_WARNING(log, "RTP frame too large: %u > %u", (unsigned)header->data_length_full, (unsigned)MAX_RTP_FRAME_SIZE);
            return false;
        }

        // No data for this slot has been received, yet, so we create a new
        // message for it with enough memory for the entire frame.
        struct RTPMessage *msg = (struct RTPMessage *)calloc(1, sizeof(struct RTPMessage) + header->data_length_full);

        if (msg == nullptr) {
            LOGGER_ERROR(log, "Out of memory while trying to allocate for frame of size %u",
                         (unsigned)header->data_length_full);
            // Out of memory: throw away the incoming data.
            return false;
        }

        // Unused in the new video receiving code, as it's 16 bit and can't hold
        // the full length of large frames. Instead, we use slot->received_len.
        msg->len = 0;
        msg->header = *header;

        slot->buf = msg;
        slot->is_keyframe = is_keyframe;
        slot->received_len = 0;

        assert(wkbl->next_free_entry < USED_RTP_WORKBUFFER_COUNT);
        ++wkbl->next_free_entry;
    } else {
        if (slot->buf->header.data_length_full != header->data_length_full) {
            LOGGER_WARNING(log, "Received packet with different length than previous packets in same frame: %u != %u",
                           header->data_length_full, slot->buf->header.data_length_full);
            return false;
        }
    }

    // We already checked this when we received the packet, but we rely on it
    // here, so assert again.
    assert(header->offset_full < header->data_length_full);

    if (header->data_length_full - header->offset_full < incoming_data_length) {
        LOGGER_ERROR(log, "Packet too long for buffer: offset %u + len %u > total %u",
                     (unsigned)header->offset_full, (unsigned)incoming_data_length, (unsigned)header->data_length_full);
        return false;
    }

    // Copy the incoming chunk of data into the correct position in the full
    // frame data array.
    memcpy(
        slot->buf->data + header->offset_full,
        incoming_data,
        incoming_data_length
    );

    // Update the total received length of this slot.
    slot->received_len += incoming_data_length;

    // Update received length also in the header of the message, for later use.
    slot->buf->header.received_length_full = slot->received_len;

    return slot->received_len == header->data_length_full;
}

static void update_bwc_values(RTPSession *_Nonnull session, const struct RTPMessage *_Nonnull msg)
{
    if (session->first_packets_counter < DISMISS_FIRST_LOST_VIDEO_PACKET_COUNT) {
        ++session->first_packets_counter;
    } else {
        const uint32_t data_length_full = msg->header.data_length_full; // without header
        const uint32_t received_length_full = msg->header.received_length_full; // without header
        if (session->add_recv != nullptr) {
            session->add_recv(session->bwc_user_data, data_length_full);
        }

        if (received_length_full < data_length_full) {
            LOGGER_DEBUG(session->log, "BWC: full length=%u received length=%u", data_length_full, received_length_full);
            if (session->add_lost != nullptr) {
                session->add_lost(session->bwc_user_data, data_length_full - received_length_full);
            }
        }
    }
}

/**
 * Handle a single RTP video packet.
 *
 * The packet may or may not be part of a multipart frame. This function will
 * find out and handle it appropriately.
 *
 * @param session The current RTP session
 * @param header The RTP header deserialised from the packet.
 * @param incoming_data The packet data *not* header, i.e. this is the actual
 *   payload.
 * @param incoming_data_length The packet length *not* including header, i.e.
 *   this is the actual payload length.
 * @param log A logger.
 *
 * @retval -1 on error.
 * @retval 0 on success.
 */
static int handle_video_packet(const Logger *_Nonnull log, RTPSession *_Nonnull session, const struct RTPHeader *_Nonnull header,
                               const uint8_t *_Nonnull incoming_data, uint16_t incoming_data_length)
{
    // Full frame length in bytes. The frame may be split into multiple packets,
    // but this value is the complete assembled frame size.
    const uint32_t full_frame_length = header->data_length_full;

    // The sender tells us whether this is a key frame.
    const bool is_keyframe = (header->flags & RTP_KEY_FRAME) != 0;

    LOGGER_DEBUG(log, "wkbl->next_free_entry:003=%d", session->work_buffer_list->next_free_entry);

    const bool is_multipart = full_frame_length != incoming_data_length;

    /* The message was sent in single part */
    int8_t slot_id = get_slot(log, session->work_buffer_list, is_keyframe, header, is_multipart);
    LOGGER_DEBUG(log, "slot num=%d", slot_id);

    // get_slot told us to drop the packet, so we ignore it.
    if (slot_id == GET_SLOT_RESULT_DROP_INCOMING) {
        return -1;
    }

    // get_slot said there is no free slot.
    if (slot_id == GET_SLOT_RESULT_DROP_OLDEST_SLOT) {
        LOGGER_DEBUG(log, "there was no free slot, so we process the oldest frame");
        // We now own the frame.
        struct RTPMessage *m_new = process_frame(log, session->work_buffer_list, 0);

        // The process_frame function returns NULL if there is no slot 0, i.e.
        // the work buffer list is completely empty. It can't be empty, because
        // get_slot just told us it's full, so process_frame must return non-null.
        assert(m_new != nullptr);

        if (m_new->len >= 2) {
            LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-001a b0=%d b1=%d", (int)m_new->data[0],
                         (int)m_new->data[1]);
        } else if (m_new->len == 1) {
            LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-001a b0=%d", (int)m_new->data[0]);
        } else {
            LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-001a (empty)");
        }
        update_bwc_values(session, m_new);
        // Pass ownership of m_new to the callback.
        session->mcb(session->mono_time, session->cs, m_new);
        // Now we no longer own m_new.
        m_new = nullptr;

        // Now we must have a free slot, so we either get that slot, i.e. >= 0,
        // or get told to drop the incoming packet if it's too old.
        slot_id = get_slot(log, session->work_buffer_list, is_keyframe, header, /* is_multipart */false);

        if (slot_id == GET_SLOT_RESULT_DROP_INCOMING) {
            // The incoming frame is too old, so we drop it.
            return -1;
        }
    }

    // We must have a valid slot here.
    assert(slot_id >= 0);

    LOGGER_DEBUG(log, "fill_data_into_slot.1");

    // fill in this part into the slot buffer at the correct offset
    if (!fill_data_into_slot(
                log,
                session->work_buffer_list,
                slot_id,
                is_keyframe,
                header,
                incoming_data,
                incoming_data_length)) {
        // Memory allocation failed. Return error.
        return -1;
    }

    struct RTPMessage *m_new = process_frame(log, session->work_buffer_list, slot_id);

    if (m_new != nullptr) {
        if (m_new->len >= 2) {
            LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-003a b0=%d b1=%d", (int)m_new->data[0],
                         (int)m_new->data[1]);
        } else if (m_new->len == 1) {
            LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-003a b0=%d", (int)m_new->data[0]);
        } else {
            LOGGER_DEBUG(log, "-- handle_video_packet -- CALLBACK-003a (empty)");
        }
        update_bwc_values(session, m_new);
        session->mcb(session->mono_time, session->cs, m_new);

        m_new = nullptr;
    }

    return 0;
}

/**
 * receive custom lossypackets and process them. they can be incoming audio or video packets
 */
void rtp_receive_packet(RTPSession *session, const uint8_t *data, size_t length)
{
    const Logger *log = session->log;

    if (length < RTP_HEADER_SIZE + 1) {
        LOGGER_WARNING(log, "Invalid length of received buffer!");
        return;
    }

    // Get the packet type.
    const uint8_t packet_type = data[0];
    const uint8_t *payload = &data[1];
    // TODO(Zoff): is this ok?
    const uint16_t payload_size = (uint16_t)length - 1;

    // Unpack the header.
    struct RTPHeader header;
    rtp_header_unpack(payload, &header);

    if (header.pt != packet_type % 128) {
        LOGGER_WARNING(log, "RTPHeader packet type and Tox protocol packet type did not agree: %d != %d",
                       header.pt, packet_type % 128);
        return;
    }

    if (header.pt != session->payload_type % 128) {
        LOGGER_WARNING(log, "RTPHeader packet type does not match this session's payload type: %d != %d",
                       header.pt, session->payload_type % 128);
        return;
    }

    if ((header.flags & RTP_LARGE_FRAME) != 0 && header.offset_full >= header.data_length_full) {
        LOGGER_ERROR(log, "Invalid video packet: frame offset (%u) >= full frame length (%u)",
                     (unsigned)header.offset_full, (unsigned)header.data_length_full);
        return;
    }

    if (header.offset_lower >= header.data_length_lower) {
        LOGGER_ERROR(log, "Invalid old protocol video packet: frame offset (%u) >= full frame length (%u)",
                     (unsigned)header.offset_lower, (unsigned)header.data_length_lower);
        return;
    }

    LOGGER_DEBUG(log, "header.pt %d, video %d", (uint8_t)header.pt, RTP_TYPE_VIDEO % 128);

    // The sender uses the new large-frame capable protocol and is sending a
    // video packet.
    if ((header.flags & RTP_LARGE_FRAME) != 0 && header.pt == (RTP_TYPE_VIDEO % 128)) {
        handle_video_packet(log, session, &header, &payload[RTP_HEADER_SIZE], payload_size - RTP_HEADER_SIZE);
        return;
    }

    // everything below here is for the old 16 bit protocol ------------------

    if (header.data_length_lower == payload_size - RTP_HEADER_SIZE) {
        /* The message is sent in single part */

        /* Message is not late; pick up the latest parameters */
        session->rsequnum = header.sequnum;
        session->rtimestamp = header.timestamp;
        if (session->add_recv != nullptr) {
            session->add_recv(session->bwc_user_data, payload_size);
        }

        /* Invoke processing of active multiparted message */
        if (session->mp != nullptr) {
            session->mcb(session->mono_time, session->cs, session->mp);
            session->mp = nullptr;
        }

        /* The message came in the allowed time;
         */

        session->mp = new_message(log, &header, payload_size - RTP_HEADER_SIZE, &payload[RTP_HEADER_SIZE], payload_size - RTP_HEADER_SIZE);
        session->mcb(session->mono_time, session->cs, session->mp);
        session->mp = nullptr;
        return;
    }

    /* The message is sent in multiple parts */

    if (session->mp != nullptr) {
        /* There are 2 possible situations in this case:
         *      1) being that we got the part of already processing message.
         *      2) being that we got the part of a new/old message.
         *
         * We handle them differently as we only allow a single multiparted
         * processing message
         */
        if (session->mp->header.sequnum == header.sequnum &&
                session->mp->header.timestamp == header.timestamp) {
            /* First case */

            /* Make sure we have enough allocated memory */
            if (session->mp->header.data_length_lower - session->mp->len < payload_size - RTP_HEADER_SIZE ||
                    session->mp->header.data_length_lower <= header.offset_lower ||
                    session->mp->header.data_length_lower - header.offset_lower < payload_size - RTP_HEADER_SIZE) {
                LOGGER_WARNING(log, "Corruption on the stream: multipart audio packet does not fit");
                return;
            }

            memcpy(session->mp->data + header.offset_lower, &payload[RTP_HEADER_SIZE],
                   payload_size - RTP_HEADER_SIZE);
            session->mp->len += payload_size - RTP_HEADER_SIZE;
            if (session->add_recv != nullptr) {
                session->add_recv(session->bwc_user_data, payload_size);
            }

            if (session->mp->len == session->mp->header.data_length_lower) {
                /* Received a full message; now push it for the further
                 * processing.
                 */
                session->mcb(session->mono_time, session->cs, session->mp);
                session->mp = nullptr;
            }
        } else {
            /* Second case */
            if (session->mp->header.timestamp > header.timestamp) {
                /* The received message part is from the old message;
                 * discard it.
                 */
                return;
            }

            /* Push the previous message for processing */
            session->mcb(session->mono_time, session->cs, session->mp);

            session->mp = nullptr;
            goto NEW_MULTIPARTED;
        }
    } else {
        /* In this case treat the message as if it was received in order
         */
        /* This is also a point for new multiparted messages */
NEW_MULTIPARTED:

        if (header.data_length_lower - header.offset_lower < payload_size - RTP_HEADER_SIZE) {
            LOGGER_WARNING(log, "Packet too long for buffer: offset %u + len %u > total %u",
                           (unsigned)header.offset_lower, (unsigned)(payload_size - RTP_HEADER_SIZE),
                           (unsigned)header.data_length_lower);
            return;
        }

        /* Message is not late; pick up the latest parameters */
        session->rsequnum = header.sequnum;
        session->rtimestamp = header.timestamp;
        if (session->add_recv != nullptr) {
            session->add_recv(session->bwc_user_data, payload_size);
        }

        /* Store message.
         */
        session->mp = new_message(log, &header, header.data_length_lower, &payload[RTP_HEADER_SIZE], payload_size - RTP_HEADER_SIZE);

        if (session->mp != nullptr) {
            memmove(session->mp->data + header.offset_lower, session->mp->data, session->mp->len);
        } else {
            LOGGER_WARNING(log, "new_message() returned a null pointer");
            return;
        }
    }

    return;
}

size_t rtp_header_pack(uint8_t *const rdata, const struct RTPHeader *header)
{
    uint8_t *p = rdata;
    *p = (header->ve & 3) << 6
         | (header->pe & 1) << 5
         | (header->xe & 1) << 4
         | (header->cc & 0xf);
    ++p;
    *p = (header->ma & 1) << 7
         | (header->pt & 0x7f);
    ++p;

    p += net_pack_u16(p, header->sequnum);
    p += net_pack_u32(p, header->timestamp);
    p += net_pack_u32(p, header->ssrc);
    p += net_pack_u64(p, header->flags);
    p += net_pack_u32(p, header->offset_full);
    p += net_pack_u32(p, header->data_length_full);
    p += net_pack_u32(p, header->received_length_full);

    for (size_t i = 0; i < RTP_PADDING_FIELDS; ++i) {
        p += net_pack_u32(p, 0);
    }

    p += net_pack_u16(p, header->offset_lower);
    p += net_pack_u16(p, header->data_length_lower);
    assert(p == rdata + RTP_HEADER_SIZE);
    return p - rdata;
}

size_t rtp_header_unpack(const uint8_t *data, struct RTPHeader *header)
{
    const uint8_t *p = data;
    header->ve = (*p >> 6) & 3;
    header->pe = (*p >> 5) & 1;
    header->xe = (*p >> 4) & 1;
    header->cc = *p & 0xf;
    ++p;

    header->ma = (*p >> 7) & 1;
    header->pt = *p & 0x7f;
    ++p;

    p += net_unpack_u16(p, &header->sequnum);
    p += net_unpack_u32(p, &header->timestamp);
    p += net_unpack_u32(p, &header->ssrc);
    p += net_unpack_u64(p, &header->flags);
    p += net_unpack_u32(p, &header->offset_full);
    p += net_unpack_u32(p, &header->data_length_full);
    p += net_unpack_u32(p, &header->received_length_full);

    p += sizeof(uint32_t) * RTP_PADDING_FIELDS;

    p += net_unpack_u16(p, &header->offset_lower);
    p += net_unpack_u16(p, &header->data_length_lower);
    assert(p == data + RTP_HEADER_SIZE);
    return p - data;
}

static uint32_t rtp_random_u32(void)
{
    // HINT: uses libsodium function
    return randombytes_random();
}

RTPSession *_Nullable rtp_new(const Logger *_Nonnull log, int payload_type, Mono_Time *_Nonnull mono_time,
                              rtp_send_packet_cb *_Nullable send_packet, void *_Nullable send_packet_user_data,
                              rtp_add_recv_cb *_Nullable add_recv, rtp_add_lost_cb *_Nullable add_lost, void *_Nullable bwc_user_data,
                              void *_Nonnull cs, rtp_m_cb *_Nonnull mcb)
{
    assert(mcb != nullptr);
    assert(cs != nullptr);

    RTPSession *session = (RTPSession *)calloc(1, sizeof(RTPSession));

    if (session == nullptr) {
        LOGGER_WARNING(log, "Alloc failed! Program might misbehave!");
        return nullptr;
    }

    session->work_buffer_list = (struct RTPWorkBufferList *)calloc(1, sizeof(struct RTPWorkBufferList));

    if (session->work_buffer_list == nullptr) {
        LOGGER_ERROR(log, "out of memory while allocating work buffer list");
        free(session);
        return nullptr;
    }

    // First entry is free.
    session->work_buffer_list->next_free_entry = 0;

    session->ssrc = payload_type == RTP_TYPE_VIDEO ? 0 : rtp_random_u32(); // Zoff: what is this??
    session->payload_type = payload_type;
    session->log = log;
    session->mono_time = mono_time;
    session->rtp_receive_active = true;

    session->send_packet = send_packet;
    session->send_packet_user_data = send_packet_user_data;
    session->add_recv = add_recv;
    session->add_lost = add_lost;
    session->bwc_user_data = bwc_user_data;

    // set NULL just in case
    session->mp = nullptr;
    session->first_packets_counter = 1;

    /* Also set payload type as prefix */
    session->cs = cs;
    session->mcb = mcb;

    return session;
}

void rtp_kill(const Logger *_Nonnull log, RTPSession *_Nullable session)
{
    if (session == nullptr) {
        LOGGER_WARNING(log, "No session");
        return;
    }

    LOGGER_DEBUG(log, "Terminated RTP session: %p", (void *)session);
    LOGGER_DEBUG(log, "Terminated RTP session V3 work_buffer_list->next_free_entry: %d",
                 (int)session->work_buffer_list->next_free_entry);

    if (session->work_buffer_list != nullptr) {
        for (int8_t i = 0; i < session->work_buffer_list->next_free_entry; ++i) {
            free(session->work_buffer_list->work_buffer[i].buf);
        }
        free(session->work_buffer_list);
    }
    free(session->mp);
    free(session);
}

void rtp_allow_receiving_mark(RTPSession *_Nullable session)
{
    if (session != nullptr) {
        session->rtp_receive_active = true;
    }
}

void rtp_stop_receiving_mark(RTPSession *_Nullable session)
{
    if (session != nullptr) {
        session->rtp_receive_active = false;
    }
}

static void rtp_send_piece(RTPSession *_Nonnull session, const struct RTPHeader *_Nonnull header,
                           const uint8_t *_Nonnull data, uint8_t *_Nonnull rdata, uint16_t length)
{
    rtp_header_pack(rdata + 1, header);
    memcpy(rdata + 1 + RTP_HEADER_SIZE, data, length);

    const uint16_t rdata_size = length + RTP_HEADER_SIZE + 1;

    if (session->send_packet != nullptr) {
        session->send_packet(session->send_packet_user_data, rdata, rdata_size);
    }
}

static struct RTPHeader rtp_default_header(const RTPSession *_Nonnull session, uint32_t length, bool is_keyframe)
{
    uint16_t length_safe = (uint16_t)length;

    if (length > UINT16_MAX) {
        length_safe = UINT16_MAX;
    }

    struct RTPHeader header = {0};

    if (is_keyframe) {
        header.flags |= RTP_KEY_FRAME;
    }

    if (session->payload_type == RTP_TYPE_VIDEO) {
        header.flags |= RTP_LARGE_FRAME;
    }

    header.ve = 2;  // this is unused in toxav
    header.pe = 0;
    header.xe = 0;
    header.cc = 0;
    header.ma = 0;
    header.pt = session->payload_type % 128;
    header.sequnum = session->sequnum;
    if (session->mono_time != nullptr) {
        header.timestamp = current_time_monotonic(session->mono_time);
    } else {
        header.timestamp = 0;
    }
    header.ssrc = session->ssrc;
    header.offset_lower = 0;
    header.data_length_lower = length_safe;
    header.data_length_full = length; // without header
    header.offset_full = 0;

    return header;
}

/**
 * @brief Send a frame of audio or video data, chunked in @ref RTPMessage instances.
 *
 * @param session The A/V session to send the data for.
 * @param data A byte array of length @p length.
 * @param length The number of bytes to send from @p data.
 * @param is_keyframe Whether this video frame is a key frame. If it is an
 *   audio frame, this parameter is ignored.
 */
int rtp_send_data(const Logger *log, RTPSession *session, const uint8_t *data, uint32_t length,
                  bool is_keyframe)
{
    if (session == nullptr) {
        return -1;
    }

    const uint16_t rdata_size = min_u32(length + RTP_HEADER_SIZE + 1, MAX_CRYPTO_DATA_SIZE);
    VLA(uint8_t, rdata, rdata_size);
    memset(rdata, 0, rdata_size);
    rdata[0] = session->payload_type;  // packet id == payload_type

    struct RTPHeader header = rtp_default_header(session, length, is_keyframe);

    if (MAX_CRYPTO_DATA_SIZE > (length + RTP_HEADER_SIZE + 1)) {
        /*
         * The length is lesser than the maximum allowed length (including header)
         * Send the packet in single piece.
         */
        assert(length < UINT16_MAX);
        rtp_send_piece(session, &header, data, rdata, (uint16_t)length);
    } else {
        /*
         * The length is greater than the maximum allowed length (including header)
         * Send the packet in multiple pieces.
         */
        uint32_t sent = 0;
        uint16_t piece = MAX_CRYPTO_DATA_SIZE - (RTP_HEADER_SIZE + 1);

        while ((length - sent) + RTP_HEADER_SIZE + 1 > MAX_CRYPTO_DATA_SIZE) {
            rtp_send_piece(session, &header, data + sent, rdata, piece);

            sent += piece;
            header.offset_lower = (uint16_t)sent;
            header.offset_full = sent; // raw data offset, without any header
        }

        /* Send remaining */
        piece = (uint16_t)(length - sent);

        if (piece != 0) {
            rtp_send_piece(session, &header, data + sent, rdata, piece);
        }
    }

    ++session->sequnum;
    return 0;
}