File: host_function_scheduler.cpp

package info (click to toggle)
intel-compute-runtime 26.05.37020.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 83,596 kB
  • sloc: cpp: 976,037; lisp: 2,096; sh: 704; makefile: 162
file content (122 lines) | stat: -rw-r--r-- 3,919 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
 * Copyright (C) 2025 Intel Corporation
 *
 * SPDX-License-Identifier: MIT
 *
 */

#include "shared/source/command_stream/host_function_scheduler.h"

#include "shared/source/command_stream/host_function.h"
#include "shared/source/memory_manager/graphics_allocation.h"
#include "shared/source/utilities/wait_util.h"

#include <chrono>
#include <iostream>
#include <type_traits>

namespace NEO {

// Builds the scheduler: forwards the "skip execution" switch to the
// HostFunctionWorker base and sizes the thread pool with the given limit.
// No scheduler thread is launched here; start() creates it on demand.
HostFunctionScheduler::HostFunctionScheduler(bool skipHostFunctionExecution, int32_t threadsInThreadPoolLimit)
    : HostFunctionWorker(skipHostFunctionExecution), threadPool(threadsInThreadPoolLimit) {}

// Defaulted out of line. NOTE(review): this does not call finish(); if the
// scheduler thread is still alive it may be blocked in semaphore.acquire(),
// so callers presumably must call finish() before destruction - confirm.
HostFunctionScheduler::~HostFunctionScheduler() = default;

/*
 * Registers `streamer` with the scheduler and with the thread pool, and
 * launches the scheduler thread the first time it is needed.
 *
 * `worker` is a plain std::unique_ptr that finish() resets under
 * `workerMutex`. It must therefore only be read while holding that mutex.
 * Testing it outside the lock (double-checked locking on a non-atomic object)
 * is a data race with a concurrent start()/finish() and thus undefined
 * behavior.
 */
void HostFunctionScheduler::start(HostFunctionStreamer *streamer) {

    this->registerHostFunctionStreamer(streamer);
    this->threadPool.registerThread();

    std::lock_guard<std::mutex> lock(workerMutex);
    if (worker == nullptr) {
        worker = std::make_unique<std::jthread>([this](std::stop_token st) {
            this->schedulerLoop(st);
        });
    }
}

/*
 * Shuts the scheduler down. The body runs at most once (guarded by
 * shutdownOnceFlag via std::call_once); later calls return without doing
 * anything.
 *
 * Sequence: shut down the thread pool, stop and join the scheduler thread
 * (if one was started), then drop all registered streamers.
 *
 * NOTE(review): because of call_once, a start() issued after finish() would
 * create a new scheduler thread that this function can no longer stop -
 * confirm callers never restart a finished scheduler.
 */
void HostFunctionScheduler::finish() {

    std::call_once(shutdownOnceFlag, [&]() {
        // NOTE(review): the pool is shut down before the scheduler thread is
        // stopped, so schedulerLoop() may still hand host functions to the pool
        // after shutdown() - confirm the pool tolerates that.
        threadPool.shutdown();
        {
            std::unique_lock<std::mutex> lock(workerMutex);
            if (worker) {
                worker->request_stop();
                // Post one token so a schedulerLoop() blocked in
                // semaphore.acquire() wakes up and observes the stop request.
                semaphore.release();
                worker->join();
                worker.reset(nullptr);
            }
        }

        {
            std::unique_lock<std::mutex> lock(registeredStreamersMutex);
            registeredStreamers.clear();
        }
    });
}

// Posts one pending-work token per submitted host function. schedulerLoop()
// blocks on these tokens, and isHostFunctionReadyToExecute() consumes one per
// host function it hands out.
void HostFunctionScheduler::submit(uint32_t nHostFunctions) noexcept {
    const auto tokens = static_cast<ptrdiff_t>(nHostFunctions);
    semaphore.release(tokens);
}

// Looks up host function `id` in `streamer`, lets the streamer prepare it for
// execution, then moves it into the thread pool's execution queue.
void HostFunctionScheduler::scheduleHostFunctionToThreadPool(HostFunctionStreamer *streamer, uint64_t id) noexcept {
    auto pending = streamer->getHostFunction(id);
    streamer->prepareForExecution(pending);
    threadPool.registerHostFunctionToExecute(streamer, std::move(pending));
}

/*
 * Body of the scheduler thread. Until a stop is requested it:
 *  1. waits for at least one pending-work token on `semaphore`;
 *  2. polls every registered streamer (under registeredStreamersMutex) and
 *     forwards each ready host function to the thread pool;
 *  3. calls WaitUtils::waitFunctionWithoutPredicate() with the number of
 *     microseconds elapsed since a host function was last scheduled (or since
 *     the loop began). Presumably this is used to scale the back-off; confirm
 *     in wait_util.
 * The stop token is re-checked after each potentially long step so finish()
 * can join the thread promptly.
 */
void HostFunctionScheduler::schedulerLoop(std::stop_token st) noexcept {

    auto lastScheduleTime = std::chrono::steady_clock::now();

    while (!st.stop_requested()) {
        // Sleep until work is pending, then immediately return the token:
        // tokens are consumed one by one in isHostFunctionReadyToExecute().
        semaphore.acquire();
        semaphore.release();

        if (st.stop_requested()) {
            return;
        }

        {
            std::lock_guard<std::mutex> streamersGuard(registeredStreamersMutex);
            for (auto streamer : registeredStreamers) {
                const auto id = isHostFunctionReadyToExecute(streamer);
                if (id == HostFunctionStatus::completed) {
                    continue;
                }
                scheduleHostFunctionToThreadPool(streamer, id);
                lastScheduleTime = std::chrono::steady_clock::now();
            }
        }

        if (st.stop_requested()) {
            return;
        }

        const auto idleFor = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - lastScheduleTime);
        WaitUtils::waitFunctionWithoutPredicate(idleFor.count());
    }
}

// Adds `streamer` to the set polled by schedulerLoop(). The list is protected
// by registeredStreamersMutex because the scheduler thread iterates it.
void HostFunctionScheduler::registerHostFunctionStreamer(HostFunctionStreamer *streamer) {
    const std::scoped_lock guard(registeredStreamersMutex);
    registeredStreamers.push_back(streamer);
}

/*
 * Returns the id of a host function in `streamer` that is ready to run, or
 * HostFunctionStatus::completed if there is none.
 *
 * An id is only returned when a pending-work token can also be taken from
 * `semaphore` (non-blocking). If no token is available, completed is returned
 * even though the streamer reported a ready id.
 */
uint64_t HostFunctionScheduler::isHostFunctionReadyToExecute(HostFunctionStreamer *streamer) {
    const auto readyId = streamer->getHostFunctionReadyToExecute();
    if (readyId == HostFunctionStatus::completed) {
        return HostFunctionStatus::completed;
    }
    if (!semaphore.try_acquire()) {
        return HostFunctionStatus::completed;
    }
    return readyId;
}

// Compile-time check that HostFunctionScheduler satisfies the project's
// NonCopyableAndNonMovable constraint. The scheduler thread's lambda captures
// `this`, so the object must not be relocated.
static_assert(NonCopyableAndNonMovable<HostFunctionScheduler>);

} // namespace NEO