File: srt-test-mpbond.cpp

package info (click to toggle)
srt 1.5.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,804 kB
  • sloc: cpp: 52,175; ansic: 5,746; tcl: 1,183; sh: 318; python: 99; makefile: 38
file content (313 lines) | stat: -rw-r--r-- 8,979 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/*
 * SRT - Secure, Reliable, Transport
 * Copyright (c) 2018 Haivision Systems Inc.
 * 
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 * 
 */

#include <chrono>
#include <csignal>
#include <iterator>
#include <list>
#include <memory>
#include <stdexcept>
#include <thread>
#include <utility>

#ifndef _WIN32
#include <unistd.h>  // write(), STDERR_FILENO
#endif

#define REQUIRE_CXX11 1

#include "apputil.hpp"  // CreateAddr
#include "uriparser.hpp"  // UriParser
#include "socketoptions.hpp"
#include "logsupport.hpp"
#include "testmediabase.hpp"
#include "testmedia.hpp"
#include "netinet_any.h"
#include "threadname.h"
#include "verbose.hpp"

#include <srt.h>
#include <logging.h>

// Portability shims for the alarm facility, which does not exist on Windows.
// On Windows both macros expand to a no-op expression, so a call such as
// alarm(0) still compiles but does nothing. On other platforms `alarm` is left
// untouched and signal_alarm(fn) installs fn as the SIGALRM handler.
#ifdef _WIN32
#define alarm(argument) (void)0
#define signal_alarm(fn) (void)0
#else
#define signal_alarm(fn) signal(SIGALRM, fn)
#endif

// Logger object of this application, constructed for the SRT_LOGFA_APP
// functional area with the shared srt_logger_config and the name "srt-mpbond".
srt_logging::Logger applog(SRT_LOGFA_APP, srt_logger_config, "srt-mpbond");

// File-wide using-directives: names from both the srt and std namespaces are
// used unqualified throughout the rest of this file.
using namespace srt;
using namespace std;


// Interrupt request flag: set (to a non-zero value) by the signal handler below
// and polled by the transfer loop in main(). The type is sig_atomic_t because
// that is the only plain object type a signal handler may portably write to.
volatile std::sig_atomic_t mpbond_int_state = 0;

/// Signal handler installed in main() for SIGINT and SIGTERM.
///
/// Prints a notice on the standard error output and raises mpbond_int_state so
/// that the transfer loop can finish at its next check. The signal number is
/// not used.
void OnINT_SetIntState(int)
{
    static const char msg[] = "\n-------- REQUESTED INTERRUPT!\n";

#ifdef _WIN32
    // NOTE(review): per the Microsoft CRT documentation a Ctrl+C handler runs on
    // a separate thread rather than in an interrupted context, so the stream is
    // kept here; there is no POSIX write() on this platform.
    cerr << msg;
#else
    // iostreams are not async-signal-safe; write(2) is. The result is ignored
    // on purpose: there is nothing useful to do here if the notice can't be printed.
    const auto written = ::write(STDERR_FILENO, msg, sizeof msg - 1);
    (void)written;
#endif

    mpbond_int_state = 1;
}

int main( int argc, char** argv )
{
    // This is mainly required on Windows to initialize the network system,
    // for a case when the instance would use UDP. SRT does it on its own, independently.
    if ( !SysInitializeNetwork() )
        throw std::runtime_error("Can't initialize network!");

    // Symmetrically, this does a cleanup; put into a local destructor to ensure that
    // it's called regardless of how this function returns.
    struct NetworkCleanup
    {
        ~NetworkCleanup()
        {
            SysCleanupNetwork();
        }
    } cleanupobj;

    signal(SIGINT, OnINT_SetIntState);
    signal(SIGTERM, OnINT_SetIntState);

    vector<OptionScheme> optargs;

    OptionName
        o_input     ((optargs), "<input-medium> Define input to send over SRT endpoint", "i", "input"),
        o_output    ((optargs), "<output-medium> Define output to send data read from SRT endpoint", "o", "output"),
        o_verbose   ((optargs), "[channel=0|1] Print size of every packet transferred on stdout or specified [channel]", "v",   "verbose"),
        o_loglevel  ((optargs), "<severity=fatal|error|note|warning|debug> Minimum severity for logs", "ll",  "loglevel"),
        o_logfa     ((optargs), "<FA=all> Enabled Functional Areas", "lfa", "logfa"),
        o_help      ((optargs), " This help", "?", "help", "-help")
            ;

    options_t params = ProcessOptions(argv, argc, optargs);

    bool need_help = OptionPresent(params, o_help);

    vector<string> args = params[""];

    string srtspec;

    if (args.empty())
        need_help = true;
    else
    {
        for (size_t i = 0; i < args.size(); ++i)
        {
            UriParser u(args[i], UriParser::EXPECT_HOST);
            if (u.portno() == 0)
            {
                cerr << "ERROR: " << args[i] << " expected host:port or :port syntax.\n";
                return 1;
            }
        }
    }

    if (need_help)
    {
        cerr << "Usage:\n";
        cerr << "    " << argv[0] << " <SRT listeners...> [-i INPUT] [-o OUTPUT]\n";
        cerr << "*** (Position of [options] is unrestricted.)\n";
        cerr << "*** (<variadic...> option parameters can be only terminated by a next option.)\n";
        cerr << "where:\n";
        cerr << "   - <SRT listeners...>: a list of host:port specs for SRT listener\n";
        cerr << "   - INPUT or OUTPUT: at least one of that kind must be specified\n";
        cerr << "SUPPORTED URI SCHEMES:\n";
        cerr << "    srt: use SRT connection\n";
        cerr << "    udp: read from bound UDP socket or send to given address as UDP\n";
        cerr << "    file (default if scheme not specified) specified as:\n";
        cerr << "       - empty host/port and absolute file path in the URI\n";
        cerr << "       - only a filename, also as a relative path\n";
        cerr << "       - file://con ('con' as host): designates stdin or stdout\n";
        cerr << "OPTIONS HELP SYNTAX: -option <parameter[unit]=default[meaning]>:\n";
        for (auto os: optargs)
            cout << OptionHelpItem(*os.pid) << endl;
        return 1;
    }

    bool skip_flushing = false; // non-configurable for now

    bool mode_output = OptionPresent(params, o_output);

    string loglevel = Option<OutString>(params, "error", "ll", "loglevel");
    srt_logging::LogLevel::type lev = SrtParseLogLevel(loglevel);
    srt::setloglevel(lev);
    srt::addlogfa(SRT_LOGFA_APP);

    // Check verbose option before extracting the argument so that Verb()s
    // can be displayed also when they report something about option parsing.
    string verbose_val = Option<OutString>(params, "no", o_verbose);

    int verbch = 1; // default cerr
    if (verbose_val != "no")
    {
        Verbose::on = true;
        try
        {
            verbch = stoi(verbose_val);
        }
        catch (...)
        {
            verbch = 1;
        }
        if (verbch != 1)
        {
            if (verbch != 2)
            {
                cerr << "-v or -v:1 (default) or -v:2 only allowed\n";
                return 1;
            }
            Verbose::cverb = &std::cerr;
        }
        else
        {
            Verbose::cverb = &std::cout;
        }
    }


    if (OptionPresent(params, o_input) == OptionPresent(params, o_output))
    {
        cerr << "One of -i and -o options must be specified (not both)\n";
        return 1;
    }


    // Create listeners according to the parameters
    vector<SRTSOCKET> listeners;

    Verb() << "LISTENERS [ " << VerbNoEOL;

    for (size_t i = 0; i < args.size(); ++i)
    {
        UriParser u(args[i], UriParser::EXPECT_HOST);
        sockaddr_any sa = CreateAddr(u.host(), u.portno());

        SRTSOCKET s = srt_create_socket();

        srt::setopt(s)[SRTO_GROUPCONNECT] = 1;
        srt_bind(s, sa.get(), sizeof sa);
        srt_listen(s, 5);

        listeners.push_back(s);
        Verb() << u.host() << ":" << u.portno() << " " << VerbNoEOL;
    }

    Verb() << "] accept...";

    SRTSOCKET conngrp = srt_accept_bond(listeners.data(), int(listeners.size()), -1);
    if (conngrp == SRT_INVALID_SOCK)
    {
        cerr << "ERROR: srt_accept_bond: " << srt_getlasterror_str() << endl;
        return 1;
    }

    unique_ptr<Source> src;
    unique_ptr<Target> tar;

    try
    {
        // Now create input or output
        if (mode_output)
        {
            string outspec = Option<OutString>(params, o_output);
            Verb() << "SRT -> " << outspec;
            tar = Target::Create(outspec);

            auto s = new SrtSource;
            s->Acquire(conngrp);
            src.reset(s);
        }
        else
        {
            string inspec = Option<OutString>(params, o_input);
            Verb() << "SRT <- " << inspec;
            src = Source::Create(inspec);

            auto s = new SrtTarget;
            s->Acquire(conngrp);
            tar.reset(s);
        }
    }
    catch (...)
    {
        return 2;
    }

    size_t chunk = SRT_LIVE_MAX_PLSIZE;

    // Now run the loop
    try
    {
        for (;;)
        {
            Verb() << " << ... " << VerbNoEOL;
            const MediaPacket& data = src->Read(chunk);
            Verb() << " << " << data.payload.size() << "  ->  " << VerbNoEOL;
            if ( data.payload.empty() && src->End() )
            {
                Verb() << "EOS";
                break;
            }
            tar->Write(data);

            if ( tar->Broken() )
            {
                Verb() << " OUTPUT broken";
                break;
            }

            Verb() << "sent";

            if ( mpbond_int_state )
            {
                Verror() << "\n (interrupted on request)";
                break;
            }
        }
    } catch (Source::ReadEOF&) {
        alarm(0);

        if (!skip_flushing)
        {
            Verror() << "(DEBUG) EOF when reading file. Looping until the sending bufer depletes.\n";
            for (;;)
            {
                size_t still = tar->Still();
                if (still == 0)
                {
                    Verror() << "(DEBUG) DEPLETED. Done.\n";
                    break;
                }

                Verror() << "(DEBUG)... still " << still << " bytes (sleep 1s)\n";
                this_thread::sleep_for(chrono::seconds(1));
            }
        }
    } catch (std::exception& x) { // Catches TransmissionError and AlarmExit
        if (::mpbond_int_state)
        {
            Verror() << "Exit on interrupt.";
            // Do nothing.
        }
        else
        {
            Verror() << "STD EXCEPTION: " << x.what();
        }

        return 255;
    } catch (...) {
        Verror() << "UNKNOWN type of EXCEPTION";
        return 1;
    }

    return 0;
}