#include "server-context.h"
#include "server-http.h"
#include "server-models.h"
#include "arg.h"
#include "common.h"
#include "llama.h"
#include "log.h"
#include <atomic>
#include <exception>
#include <signal.h>
#include <thread> // for std::thread::hardware_concurrency
#if defined(_WIN32)
#include <windows.h>
#endif
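
// graceful-shutdown plumbing: shutdown_handler is assigned later in main(), once we
// know whether the process runs in router mode or single-model mode; a second
// interrupt while shutting down force-terminates the process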
static std::function<void(int)> shutdown_handler;
static std::atomic_flag is_terminating = ATOMIC_FLAG_INIT;
static inline void signal_handler(int signal) {
    if (is_terminating.test_and_set()) {
        // in case the server hangs, a second Ctrl+C force-terminates it immediately
        // this is for a better developer experience; it can be removed once the server is stable enough
        fprintf(stderr, "Received second interrupt, terminating immediately.\n");
        exit(1);
    }
    shutdown_handler(signal);
}
// wrapper that catches any exception thrown by a route handler and logs it
// this ensures a handler_t never throws; exceptions are converted into an error response
static server_http_context::handler_t ex_wrapper(server_http_context::handler_t func) {
    return [func = std::move(func)](const server_http_req & req) -> server_http_res_ptr {
        std::string message;
        error_type error;
        try {
            return func(req);
        } catch (const std::invalid_argument & e) {
            // treat invalid_argument as invalid request (400)
            error   = ERROR_TYPE_INVALID_REQUEST;
            message = e.what();
        } catch (const std::exception & e) {
            // treat other exceptions as server error (500)
            error   = ERROR_TYPE_SERVER;
            message = e.what();
        } catch (...) {
            error   = ERROR_TYPE_SERVER;
            message = "unknown error";
        }
        auto res = std::make_unique<server_http_res>();
        res->status = 500;
        try {
            json error_data = format_error_response(message, error);
            res->status = json_value(error_data, "code", 500);
            res->data   = safe_json_to_str({{ "error", error_data }});
            SRV_WRN("got exception: %s\n", res->data.c_str());
        } catch (const std::exception & e) {
            SRV_ERR("got another exception: %s | while handling exception: %s\n", e.what(), message.c_str());
            res->data = "Internal Server Error";
        }
        return res;
    };
}
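
// main entry point: parse arguments, initialize the llama backend, register the HTTP
// routes, then either run as a router (proxying requests to child server instances)
// or load a single model and serve it directly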
int main(int argc, char ** argv, char ** envp) {
    // parse the command-line arguments required by this example
    common_params params;
    if (!common_params_parse(argc, argv, params, LLAMA_EXAMPLE_SERVER)) {
        return 1;
    }

    // validate batch size for embeddings
    // embeddings require all tokens to be processed in a single ubatch
    // see https://github.com/ggml-org/llama.cpp/issues/12836
    if (params.embedding && params.n_batch > params.n_ubatch) {
        LOG_WRN("%s: embeddings enabled with n_batch (%d) > n_ubatch (%d)\n", __func__, params.n_batch, params.n_ubatch);
        LOG_WRN("%s: setting n_batch = n_ubatch = %d to avoid assertion failure\n", __func__, params.n_ubatch);
        params.n_batch = params.n_ubatch;
    }

    if (params.n_parallel < 0) {
        LOG_INF("%s: n_parallel is set to auto, using n_parallel = 4 and kv_unified = true\n", __func__);
        params.n_parallel = 4;
        params.kv_unified = true;
    }

    // for consistency between router mode and single-model mode, use the model name as the alias
    if (params.model_alias.empty() && !params.model.name.empty()) {
        params.model_alias = params.model.name;
    }

    common_init();

    // struct that contains the llama context and inference state
    server_context ctx_server;

    llama_backend_init();
    llama_numa_init(params.numa);

    LOG_INF("system info: n_threads = %d, n_threads_batch = %d, total_threads = %d\n", params.cpuparams.n_threads, params.cpuparams_batch.n_threads, std::thread::hardware_concurrency());
    LOG_INF("\n");
    LOG_INF("%s\n", common_params_get_system_info(params).c_str());
    LOG_INF("\n");

    server_http_context ctx_http;
    if (!ctx_http.init(params)) {
        LOG_ERR("%s: failed to initialize HTTP server\n", __func__);
        return 1;
    }
    //
    // Router
    //

    // register API routes
    server_routes routes(params, ctx_server);
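
    // router mode: when no model path is given, this process does not load a model
    // itself but proxies requests to child server instances via server_models_routes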
    bool is_router_server = params.model.path.empty();
    std::optional<server_models_routes> models_routes{};
    if (is_router_server) {
        // set up the server instances manager
        try {
            models_routes.emplace(params, argc, argv, envp);
        } catch (const std::exception & e) {
            LOG_ERR("%s: failed to initialize router models: %s\n", __func__, e.what());
            return 1;
        }

        // proxy handlers
        // note: routes.get_health stays the same
        routes.get_metrics                 = models_routes->proxy_get;
        routes.post_props                  = models_routes->proxy_post;
        routes.get_api_show                = models_routes->proxy_get;
        routes.post_completions            = models_routes->proxy_post;
        routes.post_completions_oai        = models_routes->proxy_post;
        routes.post_chat_completions       = models_routes->proxy_post;
        routes.post_anthropic_messages     = models_routes->proxy_post;
        routes.post_anthropic_count_tokens = models_routes->proxy_post;
        routes.post_infill                 = models_routes->proxy_post;
        routes.post_embeddings             = models_routes->proxy_post;
        routes.post_embeddings_oai         = models_routes->proxy_post;
        routes.post_rerank                 = models_routes->proxy_post;
        routes.post_tokenize               = models_routes->proxy_post;
        routes.post_detokenize             = models_routes->proxy_post;
        routes.post_apply_template         = models_routes->proxy_post;
        routes.get_lora_adapters           = models_routes->proxy_get;
        routes.post_lora_adapters          = models_routes->proxy_post;
        routes.get_slots                   = models_routes->proxy_get;
        routes.post_slots                  = models_routes->proxy_post;

        // custom routes for the router itself
        routes.get_props  = models_routes->get_router_props;
        routes.get_models = models_routes->get_router_models;
        ctx_http.post("/models/load",   ex_wrapper(models_routes->post_router_models_load));
        ctx_http.post("/models/unload", ex_wrapper(models_routes->post_router_models_unload));
    }
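
    // register the HTTP endpoints; every handler goes through ex_wrapper so that
    // exceptions become JSON error responses instead of propagating to the HTTP layer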
ctx_http.get ("/health", ex_wrapper(routes.get_health)); // public endpoint (no API key check)
ctx_http.get ("/v1/health", ex_wrapper(routes.get_health)); // public endpoint (no API key check)
ctx_http.get ("/metrics", ex_wrapper(routes.get_metrics));
ctx_http.get ("/props", ex_wrapper(routes.get_props));
ctx_http.post("/props", ex_wrapper(routes.post_props));
ctx_http.post("/api/show", ex_wrapper(routes.get_api_show));
ctx_http.get ("/models", ex_wrapper(routes.get_models)); // public endpoint (no API key check)
ctx_http.get ("/v1/models", ex_wrapper(routes.get_models)); // public endpoint (no API key check)
ctx_http.get ("/api/tags", ex_wrapper(routes.get_models)); // ollama specific endpoint. public endpoint (no API key check)
ctx_http.post("/completion", ex_wrapper(routes.post_completions)); // legacy
ctx_http.post("/completions", ex_wrapper(routes.post_completions));
ctx_http.post("/v1/completions", ex_wrapper(routes.post_completions_oai));
ctx_http.post("/chat/completions", ex_wrapper(routes.post_chat_completions));
ctx_http.post("/v1/chat/completions", ex_wrapper(routes.post_chat_completions));
ctx_http.post("/api/chat", ex_wrapper(routes.post_chat_completions)); // ollama specific endpoint
ctx_http.post("/v1/messages", ex_wrapper(routes.post_anthropic_messages)); // anthropic messages API
ctx_http.post("/v1/messages/count_tokens", ex_wrapper(routes.post_anthropic_count_tokens)); // anthropic token counting
ctx_http.post("/infill", ex_wrapper(routes.post_infill));
ctx_http.post("/embedding", ex_wrapper(routes.post_embeddings)); // legacy
ctx_http.post("/embeddings", ex_wrapper(routes.post_embeddings));
ctx_http.post("/v1/embeddings", ex_wrapper(routes.post_embeddings_oai));
ctx_http.post("/rerank", ex_wrapper(routes.post_rerank));
ctx_http.post("/reranking", ex_wrapper(routes.post_rerank));
ctx_http.post("/v1/rerank", ex_wrapper(routes.post_rerank));
ctx_http.post("/v1/reranking", ex_wrapper(routes.post_rerank));
ctx_http.post("/tokenize", ex_wrapper(routes.post_tokenize));
ctx_http.post("/detokenize", ex_wrapper(routes.post_detokenize));
ctx_http.post("/apply-template", ex_wrapper(routes.post_apply_template));
// LoRA adapters hotswap
ctx_http.get ("/lora-adapters", ex_wrapper(routes.get_lora_adapters));
ctx_http.post("/lora-adapters", ex_wrapper(routes.post_lora_adapters));
// Save & load slots
ctx_http.get ("/slots", ex_wrapper(routes.get_slots));
ctx_http.post("/slots/:id_slot", ex_wrapper(routes.post_slots));
    //
    // Start the server
    //
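
    // clean_up is configured per mode and must run on every exit path below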
    std::function<void()> clean_up;
    if (is_router_server) {
        LOG_INF("%s: starting router server, no model will be loaded in this process\n", __func__);

        clean_up = [&models_routes]() {
            SRV_INF("%s: cleaning up before exit...\n", __func__);
            if (models_routes.has_value()) {
                models_routes->models.unload_all();
            }
            llama_backend_free();
        };

        if (!ctx_http.start()) {
            clean_up();
            LOG_ERR("%s: exiting due to HTTP server error\n", __func__);
            return 1;
        }
        ctx_http.is_ready.store(true);

        shutdown_handler = [&](int) {
            ctx_http.stop();
        };
    } else {
        // set up the clean-up function, to be called before exit
        clean_up = [&ctx_http, &ctx_server]() {
            SRV_INF("%s: cleaning up before exit...\n", __func__);
            ctx_http.stop();
            ctx_server.terminate();
            llama_backend_free();
        };

        // start the HTTP server before loading the model to be able to serve /health requests
        if (!ctx_http.start()) {
            clean_up();
            LOG_ERR("%s: exiting due to HTTP server error\n", __func__);
            return 1;
        }

        // load the model
        LOG_INF("%s: loading model\n", __func__);
        if (!ctx_server.load_model(params)) {
            clean_up();
            if (ctx_http.thread.joinable()) {
                ctx_http.thread.join();
            }
            LOG_ERR("%s: exiting due to model loading error\n", __func__);
            return 1;
        }

        routes.update_meta(ctx_server);
        ctx_http.is_ready.store(true);
        LOG_INF("%s: model loaded\n", __func__);

        shutdown_handler = [&](int) {
            // this will unblock start_loop()
            ctx_server.terminate();
        };
    }
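
    // install the interrupt handlers: SIGINT/SIGTERM on POSIX, the console Ctrl+C
    // handler on Windows; both end up calling shutdown_handler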
    // TODO: refactor in common/console
#if defined (__unix__) || (defined (__APPLE__) && defined (__MACH__))
    struct sigaction sigint_action;
    sigint_action.sa_handler = signal_handler;
    sigemptyset (&sigint_action.sa_mask);
    sigint_action.sa_flags = 0;
    sigaction(SIGINT,  &sigint_action, NULL);
    sigaction(SIGTERM, &sigint_action, NULL);
#elif defined (_WIN32)
    auto console_ctrl_handler = +[](DWORD ctrl_type) -> BOOL {
        return (ctrl_type == CTRL_C_EVENT) ? (signal_handler(SIGINT), true) : false;
    };
    SetConsoleCtrlHandler(reinterpret_cast<PHANDLER_ROUTINE>(console_ctrl_handler), true);
#endif
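
    // run until shutdown is requested: in router mode we simply wait for the HTTP
    // thread, otherwise start_loop() blocks until ctx_server.terminate() is called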
    if (is_router_server) {
        LOG_INF("%s: router server is listening on %s\n", __func__, ctx_http.listening_address.c_str());
        LOG_INF("%s: NOTE: router mode is experimental\n", __func__);
        LOG_INF("%s: it is not recommended to use this mode in untrusted environments\n", __func__);

        if (ctx_http.thread.joinable()) {
            ctx_http.thread.join(); // keep the main thread alive
        }

        // when the HTTP server stops, clean up and exit
        clean_up();
    } else {
        LOG_INF("%s: server is listening on %s\n", __func__, ctx_http.listening_address.c_str());
        LOG_INF("%s: starting the main loop...\n", __func__);

        // optionally, notify the router server that this instance is ready
        const char * router_port = std::getenv("LLAMA_SERVER_ROUTER_PORT");
        std::thread monitor_thread;
        if (router_port != nullptr) {
            monitor_thread = server_models::setup_child_server(shutdown_handler);
        }

        // this call blocks the main thread until queue_tasks.terminate() is called
        ctx_server.start_loop();

        clean_up();
        if (ctx_http.thread.joinable()) {
            ctx_http.thread.join();
        }
        if (monitor_thread.joinable()) {
            monitor_thread.join();
        }

        auto * ll_ctx = ctx_server.get_llama_context();
        if (ll_ctx != nullptr) {
            llama_memory_breakdown_print(ll_ctx);
        }
    }

    return 0;
}