File: thread_mailbox.c

package info (click to toggle)
emscripten 3.1.69%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 121,872 kB
  • sloc: ansic: 636,110; cpp: 425,974; javascript: 78,401; python: 58,404; sh: 49,154; pascal: 5,237; makefile: 3,365; asm: 2,415; lisp: 1,869
file content (114 lines) | stat: -rw-r--r-- 4,119 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/*
 * Copyright 2023 The Emscripten Authors.  All rights reserved.
 * Emscripten is available under two separate licenses, the MIT license and the
 * University of Illinois/NCSA Open Source License.  Both these licenses can be
 * found in the LICENSE file.
 */

#include <assert.h>
#include <math.h>
#include <pthread.h>
#include <stdatomic.h>

#include "em_task_queue.h"
#include "pthread_impl.h"
#include "thread_mailbox.h"
#include "threading_internal.h"

int emscripten_thread_mailbox_ref(pthread_t thread) {
  // Attempt to increment the refcount, being careful not to increment it if we
  // ever observe a 0.
  int prev_count = thread->mailbox_refcount;
  while (1) {
    if (prev_count == 0) {
      // The mailbox is already closed!
      return 0;
    }
    int desired_count = prev_count + 1;
    if (atomic_compare_exchange_weak(
          &thread->mailbox_refcount, &prev_count, desired_count)) {
      return 1;
    }
  }
}

// Decrement and return the refcount.
void emscripten_thread_mailbox_unref(pthread_t thread) {
  int new_count = atomic_fetch_sub(&thread->mailbox_refcount, 1) - 1;
  assert(new_count >= 0);
  if (new_count == 0) {
    // The count is now zero. The thread that owns this queue may be waiting to
    // shut down. Notify the thread that it is safe to proceed now that the
    // mailbox is closed.
    emscripten_futex_wake(&thread->mailbox_refcount, INT_MAX);
  }
}

void _emscripten_thread_mailbox_shutdown(pthread_t thread) {
  assert(thread == pthread_self());

  // Decrement the refcount and wait for it to reach zero.
  assert(thread->mailbox_refcount > 0);
  int count = atomic_fetch_sub(&thread->mailbox_refcount, 1) - 1;

  while (count != 0) {
    emscripten_futex_wait(&thread->mailbox_refcount, count, INFINITY);
    count = thread->mailbox_refcount;
  }

  // The mailbox is now closed. No more messages will be enqueued. Run the
  // shutdown handler for any message already in the queue.
  em_task_queue_cancel(thread->mailbox);

  // The mailbox is now empty and will not be accessed again after this point.
  em_task_queue_destroy(thread->mailbox);
}

void _emscripten_thread_mailbox_init(pthread_t thread) {
  thread->mailbox = em_task_queue_create(thread);
  thread->mailbox_refcount = 1;
  thread->waiting_async = 0;
}

// Internal function, called from runtime_pthread.js
void _emscripten_check_mailbox() {
  // Before we attempt to execute a request from another thread make sure we
  // are in sync with all the loaded code.
  // For example, in PROXY_TO_PTHREAD the atexit functions are called via
  // a proxied call, and without this call to syncronize we would crash if
  // any atexit functions were registered from a side module.
  assert(pthread_self());
  em_task_queue* mailbox = pthread_self()->mailbox;
  mailbox->notification = NOTIFICATION_RECEIVED;
  em_task_queue_execute(pthread_self()->mailbox);
  notification_state expected = NOTIFICATION_RECEIVED;
  atomic_compare_exchange_strong(
    &mailbox->notification, &expected, NOTIFICATION_NONE);
  // After every mailbox check we call `__pthread_testcancel` in case
  // one of the proxied functions was from pthread_kill(SIGCANCEL).
  __pthread_testcancel();
}

void emscripten_thread_mailbox_send(pthread_t thread, task t) {
  assert(thread->mailbox_refcount > 0);

  pthread_mutex_lock(&thread->mailbox->mutex);
  if (!em_task_queue_enqueue(thread->mailbox, t)) {
    assert(0 && "No way to correctly recover from allocation failure");
  }
  pthread_mutex_unlock(&thread->mailbox->mutex);

  // If there is no pending notification for this mailbox, create one. If an old
  // notification is currently being processed, it may or may not execute the
  // new work. In case it does not, the new notification will ensure the work is
  // still executed.
  notification_state previous =
    atomic_exchange(&thread->mailbox->notification, NOTIFICATION_PENDING);
  if (previous != NOTIFICATION_PENDING) {
    if (thread->waiting_async) {
      __builtin_wasm_memory_atomic_notify((int*)thread, -1);
    } else {
      _emscripten_notify_mailbox_postmessage(thread, pthread_self());
    }
  }
}