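#include <string>
#include <sstream>
#include <iostream>   // std::cerr, used in Medium::quit()
#include <typeinfo>   // typeid
#include <utility>    // std::move
#include <vector>
#include <list>
#include <memory>
#include <exception>
#include <thread>
#include <mutex>
#include <condition_variable>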
#include "testmedia.hpp"
#include "logsupport.hpp"
#define SRT_ENABLE_VERBOSE_LOCK 1
#include "verbose.hpp"
#include "logging.h"
#include "threadname.h"
extern srt_logging::Logger applog;

// Common base for an "active" medium: a Source or Target that is serviced
// by its own thread and exchanges packets with the user through `buffer`.
template<class MediumDir>
struct Medium
{
    MediumDir* med = nullptr;              // The medium in use (may be non-owning)
    std::unique_ptr<MediumDir> pinned_med; // Set only when this object owns the medium
    std::list<MediaPacket> buffer;
    std::mutex buffer_lock;                // Guards `buffer`; also used with `ready`
    std::thread thr;
    std::condition_variable ready;
    srt::sync::atomic<bool> running {false};
    std::exception_ptr xp; // To catch exception thrown by a thread

    // Thread body, provided by the derived class.
    virtual void Runner() = 0;

    // Thread entry point: runs Runner(), stores any exception it throws
    // in `xp`, then marks the medium as stopped and wakes up all waiters.
    void RunnerBase()
    {
        try
        {
            running = true;
            Runner();
        }
        catch (...)
        {
            xp = std::current_exception();
        }

        //Verb() << "Medium: " << this << ": thread exit";
        std::unique_lock<std::mutex> g(buffer_lock);
        running = false;
        ready.notify_all();
        //Verb() << VerbLock << "Medium: EXIT NOTIFIED";
    }

    void run()
    {
        running = true;

        // Build a thread name from the concrete medium type and this object's address.
        std::ostringstream tns;
        tns << typeid(*this).name() << ":" << this;
        srt::ThreadName tn(tns.str());
        thr = std::thread( [this] { RunnerBase(); } );
    }

    // Close the medium, join its thread and report any exception the thread
    // stored. Safe to call more than once: later calls return immediately.
    void quit()
    {
        if (!med)
            return;

        // Take the name before closing; it is used in the log messages below.
        const std::string name = typeid(*med).name();
        LOGP(applog.Debug, "Medium(", name, ") quit. Buffer contains ", buffer.size(), " blocks");

        med->Close();
        if (thr.joinable())
        {
            LOGP(applog.Debug, "Medium::quit: Joining medium thread (", name, ") ...");
            thr.join();
            LOGP(applog.Debug, "... done");
        }

        if (xp)
        {
            try {
                std::rethrow_exception(xp);
            } catch (TransmissionError& e) {
                if (Verbose::on)
                    Verb() << VerbLock << "Medium " << this << " exited with Transmission Error:\n\t" << e.what();
                else
                    std::cerr << "Transmission Error: " << e.what() << std::endl;
            } catch (...) {
                if (Verbose::on)
                    Verb() << VerbLock << "Medium " << this << " exited with UNKNOWN EXCEPTION";
                else
                    std::cerr << "UNKNOWN EXCEPTION on medium\n";
            }
        }

        // Prevent further quits from running
        med = nullptr;
    }

    // Non-owning setup: the caller keeps ownership of `t`.
    void Setup(MediumDir* t)
    {
        med = t;
        // pinned_med stays empty
    }

    // Owning setup: the medium is destroyed together with this object.
    void Setup(std::unique_ptr<MediumDir>&& medbase)
    {
        pinned_med = std::move(medbase);
        med = pinned_med.get();
    }

    virtual ~Medium()
    {
        //Verb() << "Medium: " << this << " DESTROYED. Threads quit.";
        quit();
    }

    virtual void Start() { run(); }
    virtual void Stop() { quit(); }
};

struct SourceMedium: Medium<Source>
{
    size_t chunksize_ = 0;
    typedef Medium<Source> Base;

    // Source Runner: read payloads and put on the buffer
    void Runner() override;

    // External user: call this to get the buffer.
    MediaPacket Extract();

    // Accepts either a raw Source* or a std::unique_ptr<Source>.
    // Note: `medium` is unconditionally cast to an rvalue, so a
    // std::unique_ptr passed as an lvalue is moved from.
    template<class Arg>
    void Setup(Arg&& medium, size_t chunksize)
    {
        chunksize_ = chunksize;
        return Base::Setup(std::move(medium));
    }
};

struct TargetMedium: Medium<Target>
{
    void Runner() override;

    bool Schedule(const MediaPacket& data);

    void Clear()
    {
        std::lock_guard<std::mutex> lg(buffer_lock);
        buffer.clear();
    }

    void Interrupt()
    {
        std::lock_guard<std::mutex> lg(buffer_lock);
        running = false;
        ready.notify_one();
    }

    ~TargetMedium()
    {
        //Verb() << "TargetMedium: DESTROYING";
        Interrupt();
        // ~Medium will do quit() additionally, which joins the thread
    }
};