File: tsbpd_time.cpp

package info (click to toggle)
srt 1.5.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,804 kB
  • sloc: cpp: 52,175; ansic: 5,746; tcl: 1,183; sh: 318; python: 99; makefile: 38
file content (285 lines) | stat: -rw-r--r-- 10,606 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
/*
 * SRT - Secure, Reliable, Transport
 * Copyright (c) 2021 Haivision Systems Inc.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 */
#include "tsbpd_time.h"

#include "logging.h"
#include "logger_defs.h"
#include "packet.h"

using namespace srt_logging;
using namespace srt::sync;

namespace srt
{

#if SRT_DEBUG_TRACE_DRIFT
// Debug-only CSV tracer for drift samples (compiled only with SRT_DEBUG_TRACE_DRIFT).
// The output file "drift_trace_<time>.csv" is opened on demand from trace();
// every access to the stream is serialized by m_mtx.
class drift_logger
{
    typedef srt::sync::steady_clock steady_clock;

public:
    drift_logger() {}

    // Closes the trace file while holding the same mutex trace() writes under.
    ~drift_logger()
    {
        ScopedLock guard(m_mtx);
        m_fout.close();
    }

    // Appends one CSV row and flushes it. Columns follow print_header():
    // microseconds elapsed since the file was created, then the values passed in,
    // with both time points formatted as text.
    void trace(unsigned                                   ackack_timestamp,
               int                                        rtt_us,
               int64_t                                    drift_sample,
               int64_t                                    drift,
               int64_t                                    overdrift,
               const srt::sync::steady_clock::time_point& pkt_base,
               const srt::sync::steady_clock::time_point& tsbpd_base)
    {
        using namespace srt::sync;
        ScopedLock guard(m_mtx);
        create_file();

        // Length of the trailing ' [STDY]' marker that is cut off the formatted time.
        const std::string::size_type suffix_len = 7;

        std::string tsbpd_base_text = srt::sync::FormatTime(tsbpd_base);
        tsbpd_base_text.resize(tsbpd_base_text.size() - suffix_len);

        std::string pkt_base_text = srt::sync::FormatTime(pkt_base);
        pkt_base_text.resize(pkt_base_text.size() - suffix_len);

        m_fout << count_microseconds(steady_clock::now() - m_start_time) << ","
               << ackack_timestamp << ","
               << rtt_us << ","
               << drift_sample << ","
               << drift << ","
               << overdrift << ","
               << pkt_base_text << ","
               << tsbpd_base_text << "\n";
        m_fout.flush();
    }

private:
    // Writes the CSV column names; must match the field order used in trace().
    void print_header()
    {
        m_fout << "usElapsedStd,usAckAckTimestampStd,"
               << "usRTTStd,usDriftSampleStd,usDriftStd,usOverdriftStd,tsPktBase,TSBPDBase\n";
    }

    // Opens the trace file unless it is already open. The creation time is remembered
    // in m_start_time (the origin of the elapsed-time column) and is also embedded
    // in the file name.
    void create_file()
    {
        if (m_fout.is_open())
            return;

        m_start_time = srt::sync::steady_clock::now();

        std::string stamp = srt::sync::FormatTimeSys(m_start_time);
        stamp.resize(stamp.size() - 7); // remove trailing ' [SYST]' part

        // Replace every ':' of the time stamp with '_' before using it in the file name.
        for (std::string::size_type pos = stamp.find(':'); pos != std::string::npos; pos = stamp.find(':', pos + 1))
        {
            stamp[pos] = '_';
        }

        const std::string fname = "drift_trace_" + stamp + ".csv";
        m_fout.open(fname, std::ofstream::out);
        if (m_fout.fail())
            std::cerr << "IPE: Failed to open " << fname << "!!!\n";

        print_header();
    }

private:
    srt::sync::Mutex                    m_mtx;        // guards m_fout and m_start_time
    std::ofstream                       m_fout;       // CSV output stream
    srt::sync::steady_clock::time_point m_start_time; // time at which the file was created
};

// The single tracer instance used by CTsbpdTime::addDriftSample().
drift_logger g_drift_logger;

#endif // SRT_DEBUG_TRACE_DRIFT

bool CTsbpdTime::addDriftSample(uint32_t usPktTimestamp, const time_point& tsPktArrival, int usRTTSample)
{
    if (!m_bTsbPdMode)
        return false;

    ExclusiveLock lck(m_mtxRW);

    // Remember the first RTT sample measured. Ideally we need RTT0 - the one from the handshaking phase,
    // because TSBPD base is initialized there. But HS-based RTT is not yet implemented.
    // Take the first one assuming it is close to RTT0.
    if (m_iFirstRTT == -1)
    {
        m_iFirstRTT = usRTTSample;
    }

    // A change in network delay has to be taken into account. The only way to get some estimation of it
    // is to estimate RTT change and assume that the change of the one way network delay is
    // approximated by the half of the RTT change.
    const duration               tdRTTDelta    = usRTTSample >= 0 ? microseconds_from((usRTTSample - m_iFirstRTT) / 2) : duration(0);
    const time_point             tsPktBaseTime = getPktBaseTimeNoLock(usPktTimestamp);
    const steady_clock::duration tdDrift       = tsPktArrival - tsPktBaseTime - tdRTTDelta;

    const bool updated = m_DriftTracer.update(count_microseconds(tdDrift));

    if (updated)
    {
        IF_HEAVY_LOGGING(const steady_clock::time_point oldbase = m_tsTsbPdTimeBase);
        steady_clock::duration overdrift = microseconds_from(m_DriftTracer.overdrift());
        m_tsTsbPdTimeBase += overdrift;

        HLOGC(brlog.Debug,
              log << "DRIFT=" << FormatDuration(tdDrift) << " AVG=" << (m_DriftTracer.drift() / 1000.0)
                  << "ms, TB: " << FormatTime(oldbase) << " EXCESS: " << FormatDuration(overdrift)
                  << " UPDATED TO: " << FormatTime(m_tsTsbPdTimeBase));
    }
    else
    {
        HLOGC(brlog.Debug,
              log << "DRIFT=" << FormatDuration(tdDrift) << " TB REMAINS: " << FormatTime(m_tsTsbPdTimeBase));
    }

#if SRT_DEBUG_TRACE_DRIFT
    g_drift_logger.trace(usPktTimestamp,
                         usRTTSample,
                         count_microseconds(tdDrift),
                         m_DriftTracer.drift(),
                         m_DriftTracer.overdrift(),
                         tsPktBaseTime,
                         m_tsTsbPdTimeBase);
#endif
    return updated;
}

// Switches TSBPD mode on and stores the initial time base, wrap-check flag and delay.
void CTsbpdTime::setTsbPdMode(const steady_clock::time_point& timebase, bool wrap, duration delay)
{
    ExclusiveLock lck(m_mtxRW);

    m_bTsbPdMode = true;

    // The timebase passed here is calculated as:
    //   Tnow - hspkt.m_iTimeStamp
    // where hspkt is the packet with the SRT_CMD_HSREQ message.
    //
    // This function is called in the HSREQ reception handler only.
    m_tsTsbPdTimeBase = timebase;
    m_tdTsbPdDelay    = delay;
    m_bTsbPdWrapCheck = wrap;
}

// Group-member counterpart of setTsbPdMode(): enables TSBPD mode and copies the time base,
// wrap-check flag, delay (given in microseconds) and drift taken from an existing socket.
void CTsbpdTime::applyGroupTime(const steady_clock::time_point& timebase,
                                bool                            wrp,
                                uint32_t                        delay,
                                const steady_clock::duration&   udrift)
{
    // The time is synchronized from the INTERNAL timebase of an existing socket.
    // The initial time base always stays the same, whereas the internal timebase
    // is adjusted whenever the 32-bit timestamps in the sockets wrap. A socket
    // newly added to the group must therefore get EXACTLY the same internal
    // timebase; otherwise the TsbPd time calculation would give different
    // results on different member sockets.
    const steady_clock::duration tdDelay = microseconds_from(delay);

    ExclusiveLock lck(m_mtxRW);

    m_bTsbPdMode      = true;
    m_bTsbPdWrapCheck = wrp;
    m_tsTsbPdTimeBase = timebase;
    m_tdTsbPdDelay    = tdDelay;
    m_DriftTracer.forceDrift(count_microseconds(udrift));
}

// Overwrites the time base, wrap-check flag and drift with the values supplied by the group.
// Used only when a drift was updated on one of the group members.
void CTsbpdTime::applyGroupDrift(const steady_clock::time_point& timebase,
                                 bool                            wrp,
                                 const steady_clock::duration&   udrift)
{
    ExclusiveLock lck(m_mtxRW);

    // Log first, so that the message shows the values that are about to be replaced.
    HLOGC(brlog.Debug,
          log << "rcv-buffer: group synch uDRIFT: " << m_DriftTracer.drift() << " -> " << FormatDuration(udrift)
              << " TB: " << FormatTime(m_tsTsbPdTimeBase) << " -> " << FormatTime(timebase));

    m_bTsbPdWrapCheck = wrp;
    m_tsTsbPdTimeBase = timebase;
    m_DriftTracer.forceDrift(count_microseconds(udrift));
}

// Returns the time base to use for a packet with the given timestamp.
// Does not lock m_mtxRW itself; the callers in this file hold it.
CTsbpdTime::time_point CTsbpdTime::getBaseTimeNoLock(uint32_t timestamp_us) const
{
    // A data packet within [TSBPD_WRAP_PERIOD; 2 * TSBPD_WRAP_PERIOD] would end the TSBPD wrap-aware state.
    // Some incoming control packets may not update the TSBPD base (calling updateBaseTime(..)),
    // but may come before a data packet with a timestamp in this range.
    // Therefore the whole range [0; 2 * TSBPD_WRAP_PERIOD] is tracked while the wrap check is active.
    int64_t carryover_us = 0;
    if (m_bTsbPdWrapCheck && timestamp_us <= 2 * TSBPD_WRAP_PERIOD)
    {
        // The timestamp has already wrapped: account for one full timestamp period.
        carryover_us = int64_t(CPacket::MAX_TIMESTAMP) + 1;
    }

    return m_tsTsbPdTimeBase + microseconds_from(carryover_us);
}

// Locking wrapper around getBaseTimeNoLock(): takes a shared lock on m_mtxRW for the call.
CTsbpdTime::time_point CTsbpdTime::getBaseTime(uint32_t timestamp_us) const
{
    SharedLock lck(m_mtxRW);
    const time_point tsBase = getBaseTimeNoLock(timestamp_us);
    return tsBase;
}

// Returns the packet's base time plus the TSBPD delay plus the current drift,
// all read under a shared lock on m_mtxRW.
CTsbpdTime::time_point CTsbpdTime::getPktTime(uint32_t usPktTimestamp) const
{
    SharedLock lck(m_mtxRW);
    return getPktBaseTimeNoLock(usPktTimestamp) + m_tdTsbPdDelay + microseconds_from(m_DriftTracer.drift());
}

// Returns the time base for the packet advanced by the packet's own timestamp.
// Does not lock m_mtxRW itself.
CTsbpdTime::time_point CTsbpdTime::getPktBaseTimeNoLock(uint32_t usPktTimestamp) const
{
    const time_point tsBase = getBaseTimeNoLock(usPktTimestamp);
    return tsBase + microseconds_from(usPktTimestamp);
}

// Returns the time base for the packet advanced by the packet's own timestamp.
// The time base is obtained through getBaseTime(), which takes the shared lock.
CTsbpdTime::time_point CTsbpdTime::getPktBaseTime(uint32_t usPktTimestamp) const
{
    const time_point tsBase = getBaseTime(usPktTimestamp);
    return tsBase + microseconds_from(usPktTimestamp);
}

// Tracks the wrap-around of the packet timestamp:
//  - outside the wrap check period, starts it once the timestamp gets within
//    TSBPD_WRAP_PERIOD of CPacket::MAX_TIMESTAMP;
//  - inside the wrap check period, ends it on a timestamp within
//    [TSBPD_WRAP_PERIOD; 2 * TSBPD_WRAP_PERIOD] and advances the time base
//    by one full timestamp period.
void CTsbpdTime::updateBaseTime(uint32_t usPktTimestamp)
{
    ExclusiveLock lck(m_mtxRW);

    if (!m_bTsbPdWrapCheck)
    {
        // Check if timestamp is within the TSBPD_WRAP_PERIOD before reaching the MAX_TIMESTAMP.
        if (usPktTimestamp > (CPacket::MAX_TIMESTAMP - TSBPD_WRAP_PERIOD))
        {
            // Approaching the wrap-around point, start wrap check period (if for packet delivery head)
            m_bTsbPdWrapCheck = true;
            LOGC(tslog.Debug,
                 log << "tsbpd wrap period begins with ts=" << usPktTimestamp
                     << " TIME BASE: " << FormatTime(m_tsTsbPdTimeBase) << " drift: " << m_DriftTracer.drift() << "us.");
        }
        return;
    }

    // Wrap check period: nothing to do until a timestamp falls into the closing range.
    if ((usPktTimestamp < TSBPD_WRAP_PERIOD) || (usPktTimestamp > (TSBPD_WRAP_PERIOD * 2)))
        return;

    /* Exiting wrap check period (if for packet delivery head) */
    m_bTsbPdWrapCheck = false;
    m_tsTsbPdTimeBase += microseconds_from(int64_t(CPacket::MAX_TIMESTAMP) + 1);
    LOGC(tslog.Debug,
         log << "tsbpd wrap period ends with ts=" << usPktTimestamp << " - NEW TIME BASE: "
             << FormatTime(m_tsTsbPdTimeBase) << " drift: " << m_DriftTracer.drift() << "us");
}

// Copies out a consistent snapshot of the internal TSBPD state.
//
// @param [out] w_tb      current TSBPD time base
// @param [out] w_wrp     current state of the wrap-check flag
// @param [out] w_udrift  current drift reported by the drift tracer, as a duration
//
// This function only reads the state, so it takes a shared lock like the other
// const readers (getBaseTime(), getPktTime()); every writer in this file holds
// ExclusiveLock on the same mutex, which keeps the three values mutually consistent.
void CTsbpdTime::getInternalTimeBase(time_point& w_tb, bool& w_wrp, duration& w_udrift) const
{
    SharedLock lck(m_mtxRW);
    w_tb     = m_tsTsbPdTimeBase;
    w_udrift = microseconds_from(m_DriftTracer.drift());
    w_wrp    = m_bTsbPdWrapCheck;
}

} // namespace srt