1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
|
#include "server-task.h"
#include "server-queue.h"
#include "log.h"
#include <chrono>
#define QUE_INF(fmt, ...) LOG_INF("que %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define QUE_WRN(fmt, ...) LOG_WRN("que %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define QUE_ERR(fmt, ...) LOG_ERR("que %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define QUE_DBG(fmt, ...) LOG_DBG("que %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define RES_INF(fmt, ...) LOG_INF("res %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define RES_WRN(fmt, ...) LOG_WRN("res %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define RES_ERR(fmt, ...) LOG_ERR("res %12.*s: " fmt, 12, __func__, __VA_ARGS__)
#define RES_DBG(fmt, ...) LOG_DBG("res %12.*s: " fmt, 12, __func__, __VA_ARGS__)
//
// server_queue
//
int server_queue::post(server_task && task, bool front) {
std::unique_lock<std::mutex> lock(mutex_tasks);
GGML_ASSERT(task.id != -1);
// if this is cancel task make sure to clean up pending tasks
if (task.type == SERVER_TASK_TYPE_CANCEL) {
cleanup_pending_task(task.id_target);
}
const int task_id = task.id;
QUE_DBG("new task, id = %d, front = %d\n", task_id, front);
if (front) {
queue_tasks.push_front(std::move(task));
} else {
queue_tasks.push_back(std::move(task));
}
time_last_task = ggml_time_ms();
condition_tasks.notify_one();
return task_id;
}
int server_queue::post(std::vector<server_task> && tasks, bool front) {
std::unique_lock<std::mutex> lock(mutex_tasks);
for (auto & task : tasks) {
if (task.id == -1) {
task.id = id++;
}
// if this is cancel task make sure to clean up pending tasks
if (task.type == SERVER_TASK_TYPE_CANCEL) {
cleanup_pending_task(task.id_target);
}
QUE_DBG("new task, id = %d/%d, front = %d\n", task.id, (int) tasks.size(), front);
if (front) {
queue_tasks.push_front(std::move(task));
} else {
queue_tasks.push_back(std::move(task));
}
}
time_last_task = ggml_time_ms();
condition_tasks.notify_one();
return 0;
}
void server_queue::defer(server_task && task) {
std::unique_lock<std::mutex> lock(mutex_tasks);
QUE_DBG("defer task, id = %d\n", task.id);
queue_tasks_deferred.push_back(std::move(task));
time_last_task = ggml_time_ms();
condition_tasks.notify_one();
}
int server_queue::get_new_id() {
std::unique_lock<std::mutex> lock(mutex_tasks);
int new_id = id++;
return new_id;
}
void server_queue::pop_deferred_task() {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!queue_tasks_deferred.empty()) {
queue_tasks.emplace_front(std::move(queue_tasks_deferred.front()));
queue_tasks_deferred.pop_front();
}
time_last_task = ggml_time_ms();
condition_tasks.notify_one();
}
void server_queue::wait_until_no_sleep() {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!sleeping) {
return;
} else {
if (!req_stop_sleeping) {
QUE_DBG("%s", "requesting to stop sleeping\n");
req_stop_sleeping = true;
condition_tasks.notify_one(); // only main thread is waiting on this
}
QUE_DBG("%s", "waiting until no sleep\n");
condition_tasks.wait(lock, [&]{
return !sleeping;
});
}
}
void server_queue::terminate() {
std::unique_lock<std::mutex> lock(mutex_tasks);
running = false;
condition_tasks.notify_all();
}
void server_queue::start_loop(int64_t idle_sleep_ms) {
running = true;
time_last_task = ggml_time_ms();
constexpr auto max_wait_time = std::chrono::seconds(1);
auto should_sleep = [&]() -> bool {
// caller must hold mutex_tasks
if (idle_sleep_ms < 0) {
return false;
}
int64_t now = ggml_time_ms();
return (now - time_last_task) >= idle_sleep_ms;
};
while (true) {
QUE_DBG("%s", "processing new tasks\n");
while (true) {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!running) {
QUE_DBG("%s", "terminate\n");
return;
}
if (queue_tasks.empty()) {
lock.unlock();
break;
}
server_task task = std::move(queue_tasks.front());
queue_tasks.pop_front();
lock.unlock();
QUE_DBG("processing task, id = %d\n", task.id);
callback_new_task(std::move(task));
}
// all tasks in the current loop is processed, slots data is now ready
QUE_DBG("%s", "update slots\n");
// this will run the main inference process for all slots
callback_update_slots();
{
// update_slots() may take a while to finish, we need to make sure it's not counted as idle
std::unique_lock<std::mutex> lock(mutex_tasks);
time_last_task = ggml_time_ms();
}
QUE_DBG("%s", "waiting for new tasks\n");
while (true) {
std::unique_lock<std::mutex> lock(mutex_tasks);
if (!running || !queue_tasks.empty()) {
break; // go back to process new tasks or terminate
}
// no tasks, check for sleeping state
if (should_sleep()) {
QUE_INF("%s", "entering sleeping state\n");
sleeping = true;
callback_sleeping_state(true);
req_stop_sleeping = false;
// wait until we are requested to exit sleeping state
condition_tasks.wait(lock, [&]{
return (!running || req_stop_sleeping);
});
if (!running) { // may changed during sleep
break; // terminate
}
QUE_INF("%s", "exiting sleeping state\n");
req_stop_sleeping = false;
callback_sleeping_state(false);
sleeping = false;
time_last_task = ggml_time_ms();
condition_tasks.notify_all(); // notify wait_until_no_sleep()
break; // process new tasks
} else {
// wait for new tasks or timeout for checking sleeping condition
bool res = condition_tasks.wait_for(lock, max_wait_time, [&]{
return (!queue_tasks.empty() || !running);
});
if (res) {
break; // new task arrived or terminate
}
// otherwise, loop again to check sleeping condition
}
}
}
}
void server_queue::cleanup_pending_task(int id_target) {
// no need lock because this is called exclusively by post()
auto rm_func = [id_target](const server_task & task) {
return task.id == id_target;
};
queue_tasks.erase(
std::remove_if(queue_tasks.begin(), queue_tasks.end(), rm_func),
queue_tasks.end());
queue_tasks_deferred.erase(
std::remove_if(queue_tasks_deferred.begin(), queue_tasks_deferred.end(), rm_func),
queue_tasks_deferred.end());
}
//
// server_response
//
void server_response::add_waiting_task_id(int id_task) {
RES_DBG("add task %d to waiting list. current waiting = %d (before add)\n", id_task, (int) waiting_task_ids.size());
std::unique_lock<std::mutex> lock(mutex_results);
waiting_task_ids.insert(id_task);
}
void server_response::add_waiting_tasks(const std::vector<server_task> & tasks) {
std::unique_lock<std::mutex> lock(mutex_results);
for (const auto & task : tasks) {
RES_DBG("add task %d to waiting list. current waiting = %d (before add)\n", task.id, (int) waiting_task_ids.size());
waiting_task_ids.insert(task.id);
}
}
void server_response::remove_waiting_task_id(int id_task) {
RES_DBG("remove task %d from waiting list. current waiting = %d (before remove)\n", id_task, (int) waiting_task_ids.size());
std::unique_lock<std::mutex> lock(mutex_results);
waiting_task_ids.erase(id_task);
// make sure to clean up all pending results
queue_results.erase(
std::remove_if(queue_results.begin(), queue_results.end(), [id_task](const server_task_result_ptr & res) {
return res->id == id_task;
}),
queue_results.end());
}
void server_response::remove_waiting_task_ids(const std::unordered_set<int> & id_tasks) {
std::unique_lock<std::mutex> lock(mutex_results);
for (const auto & id_task : id_tasks) {
RES_DBG("remove task %d from waiting list. current waiting = %d (before remove)\n", id_task, (int) waiting_task_ids.size());
waiting_task_ids.erase(id_task);
}
}
server_task_result_ptr server_response::recv(const std::unordered_set<int> & id_tasks) {
while (true) {
std::unique_lock<std::mutex> lock(mutex_results);
condition_results.wait(lock, [&]{
if (!running) {
RES_DBG("%s : queue result stop\n", "recv");
std::terminate(); // we cannot return here since the caller is HTTP code
}
return !queue_results.empty();
});
for (size_t i = 0; i < queue_results.size(); i++) {
if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) {
server_task_result_ptr res = std::move(queue_results[i]);
queue_results.erase(queue_results.begin() + i);
return res;
}
}
}
// should never reach here
}
server_task_result_ptr server_response::recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout) {
while (true) {
std::unique_lock<std::mutex> lock(mutex_results);
for (int i = 0; i < (int) queue_results.size(); i++) {
if (id_tasks.find(queue_results[i]->id) != id_tasks.end()) {
server_task_result_ptr res = std::move(queue_results[i]);
queue_results.erase(queue_results.begin() + i);
return res;
}
}
std::cv_status cr_res = condition_results.wait_for(lock, std::chrono::seconds(timeout));
if (!running) {
RES_DBG("%s : queue result stop\n", __func__);
std::terminate(); // we cannot return here since the caller is HTTP code
}
if (cr_res == std::cv_status::timeout) {
return nullptr;
}
}
// should never reach here
}
server_task_result_ptr server_response::recv(int id_task) {
std::unordered_set<int> id_tasks = {id_task};
return recv(id_tasks);
}
void server_response::send(server_task_result_ptr && result) {
RES_DBG("sending result for task id = %d\n", result->id);
std::unique_lock<std::mutex> lock(mutex_results);
for (const auto & id_task : waiting_task_ids) {
if (result->id == id_task) {
RES_DBG("task id = %d pushed to result queue\n", result->id);
queue_results.emplace_back(std::move(result));
condition_results.notify_all();
return;
}
}
}
void server_response::terminate() {
running = false;
condition_results.notify_all();
}
//
// server_response_reader
//
void server_response_reader::post_task(server_task && task, bool front) {
GGML_ASSERT(id_tasks.empty() && "post_task() can only be called once per reader");
task.index = 0;
id_tasks.insert(task.id);
states.push_back(task.create_state());
queue_results.add_waiting_task_id(task.id);
queue_tasks.post(std::move(task), front);
}
void server_response_reader::post_tasks(std::vector<server_task> && tasks, bool front) {
GGML_ASSERT(id_tasks.empty() && "post_tasks() can only be called once per reader");
id_tasks = server_task::get_list_id(tasks);
states.reserve(tasks.size());
for (size_t i = 0; i < tasks.size(); i++) {
tasks[i].index = i;
states.push_back(tasks[i].create_state());
}
queue_results.add_waiting_tasks(tasks);
queue_tasks.post(std::move(tasks), front);
}
bool server_response_reader::has_next() const {
return !cancelled && received_count < id_tasks.size();
}
// return nullptr if should_stop() is true before receiving a result
// note: if one error is received, it will stop further processing and return error result
server_task_result_ptr server_response_reader::next(const std::function<bool()> & should_stop) {
while (true) {
server_task_result_ptr result = queue_results.recv_with_timeout(id_tasks, polling_interval_seconds);
if (result == nullptr) {
// timeout, check stop condition
if (should_stop()) {
SRV_DBG("%s", "stopping wait for next result due to should_stop condition\n");
return nullptr;
}
} else {
if (result->is_error()) {
stop(); // cancel remaining tasks
SRV_DBG("%s", "received error result, stopping further processing\n");
return result;
}
if (!states.empty()) {
// update the generation state if needed
const size_t idx = result->index;
GGML_ASSERT(idx < states.size());
result->update(states[idx]);
}
if (result->is_stop()) {
received_count++;
}
return result;
}
}
// should not reach here
}
server_response_reader::batch_response server_response_reader::wait_for_all(const std::function<bool()> & should_stop) {
batch_response batch_res;
batch_res.results.clear();
batch_res.results.resize(id_tasks.size());
while (has_next()) {
auto res = next(should_stop);
if (res == nullptr) {
batch_res.is_terminated = true;
return batch_res;
}
if (res->is_error()) {
batch_res.error = std::move(res);
return batch_res;
}
const size_t idx = res->index;
GGML_ASSERT(idx < batch_res.results.size() && "index out of range");
GGML_ASSERT(batch_res.results[idx] == nullptr && "duplicate result received");
batch_res.results[idx] = std::move(res);
}
return batch_res;
}
void server_response_reader::stop() {
queue_results.remove_waiting_task_ids(id_tasks);
if (has_next() && !cancelled) {
// if tasks is not finished yet, cancel them
cancelled = true;
std::vector<server_task> cancel_tasks;
cancel_tasks.reserve(id_tasks.size());
for (const auto & id_task : id_tasks) {
SRV_WRN("cancel task, id_task = %d\n", id_task);
server_task task(SERVER_TASK_TYPE_CANCEL);
task.id_target = id_task;
queue_results.remove_waiting_task_id(id_task);
cancel_tasks.push_back(std::move(task));
}
// push to beginning of the queue, so it has highest priority
queue_tasks.post(std::move(cancel_tasks), true);
} else {
SRV_DBG("%s", "all tasks already finished, no need to cancel\n");
}
}
|