File: PluginBridge.cpp

package info (click to toggle)
pd-vstplugin 0.6.1-1~exp1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 2,008 kB
  • sloc: cpp: 22,783; lisp: 2,860; makefile: 37; sh: 26
file content (564 lines) | stat: -rw-r--r-- 18,858 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
#include "PluginBridge.h"

#include "PluginCommand.h"
#include "Log.h"
#include "CpuArch.h"
#include "MiscUtils.h"

#include <cassert>

namespace vst {

/*///////////////////// Channel /////////////////////*/

template<>
void NRTChannel::checkError(){
    const ShmCommand *reply;
    if (getReply(reply)){
        if (reply->type == Command::Error){
            reply->throwError();
        }
    }
}

/*//////////////////// PluginBridge /////////////////*/

static std::mutex gPluginBridgeMutex;

// use std::weak_ptr, so the bridge is automatically closed if it is not used
static std::unordered_map<CpuArch, std::weak_ptr<PluginBridge>> gPluginBridgeMap;

PluginBridge::ptr PluginBridge::getShared(CpuArch arch){
    PluginBridge::ptr bridge;

    std::lock_guard lock(gPluginBridgeMutex);

    auto it = gPluginBridgeMap.find(arch);
    if (it != gPluginBridgeMap.end()){
        bridge = it->second.lock();
    }

    // Edge case: the bridge subprocess has crashed, but still lingers,
    // so we create a new one; otherwise, the user would get a misleading
    // error message that the new plugin has crashed.
    if (!bridge || !bridge->alive()){
        // create shared bridge
        LOG_DEBUG("create shared plugin bridge for " << cpuArchToString(arch));
        bridge = std::make_shared<PluginBridge>(arch, true);
        gPluginBridgeMap[arch] = bridge; // insert/assign

        WatchDog::instance().registerProcess(bridge);
    }

    return bridge;
}

PluginBridge::ptr PluginBridge::create(CpuArch arch){
    auto bridge = std::make_shared<PluginBridge>(arch, false);

    WatchDog::instance().registerProcess(bridge);

    return bridge;
}

int getNumDSPThreads();

PluginBridge::PluginBridge(CpuArch arch, bool shared)
    : shared_(shared)
{
    LOG_DEBUG("PluginBridge: created shared memory interface");
    // setup shared memory interface
    // UI channels:
    shm_.addChannel(ShmChannel::Queue, queueSize, "ui_snd");
    shm_.addChannel(ShmChannel::Queue, queueSize, "ui_rcv");
    if (shared){
        // --- shared plugin bridge ---
        // A single NRT channel followed by several RT channels.
        //
        // The bridge can be used from several threads concurrently!
        // This is necessary for hosts with multi-threaded audio processing
        // (like Supernova), some libpd apps - and maybe even Pd itself :-)
        // For the actual channel allocation algorithm, see getRTChannel().
        //
        // NB: getNumDSPThreads() defaults to the number of logical CPUs,
        // unless explicitly overriden by the user (which implies that they
        // really want to use *our* multithreading implemention).
        numThreads_ = getNumDSPThreads();
        LOG_DEBUG("PluginBridge: using " << numThreads_ << " RT threads");
        shm_.addChannel(ShmChannel::Request, nrtRequestSize, "nrt");
        for (int i = 0; i < numThreads_; ++i){
            char buf[16];
            snprintf(buf, sizeof(buf), "rt%d", i+1);
            shm_.addChannel(ShmChannel::Request, rtRequestSize, buf);
        }

        locks_ = std::make_unique<PaddedSpinLock[]>(numThreads_);
    } else {
        // --- sandboxed plugin ---
        // a single rt channel which also doubles as the nrt channel
        shm_.addChannel(ShmChannel::Request, rtRequestSize, "rt");
    }
    shm_.create();

    LOG_DEBUG("PluginBridge: created channels");

#ifdef _WIN32
    // create pipe for logging
    if (!CreatePipe(&hLogRead_, &hLogWrite_, NULL, 0)) {
        throw Error(Error::SystemError,
                    "CreatePipe() failed: " + errorMessage(GetLastError()));
    }
    intptr_t pipeHandle = reinterpret_cast<intptr_t>(hLogWrite_);
#else // Unix
    // create pipe for logging
    int pipefd[2];
    if (pipe(pipefd) != 0){
        throw Error(Error::SystemError,
                    "pipe() failed: " + errorMessage(errno));
    }
    logRead_ = pipefd[0];
    intptr_t pipeHandle = pipefd[1];
#endif
    // spawn host process
    // NB: we already checked in the PluginFactory::PluginFactory if we're able to bridge
    auto hostApp = IHostApp::get(arch);
    assert(hostApp);
    try {
        process_ = hostApp->bridge(shm_.path(), pipeHandle);
    } catch (const Error& e) {
        // close pipe handles
    #ifdef _WIN32
        CloseHandle(hLogRead_);
        CloseHandle(hLogWrite_);
    #else
        close(pipefd[0]);
        close(pipefd[1]);
    #endif
        auto msg = "couldn't create host process '" + hostApp->path() + "': " + e.what();
        throw Error(Error::SystemError, msg);
    }
#ifdef _WIN32
    // We can't simply close our end after CreateProcess() because the child process
    // needs to duplicate the handle; otherwise we would inadvertently close the pipe.
    // We only close our end in the destructor, after reading all remaining messages.
#else
    // close write end *after* creating the subprocess!
    close(pipefd[1]);
#endif

    alive_ = true;
    LOG_DEBUG("PluginBridge: spawned subprocess (child: " << process_.pid()
              << ", parent: " << getCurrentProcessId() << ")");

    pollFunction_ = UIThread::addPollFunction([](void *x){
        static_cast<PluginBridge *>(x)->pollUIThread();
    }, this);
    LOG_DEBUG("PluginBridge: added poll function");
}

PluginBridge::~PluginBridge(){
    LOG_DEBUG("PluginBridge: remove poll function");
    UIThread::removePollFunction(pollFunction_);

    // send quit message
    if (alive()){
        LOG_DEBUG("PluginBridge: send quit message");
        ShmCommand cmd(Command::Quit);

        auto chn = getNRTChannel();
        chn.AddCommand(cmd, empty);
        chn.send();
    }

    // wait for the subprocess to finish.
    // this might even be dangerous if the subprocess
    // somehow got stuck. maybe use some timeout?
    if (process_) {
        LOG_DEBUG("PluginBridge: wait for process");
        try {
            process_.wait();
        } catch (const Error& e) {
            LOG_ERROR("PluginBridge::~PluginBridge: " << e.what());
        }
    }

    // read remaining messages
    readLog(false);

#ifdef _WIN32
    if (hLogRead_) {
        CloseHandle(hLogRead_);
    }
    if (hLogWrite_) {
        CloseHandle(hLogWrite_);
    }
#else
    if (logRead_ >= 0) {
        close(logRead_);
    }
#endif

    LOG_DEBUG("free PluginBridge");
}

void PluginBridge::readLog(bool loud){
#ifdef _WIN32
    if (hLogRead_) {
        for (;;) {
            // try to read header into buffer, but don't remove it from
            // the pipe yet. we also get the number of available bytes.
            // NOTE: PeekNamedPipe() returns immediately!
            LogMessage::Header header;
            DWORD bytesRead, bytesAvailable;
            if (!PeekNamedPipe(hLogRead_, &header, sizeof(header),
                               &bytesRead, &bytesAvailable, NULL)) {
                if (loud) {
                    LOG_ERROR("PeekNamedPipe(): " << errorMessage(GetLastError()));
                }
                CloseHandle(hLogRead_);
                hLogRead_ = NULL;
                return;
            }
            if (bytesRead < sizeof(header)){
                // nothing to read yet
                return;
            }
            // check if message is complete
            int msgsize = header.size + sizeof(header);
            if (bytesAvailable < msgsize) {
                // try again next time
                return;
            }
            // now actually read the whole message
            auto msg = (LogMessage *)alloca(msgsize);
            if (ReadFile(hLogRead_, msg, msgsize, &bytesRead, NULL)){
                if (bytesRead == msgsize){
                    logMessage(msg->header.level, msg->data);
                } else {
                    // shouldn't really happen because we've peeked
                    // the number of available bytes!
                    LOG_ERROR("ReadFile(): size mismatch");
                    CloseHandle(hLogRead_);
                    hLogRead_ = NULL;
                    return;
                }
            } else {
                if (loud) {
                    LOG_ERROR("ReadFile(): " << errorMessage(GetLastError()));
                }
                CloseHandle(hLogRead_);
                hLogRead_ = NULL;
                return;
            }
        }
    }
#else
    if (logRead_ >= 0){
        for (;;){
            auto checkResult = [this, loud](int count){
                if (count > 0){
                    return true;
                }
                if (loud) {
                    if (count == 0){
                        LOG_WARNING("read(): EOF");
                    } else {
                        LOG_ERROR("read(): " << errorMessage(errno));
                    }
                }
                close(logRead_);
                logRead_ = -1;
                return false;
            };

            struct pollfd fds;
            fds.fd = logRead_;
            fds.events = POLLIN;
            fds.revents = 0;

            auto ret = poll(&fds, 1, 0);
            if (ret > 0){
                if (fds.revents & POLLIN){
                    LogMessage::Header header;
                    auto count = read(logRead_, &header, sizeof(header));
                    if (!checkResult(count)){
                        return;
                    }
                    // always atomic (header is smaller than PIPE_BUF).
                    assert(count == sizeof(header));

                    auto msgsize = header.size;
                    auto msg = (char *)alloca(msgsize);
                    // The following calls to read() can block!
                    // This could be dangerous if the subprocess dies after
                    // writing the header but before writing the actual message.
                    // However, in this case all file descriptors to the write end
                    // should have been closed and read() should return 0 (= EOF).
                    //
                    // We use a loop in case the message is larger than PIPE_BUF.
                    int bytes = 0;
                    while (bytes < msgsize) {
                        count = read(logRead_, msg + bytes, msgsize - bytes);
                        if (!checkResult(count)){
                            return;
                        }
                        bytes += count;
                    }
                    logMessage(header.level, msg);
                } else {
                    // pipe closed
                    if (loud) {
                        if (fds.revents & POLLHUP){
                            // there might be remaining data in the pipe, but we don't care.
                            LOG_ERROR("FIFO closed");
                        } else {
                            // shouldn't happen when reading from a pipe
                            LOG_ERROR("FIFO error");
                        }
                    }
                    close(logRead_);
                    logRead_ = -1;
                    break;
                }
            } else if (ret == 0){
                // timeout
                break;
            } else {
                // poll() failed
                LOG_ERROR("poll(): " << errorMessage(errno));
                close(logRead_);
                logRead_ = -1;
                break;
            }
        }
    }
#endif
}


void PluginBridge::checkStatus(){
    // already dead, no need to check
    if (!alive_.load(std::memory_order_acquire)){
        return;
    }
    if (!process_.checkIfRunning()) {
        // make sure that only a single thread will notify clients
        if (alive_.exchange(false)) {
            // notify waiting NRT/RT threads
            for (int i = Channel::NRT; i < shm_.numChannels(); ++i){
                // this should be safe, because channel messages
                // can only be read when they are complete
                // (the channel size is atomic)
                shm_.getChannel(i).postReply();
            }
            LOG_DEBUG("PluginBridge: notify clients");
            // notify all clients
            // NB: clients shall not close the plugin from within
            // the callback function, so this won't deadlock!
            std::lock_guard lock(clientMutex_);
            for (auto& [_, client] : clients_) {
                client->pluginCrashed();
            }
        }
    }
}

void PluginBridge::addUIClient(uint32_t id, IPluginListener* client){
    LOG_DEBUG("PluginBridge: add client " << id);
    std::lock_guard lock(clientMutex_);
    clients_.emplace(id, client);
}

void PluginBridge::removeUIClient(uint32_t id){
    LOG_DEBUG("PluginBridge: remove client " << id);
    std::lock_guard lock(clientMutex_);
    clients_.erase(id);
}

void PluginBridge::postUIThread(const ShmUICommand& cmd) {
    // sizeof(cmd) is a bit lazy, but we don't care too much about space here
    auto& channel = shm_.getChannel(Channel::UISend);
    if (channel.writeMessage(&cmd, sizeof(cmd))){
        // other side polls regularly
        // channel.post();
    } else {
        // TODO: loop + sleep for 1 second, see PluginServer
        LOG_ERROR("PluginBridge: couldn't post to UI thread");
    }
}

void PluginBridge::pollUIThread(){
    if (!alive()) {
        return;
    }

    auto& channel = shm_.getChannel(Channel::UIReceive);
    char buffer[64]; // larger than ShmCommand!
    size_t size = sizeof(buffer);
    // read all available events
    while (channel.readMessage(buffer, size)){
        auto cmd = (const ShmUICommand *)buffer;
        // find client with matching ID
        std::lock_guard lock(clientMutex_);

        auto client = findClient(cmd->id);
        if (client){
            // dispatch events
            switch (cmd->type){
            case Command::ParamAutomated:
                LOG_DEBUG("UI thread: ParameterAutomated");
                client->parameterAutomated(cmd->paramAutomated.index,
                                           cmd->paramAutomated.value);
                break;
            case Command::LatencyChanged:
                LOG_DEBUG("UI thread: LatencyChanged");
                client->latencyChanged(cmd->latency);
                break;
            case Command::UpdateDisplay:
                LOG_DEBUG("UI thread: UpdateDisplay");
                client->updateDisplay();
                break;
            default:
                // ignore other events for now
                break;
            }
        }

        size = sizeof(buffer); // reset size!
    }
}

// must be called with clientMutex_ locked!
IPluginListener* PluginBridge::findClient(uint32_t id){
    auto it = clients_.find(id);
    if (it != clients_.end()){
        return it->second;
    } else {
        LOG_ERROR("PluginBridge::pollUIThread: plugin "
                  << id << " doesn't exist (anymore)");
        return nullptr;
    }
}

RTChannel PluginBridge::getRTChannel(){
    if (locks_){
        // shared plugin bridge, see the comments in PluginBridge::PluginBridge().

        // we map audio threads to successive indices, so that each audio thread
        // is automatically associated with a dedicated thread in the subprocess.
        static std::atomic<uint32_t> counter{0};

        thread_local uint32_t threadIndex = counter.fetch_add(1) % numThreads_;
        uint32_t index = threadIndex;
        if (!locks_[index].try_lock()){
            // if two threads end up on the same spinlock, e.g. because there are
            // more audio threads than in the subprocess, try to find a free spinlock.
            // LOG_DEBUG("PluginBridge: index " << index << " taken");
            do {
                if (++index == numThreads_) {
                    index = 0;
                }
            #if 0
                // pause CPU everytime we cycle through all spinlocks
                if (index == threadIndex) {
                    int spinCount = 1000;
                    while (spinCount--) {
                        pauseCpu();
                    }
                }
            #endif
            } while (!locks_[index].try_lock());
            // LOG_DEBUG("PluginBridge: found free index " << index);
        }
        return RTChannel(shm_.getChannel(Channel::NRT + 1 + index),
                         std::unique_lock(locks_[index], std::adopt_lock));
    } else {
        // plugin sandbox: RT channel = NRT channel
        return RTChannel(shm_.getChannel(Channel::NRT));
    }
}

NRTChannel PluginBridge::getNRTChannel(){
    if (locks_){
        return NRTChannel(shm_.getChannel(Channel::NRT),
                          std::unique_lock(nrtMutex_));
    } else {
        // channel 2 is both NRT and RT channel
        return NRTChannel(shm_.getChannel(Channel::NRT));
    }
}

/*/////////////////// WatchDog //////////////////////*/

// poll interval in milliseconds
#define WATCHDOG_POLL_INTERVAL 5

WatchDog& WatchDog::instance(){
    static WatchDog watchDog;
    return watchDog;
}

WatchDog::WatchDog(){
    LOG_DEBUG("start WatchDog");
    running_ = true;
    thread_ = std::thread(&WatchDog::run, this);
}

WatchDog::~WatchDog(){
#ifdef _WIN32
    // You can't synchronize threads in a global/static object
    // destructor in a Windows DLL because of the loader lock.
    // See https://docs.microsoft.com/en-us/windows/win32/dlls/dynamic-link-library-best-practices
    thread_.detach();
#else
    {
        std::lock_guard lock(mutex_);
        processes_.clear(); // !
        running_ = false;
        condition_.notify_one();
    }
    thread_.join();
#endif
    LOG_DEBUG("free WatchDog");
}

void WatchDog::registerProcess(PluginBridge::ptr process){
    LOG_DEBUG("WatchDog: register process");
    std::lock_guard lock(mutex_);
    processes_.push_back(process);
    condition_.notify_one();
}

void WatchDog::run(){
    vst::setThreadPriority(Priority::Low);

    std::unique_lock lock(mutex_);
    while (running_) {
        LOG_DEBUG("WatchDog: waiting...");
        // wait until a process has been added or we should quit
        condition_.wait(lock, [&]() { return !processes_.empty() || !running_; });
        LOG_DEBUG("WatchDog: woke up");

        // periodically check all running processes
        while (!processes_.empty()){
            for (auto it = processes_.begin(); it != processes_.end();){
                auto process = it->lock();
                if (process){
                    process->readLog();
                    process->checkStatus();
                    ++it;
                } else {
                    // remove stale process
                    it = processes_.erase(it);
                }
            }

            lock.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(WATCHDOG_POLL_INTERVAL));
            lock.lock();
        }
    }
    LOG_DEBUG("WatchDog: thread finished");
}

} // vst