File: sync_posix.cpp

package info (click to toggle)
srt 1.5.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,804 kB
  • sloc: cpp: 52,175; ansic: 5,746; tcl: 1,183; sh: 318; python: 99; makefile: 38
file content (560 lines) | stat: -rw-r--r-- 15,693 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
/*
 * SRT - Secure, Reliable, Transport
 * Copyright (c) 2019 Haivision Systems Inc.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 */
#include "platform_sys.h"

#include <iomanip>
#include <math.h>
#include <stdexcept>
#include "sync.h"
#include "utilities.h"
#include "udt.h"
#include "srt.h"
#include "srt_compat.h"
#include "logging.h"
#include "common.h"

#if defined(_WIN32)
#include "win/wintime.h"
#include <sys/timeb.h>
#elif TARGET_OS_MAC
#include <mach/mach_time.h>
#endif

namespace srt_logging
{
    extern Logger inlog;
}
using namespace srt_logging;

namespace srt
{
namespace sync
{

// Reads the current value of the clock source selected at build time through
// SRT_SYNC_CLOCK and stores it in `x`.
//
// Depending on the branch, `x` receives either a raw hardware/OS counter
// (RDTSC, IA64 ITC, Windows QPC, mach_absolute_time) or a microsecond count
// (clock_gettime(CLOCK_MONOTONIC), gettimeofday). An unrecognized
// SRT_SYNC_CLOCK value stops the build with #error.
static void rdtsc(uint64_t& x)
{
#if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_IA32_RDTSC
    uint32_t lval, hval;
    // asm volatile ("push %eax; push %ebx; push %ecx; push %edx");
    // asm volatile ("xor %eax, %eax; cpuid");
    asm volatile("rdtsc" : "=a"(lval), "=d"(hval));
    // asm volatile ("pop %edx; pop %ecx; pop %ebx; pop %eax");
    // Combine the high (EDX) and low (EAX) halves into one 64-bit value.
    x = hval;
    x = (x << 32) | lval;
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_IA64_ITC
    // Read the ar.itc application register directly into x.
    asm("mov %0=ar.itc" : "=r"(x)::"memory");
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_AMD64_RDTSC
    uint32_t lval, hval;
    asm volatile("rdtsc" : "=a"(lval), "=d"(hval));
    // Combine the high (EDX) and low (EAX) halves into one 64-bit value.
    x = hval;
    x = (x << 32) | lval;
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_WINQPC
    // This function should not fail, because we checked the QPC
    // when calling to QueryPerformanceFrequency. If it failed,
    // the m_bUseMicroSecond was set to true.
    // NOTE(review): m_bUseMicroSecond is not defined anywhere in this file;
    // the sentence above looks stale. The return value is not checked here.
    QueryPerformanceCounter((LARGE_INTEGER*)&x);
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_MACH_ABSTIME
    x = mach_absolute_time();
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_GETTIME_MONOTONIC
    // get_cpu_frequency() returns 1 us accuracy in this case
    timespec tm;
    clock_gettime(CLOCK_MONOTONIC, &tm);
    // Seconds scaled to microseconds plus nanoseconds truncated to microseconds.
    x = tm.tv_sec * uint64_t(1000000) + (tm.tv_nsec / 1000);
#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_POSIX_GETTIMEOFDAY
    // use system call to read time clock for other archs
    timeval t;
    gettimeofday(&t, 0);
    // Seconds scaled to microseconds plus the microsecond remainder.
    x = t.tv_sec * uint64_t(1000000) + t.tv_usec;
#else
#error Wrong SRT_SYNC_CLOCK
#endif
}

// Determines how many ticks of the clock read by rdtsc() elapse per microsecond.
//
// - Windows QPC: derived from QueryPerformanceFrequency().
// - Mach absolute time: derived from the mach timebase (numer/denom).
// - RDTSC / ITC: calibrated by counting ticks across a 100 ms sleep.
// - Any other clock source: 1 (rdtsc() already yields microseconds).
//
// Returns a value that is always >= 1, because the result is used as a
// divisor by the count_*() conversion functions.
static int64_t get_cpu_frequency()
{
    int64_t frequency = 1; // 1 tick per microsecond.

#if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_WINQPC
    LARGE_INTEGER ccf; // in counts per second
    if (QueryPerformanceFrequency(&ccf))
    {
        frequency = ccf.QuadPart / 1000000; // counts per microsecond
        if (frequency == 0)
        {
            LOGC(inlog.Warn, log << "Win QPC frequency of " << ccf.QuadPart
                << " counts/s is below the required 1 us accuracy. Please consider using C++11 timing (-DENABLE_STDCXX_SYNC=ON) instead.");
            frequency = 1; // set back to 1 to avoid division by zero.
        }
    }
    else
    {
        // Can't throw an exception, it won't be handled.
        LOGC(inlog.Error, log << "IPE: QueryPerformanceFrequency failed with " << GetLastError());
    }

#elif SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_MACH_ABSTIME
    mach_timebase_info_data_t info;
    info.numer = 0;
    info.denom = 0;
    // Only trust the timebase when the call succeeded and the divisor is usable;
    // otherwise keep the 1 tick/us default instead of dividing by zero.
    if (mach_timebase_info(&info) == KERN_SUCCESS && info.numer != 0)
    {
        frequency = info.denom * int64_t(1000) / info.numer;
    }

#elif SRT_SYNC_CLOCK >= SRT_SYNC_CLOCK_AMD64_RDTSC && SRT_SYNC_CLOCK <= SRT_SYNC_CLOCK_IA64_ITC
    // SRT_SYNC_CLOCK_AMD64_RDTSC or SRT_SYNC_CLOCK_IA32_RDTSC or SRT_SYNC_CLOCK_IA64_ITC
    uint64_t t1, t2;

    rdtsc(t1);
    timespec ts;
    ts.tv_sec  = 0;
    ts.tv_nsec = 100000000;
    // A signal may interrupt the sleep early; resume with the remaining time
    // so the measured interval really spans the full 100 ms.
    while (nanosleep(&ts, &ts) == -1 && errno == EINTR)
    {
    }
    rdtsc(t2);

    // CPU clocks per microsecond
    frequency = int64_t(t2 - t1) / 100000;
#endif

    // The result is used as a divisor; never hand out zero or a negative value.
    if (frequency < 1)
    {
        frequency = 1;
    }

    return frequency;
}

// Returns the number of decimal digits after the second that the clock can
// resolve: 6 for a 1 tick/us clock, plus one more digit for every full factor
// of ten in `ticks_per_us`.
static int count_subsecond_precision(int64_t ticks_per_us)
{
    int digits = 6; // microsecond resolution is the baseline
    for (int64_t rest = ticks_per_us / 10; rest != 0; rest /= 10)
    {
        digits += 1;
    }
    return digits;
}

// Clock ticks per microsecond, computed once during static initialization by
// get_cpu_frequency(). Used as the scale factor by the tick <-> time unit
// conversion functions in this file.
const int64_t s_clock_ticks_per_us = get_cpu_frequency();

// Number of sub-second decimal digits the clock resolves, derived from
// s_clock_ticks_per_us (and therefore initialized after it).
const int s_clock_subsecond_precision = count_subsecond_precision(s_clock_ticks_per_us);

// Accessor for s_clock_subsecond_precision.
int clockSubsecondPrecision() { return s_clock_subsecond_precision; }

} // namespace sync
} // namespace srt

////////////////////////////////////////////////////////////////////////////////
//
// Sync utilities section
//
////////////////////////////////////////////////////////////////////////////////

// Splits a microsecond count into a timespec: whole seconds go to tv_sec,
// the sub-second remainder is converted to nanoseconds in tv_nsec.
static timespec us_to_timespec(const uint64_t time_us)
{
    const uint64_t us_per_sec = 1000000;
    const uint64_t ns_per_us  = 1000;

    timespec ts;
    ts.tv_sec  = time_us / us_per_sec;
    ts.tv_nsec = (time_us % us_per_sec) * ns_per_us;
    return ts;
}

////////////////////////////////////////////////////////////////////////////////
//
// TimePoint section
//
////////////////////////////////////////////////////////////////////////////////

// Specialization for steady_clock: returns the stored timestamp wrapped in a
// Duration, without any scaling.
// NOTE(review): m_timestamp is presumably in ticks of the clock read by
// rdtsc() (see steady_clock::now()) - confirm against the TimePoint definition.
template <>
srt::sync::Duration<srt::sync::steady_clock> srt::sync::TimePoint<srt::sync::steady_clock>::time_since_epoch() const
{
    return srt::sync::Duration<srt::sync::steady_clock>(m_timestamp);
}

// Returns the current time as a time point built from the raw value that
// rdtsc() reads from the clock source selected by SRT_SYNC_CLOCK.
srt::sync::TimePoint<srt::sync::steady_clock> srt::sync::steady_clock::now()
{
    uint64_t x = 0;
    rdtsc(x);
    return TimePoint<steady_clock>(x);
}

// Converts a duration to microseconds by dividing its tick count by
// s_clock_ticks_per_us. Any remainder of the division is discarded.
int64_t srt::sync::count_microseconds(const steady_clock::duration& t)
{
    return t.count() / s_clock_ticks_per_us;
}

// Converts a duration to milliseconds: ticks -> microseconds (divide by
// s_clock_ticks_per_us) -> milliseconds (divide by 1000). Remainders are
// discarded at each division.
int64_t srt::sync::count_milliseconds(const steady_clock::duration& t)
{
    return t.count() / s_clock_ticks_per_us / 1000;
}

// Converts a duration to seconds: ticks -> microseconds (divide by
// s_clock_ticks_per_us) -> seconds (divide by 1000000). Remainders are
// discarded at each division.
int64_t srt::sync::count_seconds(const steady_clock::duration& t)
{
    return t.count() / s_clock_ticks_per_us / 1000000;
}

// Builds a duration from a microsecond count by scaling it to clock ticks
// with s_clock_ticks_per_us. No overflow check is performed.
srt::sync::steady_clock::duration srt::sync::microseconds_from(int64_t t_us)
{
    return steady_clock::duration(t_us * s_clock_ticks_per_us);
}

// Builds a duration from a millisecond count: milliseconds -> microseconds
// (times 1000) -> clock ticks (times s_clock_ticks_per_us). No overflow check
// is performed.
srt::sync::steady_clock::duration srt::sync::milliseconds_from(int64_t t_ms)
{
    return steady_clock::duration((1000 * t_ms) * s_clock_ticks_per_us);
}

// Builds a duration from a second count: seconds -> microseconds
// (times 1000000) -> clock ticks (times s_clock_ticks_per_us). No overflow
// check is performed.
srt::sync::steady_clock::duration srt::sync::seconds_from(int64_t t_s)
{
    return steady_clock::duration((1000000 * t_s) * s_clock_ticks_per_us);
}

// Creates the underlying pthread mutex with default attributes.
// Throws CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0) when pthread_mutex_init()
// reports any error.
srt::sync::Mutex::Mutex()
{
    if (pthread_mutex_init(&m_mutex, NULL) != 0)
        throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
}

// Destroys the underlying pthread mutex. The result of
// pthread_mutex_destroy() is not checked.
srt::sync::Mutex::~Mutex()
{
    pthread_mutex_destroy(&m_mutex);
}

// Locks the mutex, blocking if necessary.
// Returns the result code of pthread_mutex_lock() (0 on success).
int srt::sync::Mutex::lock()
{
    return pthread_mutex_lock(&m_mutex);
}

// Unlocks the mutex.
// Returns the result code of pthread_mutex_unlock() (0 on success).
int srt::sync::Mutex::unlock()
{
    return pthread_mutex_unlock(&m_mutex);
}

// Attempts to lock the mutex without blocking.
// Returns true only when pthread_mutex_trylock() succeeded (returned 0);
// every other result, whatever its cause, is reported as false.
bool srt::sync::Mutex::try_lock()
{
    return (pthread_mutex_trylock(&m_mutex) == 0);
}

// Binds the lock object to `m` and locks it immediately.
// m_iLocked keeps the result of Mutex::lock(); the value 0 is what the
// destructor and unlock() treat as "mutex is held".
srt::sync::UniqueLock::UniqueLock(Mutex& m)
    : m_Mutex(m)
{
    m_iLocked = m_Mutex.lock();
}

// Releases the mutex on scope exit, but only when this object currently
// holds it (m_iLocked == 0).
srt::sync::UniqueLock::~UniqueLock()
{
    if (m_iLocked != 0)
        return; // nothing held, nothing to release

    unlock();
}

// Re-acquires the mutex after an explicit unlock().
// Throws CThreadException(MJ_SYSTEMRES, MN_THREAD, 0) unless the object is in
// the released state (m_iLocked == -1, the value stored by unlock()).
void srt::sync::UniqueLock::lock()
{
    const bool released = (m_iLocked == -1);
    if (!released)
    {
        throw CThreadException(MJ_SYSTEMRES, MN_THREAD, 0);
    }

    m_iLocked = m_Mutex.lock();
}

// Releases the mutex and marks this object as released (m_iLocked = -1).
// Throws CThreadException(MJ_SYSTEMRES, MN_THREAD, 0) when the mutex is not
// currently held through this object (m_iLocked != 0).
void srt::sync::UniqueLock::unlock()
{
    const bool held = (m_iLocked == 0);
    if (!held)
    {
        throw CThreadException(MJ_SYSTEMRES, MN_THREAD, 0);
    }

    m_Mutex.unlock();
    m_iLocked = -1;
}

// Returns a pointer to the Mutex this lock object was constructed with.
// The pointer is never NULL because the mutex is held by reference.
srt::sync::Mutex* srt::sync::UniqueLock::mutex()
{
    return &m_Mutex;
}

////////////////////////////////////////////////////////////////////////////////
//
// Condition section (based on pthreads)
//
////////////////////////////////////////////////////////////////////////////////

namespace srt
{
namespace sync
{

// Constructs the object without creating the condition variable; init() must
// be called to do that. On Windows builds only, m_cv is pre-set to
// PTHREAD_COND_INITIALIZER.
Condition::Condition()
#ifdef _WIN32
    : m_cv(PTHREAD_COND_INITIALIZER)
#endif
{}

// Intentionally empty: the condition variable is released by destroy(),
// not by the destructor.
Condition::~Condition() {}

// Creates the underlying pthread condition variable.
//
// When the build uses the monotonic clock (SRT_SYNC_CLOCK_GETTIME_MONOTONIC),
// the condition variable is bound to CLOCK_MONOTONIC so that the absolute
// deadlines computed in wait_for() are measured on the same clock.
// Otherwise default attributes are used.
//
// Throws std::runtime_error when the attribute object cannot be prepared or
// when pthread_cond_init() fails.
void Condition::init()
{
    pthread_condattr_t* attr = NULL;
#if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_GETTIME_MONOTONIC
    pthread_condattr_t  CondAttribs;
    if (pthread_condattr_init(&CondAttribs) != 0)
        throw std::runtime_error("pthread_condattr_init failed");

    // A condition variable left on the default clock would be compared
    // against monotonic deadlines, so a failure here must not pass silently.
    if (pthread_condattr_setclock(&CondAttribs, CLOCK_MONOTONIC) != 0)
    {
        pthread_condattr_destroy(&CondAttribs);
        throw std::runtime_error("pthread_condattr_setclock monotonic failed");
    }
    attr = &CondAttribs;
#endif
    const int res = pthread_cond_init(&m_cv, attr);

    // The attribute object is only needed during pthread_cond_init();
    // release it on both the success and the failure path.
    if (attr != NULL)
        pthread_condattr_destroy(attr);

    if (res != 0)
        throw std::runtime_error("pthread_cond_init monotonic failed");
}

// Releases the underlying pthread condition variable. The result of
// pthread_cond_destroy() is not checked.
void Condition::destroy()
{
    pthread_cond_destroy(&m_cv);
}

// Blocks on the condition variable using the mutex behind `lock`, with no
// timeout. The result of pthread_cond_wait() is discarded, so callers should
// re-check their own predicate after this returns.
void Condition::wait(UniqueLock& lock)
{
    pthread_cond_wait(&m_cv, &lock.mutex()->ref());
}

// Blocks on the condition variable for at most `rel_time`.
//
// The relative time is turned into an absolute deadline: "now" is read from
// CLOCK_MONOTONIC in monotonic builds and from gettimeofday() otherwise, and
// rel_time (converted to microseconds) is added to it.
//
// Returns false only when pthread_cond_timedwait() reports ETIMEDOUT;
// any other result (including other error codes) yields true.
bool Condition::wait_for(UniqueLock& lock, const steady_clock::duration& rel_time)
{
#if SRT_SYNC_CLOCK == SRT_SYNC_CLOCK_GETTIME_MONOTONIC
    timespec ts_now;
    clock_gettime(CLOCK_MONOTONIC, &ts_now);
    const uint64_t now_us = ts_now.tv_sec * uint64_t(1000000) + (ts_now.tv_nsec / 1000);
#else
    timeval tv_now;
    gettimeofday(&tv_now, 0);
    const uint64_t now_us = tv_now.tv_sec * uint64_t(1000000) + tv_now.tv_usec;
#endif
    const timespec deadline = us_to_timespec(now_us + count_microseconds(rel_time));
    const int      rc       = pthread_cond_timedwait(&m_cv, &lock.mutex()->ref(), &deadline);
    return rc != ETIMEDOUT;
}

// Blocks on the condition variable until `timeout_time` at the latest.
//
// The deadline is expressed as a steady_clock::time_point, so it is compared
// against steady_clock::now() regardless of which clock the build uses.
// Returns false when the deadline has already passed or when the wait timed
// out; otherwise returns whatever wait_for() returns.
bool Condition::wait_until(UniqueLock& lock, const steady_clock::time_point& timeout_time)
{
    const steady_clock::time_point t_now = steady_clock::now();
    if (t_now >= timeout_time)
    {
        return false; // timeout
    }

    // Delegate to wait_for(), which converts the remaining time into the
    // absolute, pthread-friendly deadline.
    const steady_clock::duration remaining = timeout_time - t_now;
    return wait_for(lock, remaining);
}

// Signals the condition variable through pthread_cond_signal().
// The result of the call is not checked.
void Condition::notify_one()
{
    pthread_cond_signal(&m_cv);
}

// Broadcasts on the condition variable through pthread_cond_broadcast().
// The result of the call is not checked.
void Condition::notify_all()
{
    pthread_cond_broadcast(&m_cv);
}

}; // namespace sync
}; // namespace srt


////////////////////////////////////////////////////////////////////////////////
//
// CThread class
//
////////////////////////////////////////////////////////////////////////////////

// Creates a thread object that refers to no thread: m_thread is set to a
// value-initialized pthread_t, which joinable() reports as "not joinable".
srt::sync::CThread::CThread()
{
    m_thread = pthread_t();
}

// Creates a thread object and immediately starts `start_routine(arg)` in a
// new thread through create(). Any exception thrown by create() on a
// pthread_create() failure propagates out of this constructor.
srt::sync::CThread::CThread(void *(*start_routine) (void *), void *arg)
{
    create(start_routine, arg);
}

// Transfers the thread handle from `other` to this object and resets `other`
// to the "no thread" state. The parameter is an rvalue reference when
// HAVE_FULL_CXX11 is set and a non-const lvalue reference otherwise.
//
// Assigning to an object that still refers to a thread is treated as an
// internal program error: it is logged, and then, depending on the build,
//  - DEBUG defined:            the old thread is joined,
//  - non-DEBUG, non-Android:   the old thread is sent pthread_cancel(),
//  - non-DEBUG, Android:       nothing is done to the old thread.
// In every case the old handle is then overwritten.
#if HAVE_FULL_CXX11
srt::sync::CThread& srt::sync::CThread::operator=(CThread&& other)
#else
srt::sync::CThread& srt::sync::CThread::operator=(CThread& other)
#endif
{
    if (joinable())
    {
        // If the thread has already terminated, then
        // pthread_join() returns immediately.
        // But we have to check it has terminated before replacing it.
        LOGC(inlog.Error, log << "IPE: Assigning to a thread that is not terminated!");

#ifndef DEBUG
#ifndef __ANDROID__
        // In a production build the hanging thread should be terminated
        // to avoid hang-ups and to align with the C++11 implementation.
        // There is no pthread_cancel on Android. See #1476. This error should not
        // normally happen.
        // NOTE(review): the original comment says the thread is detached in the
        // Android case, but no pthread_detach() call exists on any path here;
        // the cancelled thread is also never joined or detached. Confirm intent.
        pthread_cancel(m_thread);
#endif // __ANDROID__
#else
        join();
#endif
    }

    // Move the thread handle from other and leave other in the "no thread" state.
    m_thread = other.m_thread;
    other.m_thread = pthread_t();
    return *this;
}

// Only compiled when HAVE_FULL_CXX11 is not set.
// Starts `start_routine(arg)` in a new thread through create(). The object
// must not already refer to a thread; this is checked with SRT_ASSERT only.
#if !HAVE_FULL_CXX11
void srt::sync::CThread::create_thread(void *(*start_routine) (void *), void *arg)
{
    SRT_ASSERT(!joinable());
    create(start_routine, arg);
}
#endif

// Returns true when m_thread differs from a value-initialized pthread_t,
// i.e. when the object is considered to refer to a thread.
bool srt::sync::CThread::joinable() const
{
    return !pthread_equal(m_thread, pthread_t());
}

void srt::sync::CThread::join()
{
    void *retval;
    const int ret SRT_ATR_UNUSED = pthread_join(m_thread, &retval);
    if (ret != 0)
    {
        LOGC(inlog.Error, log << "pthread_join failed with " << ret);
    }
#ifdef HEAVY_LOGGING
    else
    {
        HLOGC(inlog.Debug, log << "pthread_join SUCCEEDED");
    }
#endif
    // After joining, joinable should be false
    m_thread = pthread_t();
    return;
}

void srt::sync::CThread::create(void *(*start_routine) (void *), void *arg)
{
    const int st = pthread_create(&m_thread, NULL, start_routine, arg);
    if (st != 0)
    {
        LOGC(inlog.Error, log << "pthread_create failed with " << st);
        throw CThreadException(MJ_SYSTEMRES, MN_THREAD, 0);
    }
}


////////////////////////////////////////////////////////////////////////////////
//
// CThreadError class - thread local storage error wrapper
//
////////////////////////////////////////////////////////////////////////////////
namespace srt {
namespace sync {

class CThreadError
{
public:
    CThreadError()
    {
        pthread_key_create(&m_ThreadSpecKey, ThreadSpecKeyDestroy);

        // This is a global object and as such it should be called in the
        // main application thread or at worst in the thread that has first
        // run `srt_startup()` function and so requested the SRT library to
        // be dynamically linked. Most probably in this very thread the API
        // errors will be reported, so preallocate the ThreadLocalSpecific
        // object for this error description.

        // This allows std::bac_alloc to crash the program during
        // the initialization of the SRT library (likely it would be
        // during the DL constructor, still way before any chance of
        // doing any operations here). This will prevent SRT from running
        // into trouble while trying to operate.
        CUDTException* ne = new CUDTException();
        pthread_setspecific(m_ThreadSpecKey, ne);
    }

    ~CThreadError()
    {
        // Likely all objects should be deleted in all
        // threads that have exited, but std::this_thread didn't exit
        // yet :).
        ThreadSpecKeyDestroy(pthread_getspecific(m_ThreadSpecKey));
        pthread_key_delete(m_ThreadSpecKey);
    }

    void set(const CUDTException& e)
    {
        CUDTException* cur = get();
        // If this returns NULL, it means that there was an unexpected
        // memory allocation error. Simply ignore this request if so
        // happened, and then when trying to get the error description
        // the application will always get the memory allocation error.

        // There's no point in doing anything else here; lack of memory
        // must be prepared for prematurely, and that was already done.
        if (!cur)
            return;

        *cur = e;
    }

    /*[[nullable]]*/ CUDTException* get()
    {
        if (!pthread_getspecific(m_ThreadSpecKey))
        {
            // This time if this can't be done due to memory allocation
            // problems, just allow this value to be NULL, which during
            // getting the error description will redirect to a memory
            // allocation error.

            // It would be nice to somehow ensure that this object is
            // created in every thread of the application using SRT, but
            // POSIX thread API doesn't contain any possibility to have
            // a creation callback that would apply to every thread in
            // the application (as it is for C++11 thread_local storage).
            CUDTException* ne = new(std::nothrow) CUDTException();
            pthread_setspecific(m_ThreadSpecKey, ne);
            return ne;
        }
        return (CUDTException*)pthread_getspecific(m_ThreadSpecKey);
    }

    static void ThreadSpecKeyDestroy(void* e)
    {
        delete (CUDTException*)e;
    }

private:
    pthread_key_t m_ThreadSpecKey;
};

// Thread-local error will be used by CUDTUnited
// that has a static scope

// The `static` keyword gives this object file-private linkage, so that
// access is possible only through the accessor functions defined below it.
static CThreadError s_thErr;

// Stores a copy of `e` as the calling thread's current error by forwarding
// to s_thErr.set().
void SetThreadLocalError(const CUDTException& e)
{
    s_thErr.set(e);
}

// Returns a reference to the calling thread's current error object.
//
// In the POSIX version the per-thread object may be unavailable (s_thErr.get()
// returns NULL), which is expected to happen only when its dynamic allocation
// failed. In that case a function-local static "memory allocation error"
// object, shared by all callers, is returned instead.
CUDTException& GetThreadLocalError()
{
    static CUDTException resident_alloc_error (MJ_SYSTEMRES, MN_MEMORY);

    if (CUDTException* const stored = s_thErr.get())
        return *stored;

    return resident_alloc_error;
}

} // namespace sync
} // namespace srt