1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
|
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <utility>
#include <vector>
#include "napi.h"
#if (NAPI_VERSION > 3)
using namespace Napi;
namespace {
// State shared between a test binding, its worker threads and the
// ThreadSafeFunction finalizer. Instances are heap-allocated by the test
// entry points and deleted by the finalizer callback.
struct TestData {
  // `explicit` prevents a Promise::Deferred from silently converting into a
  // TestData; the parameter is named differently from the member it
  // initializes to avoid shadowing.
  explicit TestData(Promise::Deferred&& promiseDeferred)
      : deferred(std::move(promiseDeferred)) {}

  // Native Promise returned to JavaScript
  Promise::Deferred deferred;

  // List of threads created for the test. This list is only ever accessed
  // from the main thread.
  std::vector<std::thread> threads = {};

  ThreadSafeFunction tsfn = ThreadSafeFunction();

  // These variables are only accessed from the main thread.
  bool mainWantsRelease = false;
  size_t expected_calls = 0;
};
// Finalizer passed to ThreadSafeFunction::New in TestWithTSFN. Joins every
// worker thread, resolves the test's promise with `true`, and frees the test
// data.
//
// env          - environment used to create the resolution value.
// finalizeData - heap-allocated TestData; this function takes ownership.
void FinalizerCallback(Napi::Env env, TestData* finalizeData) {
  // Take ownership up front so the data is freed on every exit path, even if
  // joining a thread or resolving the promise throws.
  std::unique_ptr<TestData> data(finalizeData);
  for (std::thread& worker : data->threads) {
    worker.join();
  }
  data->deferred.Resolve(Boolean::New(env, true));
}
/**
* See threadsafe_function_sum.js for descriptions of the tests in this file
*/
// Thread entry point used by TestWithTSFN. Sleeps for a random 1-100 ms,
// makes one blocking call that invokes the JavaScript callback with
// `threadId`, then releases this thread's use of the ThreadSafeFunction.
//
// tsfn     - this thread's copy of the ThreadSafeFunction.
// threadId - value passed to the JavaScript callback as a Number.
void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) {
  // std::rand() is not required to be thread-safe and this function runs
  // concurrently on several threads, so draw the delay from a generator owned
  // by this thread instead.
  std::minstd_rand rng(std::random_device{}());
  std::uniform_int_distribution<int> delayMs(1, 100);
  std::this_thread::sleep_for(std::chrono::milliseconds(delayMs(rng)));

  tsfn.BlockingCall([threadId](Napi::Env env, Function callback) {
    callback.Call({Number::New(env, static_cast<double>(threadId))});
  });
  tsfn.Release();
}
// JavaScript binding: (threadCount, callback) -> Promise.
// Creates one ThreadSafeFunction around `callback`, starts `threadCount`
// threads that each run entryWithTSFN with their index, and returns the
// promise that FinalizerCallback resolves.
static Value TestWithTSFN(const CallbackInfo& info) {
  Napi::Env env = info.Env();
  const int workerTotal = info[0].As<Number>().Int32Value();
  Function jsCallback = info[1].As<Function>();

  // Ownership of this allocation is handed to the finalizer, which both
  // cleans it up and deletes it.
  TestData* state = new TestData(Promise::Deferred::New(env));

  ThreadSafeFunction sharedTsfn = ThreadSafeFunction::New(
      env,
      jsCallback,
      "Test",
      0,
      workerTotal,
      std::function<decltype(FinalizerCallback)>(FinalizerCallback),
      state);

  // Every thread entry point receives its own copy of the ThreadSafeFunction.
  int workerId = 0;
  while (workerId < workerTotal) {
    state->threads.emplace_back(entryWithTSFN, sharedTsfn, workerId);
    ++workerId;
  }

  return state->deferred.Promise();
}
// Task instance created for each new std::thread
class DelayedTSFNTask {
public:
// Each instance has its own tsfn
ThreadSafeFunction tsfn;
// Thread-safety
std::mutex mtx;
std::condition_variable cv;
// Entry point for std::thread
void entryDelayedTSFN(int threadId) {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [this] { return this->tsfn != nullptr; });
tsfn.BlockingCall([=](Napi::Env env, Function callback) {
callback.Call({Number::New(env, static_cast<double>(threadId))});
});
tsfn.Release();
};
};
// State for the delayed-TSFN test. Instances are heap-allocated by
// TestDelayedTSFN and deleted by FinalizerCallbackDelayed.
//
// No user-declared destructor is needed: destroying `taskInsts` releases
// every task through its std::unique_ptr.
struct TestDataDelayed {
  // `explicit` prevents a Promise::Deferred from silently converting into a
  // TestDataDelayed; the parameter is named differently from the member it
  // initializes to avoid shadowing.
  explicit TestDataDelayed(Promise::Deferred&& promiseDeferred)
      : deferred(std::move(promiseDeferred)) {}

  // Native Promise returned to JavaScript
  Promise::Deferred deferred;

  // List of threads created for the test. This list is only ever accessed
  // from the main thread.
  std::vector<std::thread> threads = {};

  // List of DelayedTSFNTask instances, one per thread
  std::vector<std::unique_ptr<DelayedTSFNTask>> taskInsts = {};

  ThreadSafeFunction tsfn = ThreadSafeFunction();
};
// Finalizer passed to ThreadSafeFunction::New in TestDelayedTSFN. Joins every
// worker thread, resolves the test's promise with `true`, and frees the test
// data.
//
// env          - environment used to create the resolution value.
// finalizeData - heap-allocated TestDataDelayed; this function takes
//                ownership.
void FinalizerCallbackDelayed(Napi::Env env, TestDataDelayed* finalizeData) {
  // Take ownership up front so the data is freed on every exit path, even if
  // joining a thread or resolving the promise throws.
  std::unique_ptr<TestDataDelayed> data(finalizeData);
  for (std::thread& worker : data->threads) {
    worker.join();
  }
  data->deferred.Resolve(Boolean::New(env, true));
}
// JavaScript binding: (threadCount, callback) -> Promise.
// Starts `threadCount` threads, each bound to its own DelayedTSFNTask, before
// any task has a ThreadSafeFunction. After a random 1-100 ms pause the shared
// ThreadSafeFunction is handed to every task and its thread is woken. Returns
// the promise that FinalizerCallbackDelayed resolves.
static Value TestDelayedTSFN(const CallbackInfo& info) {
  Napi::Env env = info.Env();
  const int workerTotal = info[0].As<Number>().Int32Value();
  Function jsCallback = info[1].As<Function>();

  // Deleted by FinalizerCallbackDelayed.
  TestDataDelayed* state = new TestDataDelayed(Promise::Deferred::New(env));
  state->tsfn = ThreadSafeFunction::New(
      env,
      jsCallback,
      "Test",
      0,
      workerTotal,
      std::function<decltype(FinalizerCallbackDelayed)>(
          FinalizerCallbackDelayed),
      state);

  for (int workerId = 0; workerId < workerTotal; ++workerId) {
    std::unique_ptr<DelayedTSFNTask> owned(new DelayedTSFNTask());
    // The thread keeps a raw pointer; the task itself stays owned by
    // `taskInsts`.
    DelayedTSFNTask* task = owned.get();
    state->taskInsts.push_back(std::move(owned));
    state->threads.push_back(
        std::thread(&DelayedTSFNTask::entryDelayedTSFN, task, workerId));
  }

  const std::chrono::milliseconds pause(std::rand() % 100 + 1);
  std::this_thread::sleep_for(pause);

  // Publish the ThreadSafeFunction to each task under that task's mutex and
  // wake its waiting thread.
  for (const std::unique_ptr<DelayedTSFNTask>& task : state->taskInsts) {
    std::lock_guard<std::mutex> guard(task->mtx);
    task->tsfn = state->tsfn;
    task->cv.notify_all();
  }

  return state->deferred.Promise();
}
void AcquireFinalizerCallback(Napi::Env env,
TestData* finalizeData,
TestData* context) {
(void)context;
for (size_t i = 0; i < finalizeData->threads.size(); ++i) {
finalizeData->threads[i].join();
}
finalizeData->deferred.Resolve(Boolean::New(env, true));
delete finalizeData;
}
// Thread entry point used by CreateThread. Acquires the ThreadSafeFunction,
// sleeps for a random 1-100 ms, makes one blocking call that invokes the
// JavaScript callback with `threadId`, then releases this thread's
// acquisition.
//
// tsfn     - this thread's copy of the ThreadSafeFunction; its context is the
//            test's TestData.
// threadId - value passed to the JavaScript callback as a Number.
void entryAcquire(ThreadSafeFunction tsfn, int threadId) {
  tsfn.Acquire();
  TestData* testData = tsfn.GetContext();

  // std::rand() is not required to be thread-safe and this function runs
  // concurrently on several threads, so draw the delay from a generator owned
  // by this thread instead.
  std::minstd_rand rng(std::random_device{}());
  std::uniform_int_distribution<int> delayMs(1, 100);
  std::this_thread::sleep_for(std::chrono::milliseconds(delayMs(rng)));

  tsfn.BlockingCall([testData, threadId](Napi::Env env, Function callback) {
    // This lambda runs on the main thread so it's OK to access the variables
    // `expected_calls` and `mainWantsRelease`.
    testData->expected_calls--;
    // Once the last expected call has arrived and the main thread has asked
    // to stop, release the TestData's own ThreadSafeFunction handle.
    if (testData->expected_calls == 0 && testData->mainWantsRelease) {
      testData->tsfn.Release();
    }
    callback.Call({Number::New(env, static_cast<double>(threadId))});
  });
  tsfn.Release();
}
// JavaScript binding exposed as `createThread` by TestAcquire. Starts one
// thread running entryAcquire and returns that thread's id (its index in
// `threads`) as a Number. The TestData arrives through the function's bound
// data pointer.
static Value CreateThread(const CallbackInfo& info) {
  TestData* testData = static_cast<TestData*>(info.Data());

  // Counting expected calls like this only works because on the JS side this
  // binding is called from a synchronous loop. This means the main loop has no
  // chance to run the tsfn JS callback before we've counted how many threads
  // the JS intends to create.
  testData->expected_calls++;

  // size() returns size_t; convert explicitly rather than narrowing silently.
  const int threadId = static_cast<int>(testData->threads.size());

  // A copy of the ThreadSafeFunction will go to the thread entry point
  testData->threads.emplace_back(entryAcquire, testData->tsfn, threadId);

  return Number::New(info.Env(), threadId);
}
// JavaScript binding exposed as `stopThreads` by TestAcquire. Sets the
// `mainWantsRelease` flag on the TestData bound to this function and returns
// undefined. The flag is read by the call lambda in entryAcquire.
static Value StopThreads(const CallbackInfo& info) {
  auto* state = static_cast<TestData*>(info.Data());
  state->mainWantsRelease = true;
  return info.Env().Undefined();
}
// JavaScript binding: (callback) -> { createThread, stopThreads, promise }.
// Creates a ThreadSafeFunction around `callback` with an initial thread count
// of 1 and returns an object holding the two control functions plus the
// promise that AcquireFinalizerCallback resolves.
static Value TestAcquire(const CallbackInfo& info) {
  Napi::Env env = info.Env();
  Function jsCallback = info[0].As<Function>();

  // Ownership of this allocation is handed to the finalizer, which both
  // cleans it up and deletes it. The same pointer is also passed as the
  // ThreadSafeFunction's context.
  TestData* state = new TestData(Promise::Deferred::New(env));
  state->tsfn = ThreadSafeFunction::New(
      env,
      jsCallback,
      "Test",
      0,
      1,
      state,
      std::function<decltype(AcquireFinalizerCallback)>(
          AcquireFinalizerCallback),
      state);

  // Both control functions carry the TestData as their bound data pointer.
  Object controls = Object::New(env);
  controls["createThread"] =
      Function::New(env, CreateThread, "createThread", state);
  controls["stopThreads"] =
      Function::New(env, StopThreads, "stopThreads", state);
  controls["promise"] = state->deferred.Promise();
  return controls;
}
} // namespace
// Builds and returns the object exposing this file's three test bindings:
// `testDelayedTSFN`, `testWithTSFN` and `testAcquire`.
Object InitThreadSafeFunctionSum(Env env) {
  Object bindings = Object::New(env);

  bindings["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN);
  bindings["testWithTSFN"] = Function::New(env, TestWithTSFN);
  bindings["testAcquire"] = Function::New(env, TestAcquire);

  return bindings;
}
#endif
|