File: thread_utils.h

package info (click to toggle)
emscripten 3.1.69%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 121,860 kB
  • sloc: ansic: 636,110; cpp: 425,974; javascript: 78,401; python: 58,404; sh: 49,154; pascal: 5,237; makefile: 3,366; asm: 2,415; lisp: 1,869
file content (101 lines) | stat: -rw-r--r-- 3,608 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/*
 * Copyright 2021 The Emscripten Authors.  All rights reserved.
 * Emscripten is available under two separate licenses, the MIT license and the
 * University of Illinois/NCSA Open Source License.  Both these licenses can be
 * found in the LICENSE file.
 */

#pragma once

#include <functional>
#include <thread>

#include <emscripten/proxying.h>
#include <emscripten/threading.h>

extern "C" {
void _wasmfs_thread_utils_heartbeat(em_proxying_queue* ctx);
}

namespace emscripten {

// Helper class for synchronously proxying work to a dedicated worker thread,
// including where the work is asynchronous.
class ProxyWorker {
  // The queue we use to proxy work and the dedicated worker.
  ProxyingQueue queue;
  std::thread thread;

  // Used to notify the calling thread once the worker has been started.
  bool started = false;
  std::mutex mutex;
  std::condition_variable cond;

public:
  // Spawn the worker thread.
  ProxyWorker()
    : queue(), thread([&]() {
        // Notify the caller that we have started.
        {
          std::unique_lock<std::mutex> lock(mutex);
          started = true;
        }
        cond.notify_all();

        // Sometimes the main thread is spinning, waiting on a WasmFS lock held
        // by a thread trying to proxy work to this dedicated worker. In that
        // case, the proxying message won't be relayed by the main thread and
        // the system will deadlock. This heartbeat ensures that proxying work
        // eventually gets done so the thread holding the lock can make forward
        // progress even if the main thread is blocked.
        //
        // TODO: Remove this once we can postMessage directly between workers
        // without involving the main thread or once all browsers ship
        // Atomics.waitAsync.
        //
        // Note that this requires adding _emscripten_proxy_execute_queue to
        // EXPORTED_FUNCTIONS.
        _wasmfs_thread_utils_heartbeat(queue.queue);

        // Sit in the event loop performing work as it comes in.
        emscripten_exit_with_live_runtime();
      }) {

    // Make sure the thread has actually started before returning. This allows
    // subsequent code to assume the thread has already been spawned and not
    // worry about potential deadlocks where it holds a lock while proxying an
    // operation and meanwhile the main thread is blocked trying to acqure the
    // same lock so is never able to spawn the worker thread.
    //
    // Unfortunately, this solution would cause the main thread to deadlock on
    // itself, so for now assert that we are not on the main thread. In the
    // future, we could provide an asynchronous version of this utility that
    // calls a user callback once the worker has been started. This asynchronous
    // version would be safe to use on the main thread.
    assert(
      !emscripten_is_main_browser_thread() &&
      "cannot safely spawn dedicated workers from the main browser thread");
    {
      std::unique_lock<std::mutex> lock(mutex);
      cond.wait(lock, [&]() { return started; });
    }
  }

  // Kill the worker thread.
  ~ProxyWorker() {
    pthread_cancel(thread.native_handle());
    thread.join();
  }

  // Proxy synchronous work.
  void operator()(const std::function<void()>& func) {
    queue.proxySync(thread.native_handle(), func);
  }
  // Proxy asynchronous work that calls `finish()` on the ctx parameter to mark
  // its end.
  void operator()(const std::function<void(ProxyingQueue::ProxyingCtx)>& func) {
    queue.proxySyncWithCtx(thread.native_handle(), func);
  }
};

} // namespace emscripten