1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
|
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// Functions related to the configuration management threads and VM
// TODO (v2): move worker thread related code back out of here.
#include "proxy.h"
#include "vendor/routelib/routelib.h"
// not using queue.h becuase those require specific storage for HEAD.
// it's not possible to have the HEAD simply be in the proxy context because
// it would need to know the offset into this private structure.
// This might be doable but the problem is too trivial to spend time on it.
#define MCP_LUAFILE_SIZE 16384
struct _mcp_luafile {
size_t size;
size_t used;
bool loaded; // flip this to false before each load use
char *buf;
char *fname; // filename to load
struct _mcp_luafile *next;
};
static int _dump_helper(lua_State *L, const void *p, size_t sz, void *ud) {
(void)L;
struct _mcp_luafile *db = ud;
if (db->used + sz > db->size) {
// increase by blocks instead of doubling to avoid memory waste
db->size += MCP_LUAFILE_SIZE;
char *nb = realloc(db->buf, db->size);
if (nb == NULL) {
return -1;
}
db->buf = nb;
}
memcpy(db->buf + db->used, (const char *)p, sz);
db->used += sz;
return 0;
}
static const char * _load_helper(lua_State *L, void *data, size_t *size) {
(void)L;
struct _mcp_luafile *db = data;
if (db->loaded) {
*size = 0;
return NULL;
}
*size = db->used;
db->loaded = true;
return db->buf;
}
void proxy_start_reload(void *arg) {
proxy_ctx_t *ctx = arg;
if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
ctx->loading = true;
pthread_cond_signal(&ctx->config_cond);
pthread_mutex_unlock(&ctx->config_lock);
}
}
int proxy_first_confload(void *arg) {
proxy_ctx_t *ctx = arg;
pthread_mutex_lock(&ctx->config_lock);
ctx->loading = true;
pthread_cond_signal(&ctx->config_cond);
pthread_mutex_unlock(&ctx->config_lock);
while (1) {
bool stop = false;
pthread_mutex_lock(&ctx->config_lock);
if (!ctx->loading) {
stop = true;
}
pthread_mutex_unlock(&ctx->config_lock);
if (stop)
break;
}
int fails = 0;
STAT_L(ctx);
fails = ctx->global_stats.config_reload_fails;
STAT_UL(ctx);
if (fails) {
return -1;
}
return 0;
}
// Manages a queue of inbound objects destined to be deallocated.
static void *_proxy_manager_thread(void *arg) {
proxy_ctx_t *ctx = arg;
globalobj_head_t head;
pthread_mutex_lock(&ctx->manager_lock);
while (1) {
STAILQ_INIT(&head);
while (STAILQ_EMPTY(&ctx->manager_head)) {
pthread_cond_wait(&ctx->manager_cond, &ctx->manager_lock);
}
// pull dealloc queue into local queue.
STAILQ_CONCAT(&head, &ctx->manager_head);
pthread_mutex_unlock(&ctx->manager_lock);
// Config lock is required for using config VM.
pthread_mutex_lock(&ctx->config_lock);
lua_State *L = ctx->proxy_state;
struct mcp_globalobj_s *g;
STAILQ_FOREACH(g, &head, next) {
// we let the object _gc() handle backend/etc references
pthread_mutex_lock(&g->lock);
assert(g->self_ref != -1);
// See comment on mcp_gobj_ref()
if (g->self_ref < -1) {
g->refcount--;
g->self_ref = -g->self_ref;
}
assert(g->self_ref > 0 || g->refcount == 0);
if (g->refcount == 0) {
luaL_unref(L, LUA_REGISTRYINDEX, g->self_ref);
g->self_ref = -1;
}
pthread_mutex_unlock(&g->lock);
}
// force lua garbage collection so any resources close out quickly.
lua_gc(L, LUA_GCCOLLECT);
// twice because objects with garbage collector handlers are only
// marked on the first collection cycle.
lua_gc(L, LUA_GCCOLLECT);
// must hold this lock while interacting with the config VM.
pthread_mutex_unlock(&ctx->config_lock);
// done.
pthread_mutex_lock(&ctx->manager_lock);
}
return NULL;
}
// TODO: only run routine if something changed.
// This compacts all of the names for proxy user stats into a linear buffer,
// which can save considerable CPU when emitting a large number of stats. It
// also saves some total memory by having one linear buffer instead of many
// potentially small aligned allocations.
static void proxy_config_stats_prep(proxy_ctx_t *ctx) {
char *oldnamebuf = ctx->user_stats_namebuf;
struct proxy_user_stats_entry *entries = ctx->user_stats;
size_t namelen = 0;
STAT_L(ctx);
// find size of new compact name buffer
for (int x = 0; x < ctx->user_stats_num; x++) {
if (entries[x].name) {
namelen += strlen(entries[x].name) + 1; // null byte
} else if (entries[x].cname) {
char *name = oldnamebuf + entries[x].cname;
namelen += strlen(name) + 1;
}
}
// start one byte into the cname buffer so we can do faster checks on if a
// name exists or not. so extend the buffer by one byte.
namelen++;
char *namebuf = calloc(1, namelen);
// copy names into the compact buffer
char *p = namebuf + 1;
for (int x = 0; x < ctx->user_stats_num; x++) {
struct proxy_user_stats_entry *e = &entries[x];
char *newname = NULL;
if (e->name) {
// skip blank names.
if (e->name[0]) {
newname = e->name;
}
} else if (e->cname) {
// else re-copy from old buffer
newname = oldnamebuf + e->cname;
}
if (newname) {
// set the buffer offset for this name
e->cname = p - namebuf;
// copy in the name
size_t nlen = strlen(newname);
memcpy(p, newname, nlen);
p += nlen;
*p = '\0'; // add null byte
p++;
} else {
// name is blank or doesn't exist, ensure we skip it.
e->cname = 0;
}
if (e->name) {
// now get rid of the name buffer.
free(e->name);
e->name = NULL;
}
}
ctx->user_stats_namebuf = namebuf;
if (oldnamebuf) {
free(oldnamebuf);
}
STAT_UL(ctx);
}
static void proxy_config_reload(proxy_ctx_t *ctx) {
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
STAT_INCR(ctx, config_reloads, 1);
// gen. used for tracking object lifecycles over time.
// ie: ensuring old things are unloaded.
ctx->config_generation++;
lua_State *L = ctx->proxy_state;
lua_settop(L, 0); // clear off any crud that could have been left on the stack.
// The main stages of config reload are:
// - load and execute the config file
// - run mcp_config_pools()
// - for each worker:
// - copy and execute new lua code
// - copy selector table
// - run mcp_config_routes()
if (proxy_load_config(ctx) != 0) {
// Failed to load. log and wait for a retry.
STAT_INCR(ctx, config_reload_fails, 1);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
return;
}
proxy_config_stats_prep(ctx);
// TODO (v2): create a temporary VM to test-load the worker code into.
// failing to load partway through the worker VM reloads can be
// critically bad if we're not careful about references.
// IE: the config VM _must_ hold references to selectors and backends
// as long as they exist in any worker for any reason.
for (int x = 0; x < settings.num_threads; x++) {
LIBEVENT_THREAD *thr = get_worker_thread(x);
pthread_mutex_lock(&ctx->worker_lock);
ctx->worker_done = false;
ctx->worker_failed = false;
proxy_reload_notify(thr);
while (!ctx->worker_done) {
// in case of spurious wakeup.
pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock);
}
pthread_mutex_unlock(&ctx->worker_lock);
// Code load bailed.
if (ctx->worker_failed) {
STAT_INCR(ctx, config_reload_fails, 1);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
return;
}
}
// Need to clear the reset flag for the stats system after pushing the new
// config to each worker.
STAT_L(ctx);
for (int x = 0; x < ctx->user_stats_num; x++) {
ctx->user_stats[x].reset = false;
}
STAT_UL(ctx);
lua_pop(ctx->proxy_state, 1); // drop config_pools return value
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
}
// Very basic scheduler. Unsorted because we don't expect a huge list of
// functions to run.
static void proxy_run_crons(proxy_ctx_t *ctx) {
lua_State *L = ctx->proxy_state;
assert(lua_gettop(L) == 0);
assert(ctx->cron_ref);
struct timespec now;
// Fetch the cron table. Created on startup so must exist.
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref);
clock_gettime(CLOCK_REALTIME, &now);
if (ctx->cron_next <= now.tv_sec) {
ctx->cron_next = INT_MAX;
} else {
// no crons ready.
return;
}
// Loop the cron entries.
lua_pushnil(L);
while (lua_next(L, 1) != 0) {
const char *key = lua_tostring(L, -2);
mcp_cron_t *ce = lua_touserdata(L, -1);
int idx = lua_absindex(L, -1);
// check generation.
if (ctx->config_generation != ce->gen) {
// remove entry.
lua_pushnil(L);
lua_setfield(L, 1, key);
} else if (ce->next <= now.tv_sec) {
// grab func and execute it
lua_getiuservalue(L, idx, 1);
// no arguments or return values
int res = lua_pcall(L, 0, 0, 0);
STAT_INCR(ctx, config_cron_runs, 1);
if (res != LUA_OK) {
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(L, -1));
STAT_INCR(ctx, config_cron_fails, 1);
lua_pop(L, 1); // drop error.
}
if (ce->repeat) {
ce->next = now.tv_sec + ce->every;
// if rescheduled, check next against ctx. update if sooner
if (ctx->cron_next > ce->next) {
ctx->cron_next = ce->next;
}
} else {
// non-repeating cron. delete entry.
lua_pushnil(L);
lua_setfield(L, 1, key);
}
} else {
// not scheduled to run now, but check if we're next.
if (ctx->cron_next > ce->next) {
ctx->cron_next = ce->next;
}
}
lua_pop(L, 1); // drop value so we can loop.
}
lua_pop(L, 1); // drop cron table.
}
// Thread handling the configuration reload sequence.
// TODO (v2): get a logger instance.
// TODO (v2): making this "safer" will require a few phases of work.
// 1) JFDI
// 2) "test VM" -> from config thread, test the worker reload portion.
// 3) "unit testing" -> from same temporary worker VM, execute set of
// integration tests that must pass.
// 4) run update on each worker, collecting new mcp.attach() hooks.
// Once every worker has successfully executed and set new hooks, roll
// through a _second_ time to actually swap the hook structures and unref
// the old structures where marked dirty.
static void *_proxy_config_thread(void *arg) {
proxy_ctx_t *ctx = arg;
struct timespec wait = {0};
logger_create();
pthread_mutex_lock(&ctx->config_lock);
pthread_cond_signal(&ctx->config_cond);
while (1) {
ctx->loading = false;
// cron only thinks in whole seconds.
wait.tv_sec = ctx->cron_next;
pthread_cond_timedwait(&ctx->config_cond, &ctx->config_lock, &wait);
proxy_run_crons(ctx);
if (ctx->loading) {
proxy_config_reload(ctx);
}
}
return NULL;
}
int _start_proxy_config_threads(proxy_ctx_t *ctx) {
int ret;
pthread_mutex_lock(&ctx->config_lock);
if ((ret = pthread_create(&ctx->config_tid, NULL,
_proxy_config_thread, ctx)) != 0) {
fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
strerror(ret));
pthread_mutex_unlock(&ctx->config_lock);
return -1;
}
thread_setname(ctx->config_tid, "mc-prx-config");
// Avoid returning until the config thread has actually started.
pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
pthread_mutex_unlock(&ctx->config_lock);
pthread_mutex_lock(&ctx->manager_lock);
if ((ret = pthread_create(&ctx->manager_tid, NULL,
_proxy_manager_thread, ctx)) != 0) {
fprintf(stderr, "Failed to start proxy manager thread: %s\n",
strerror(ret));
pthread_mutex_unlock(&ctx->manager_lock);
return -1;
}
thread_setname(ctx->manager_tid, "mc-prx-manager");
pthread_mutex_unlock(&ctx->manager_lock);
return 0;
}
// this splits a list of lua startfiles into independent data chunk buffers
// we call this once the first time we start so we can use mallocs without
// having to armor against runtime malloc failures... as much.
static int proxy_init_startfiles(proxy_ctx_t *ctx, const char *files) {
char *flist = strdup(settings.proxy_startfile);
if (flist == NULL) {
fprintf(stderr, "ERROR: failed to allocate memory for parsing proxy_startfile\n");
return -1;
}
char *b;
for (const char *p = strtok_r(flist, ":", &b);
p != NULL;
p = strtok_r(NULL, ":", &b)) {
struct _mcp_luafile *db = calloc(sizeof(struct _mcp_luafile), 1);
if (db == NULL) {
fprintf(stderr, "ERROR: failed to allocate memory for parsing proxy_startfile\n");
return -1;
}
db->size = MCP_LUAFILE_SIZE;
db->buf = calloc(db->size, 1);
db->fname = strdup(p);
if (db->buf == NULL || db->fname == NULL) {
fprintf(stderr, "ERROR: failed to allocate memory while parsing proxy_startfile\n");
return -1;
}
// put new file at tail
if (ctx->proxy_code == NULL) {
ctx->proxy_code = db;
} else {
struct _mcp_luafile *list = ctx->proxy_code;
while (list->next) {
list = list->next;
}
assert(list->next == NULL);
list->next = db;
}
}
free(flist);
return 0;
}
static int proxy_load_files(proxy_ctx_t *ctx) {
lua_State *L = ctx->proxy_state;
struct _mcp_luafile *db = ctx->proxy_code;
assert(db);
while (db) {
int res;
// clear the buffer for reuse.
memset(db->buf, 0, db->size);
db->used = 0;
if (strcmp(db->fname, "routelib") == 0) {
res = luaL_loadbuffer(L, routelib_lua, routelib_lua_len, "routelib");
} else {
res = luaL_loadfile(L, db->fname);
}
if (res != LUA_OK) {
fprintf(stderr, "ERROR: Failed to load proxy_startfile: %s\n", lua_tostring(L, -1));
return -1;
}
// LUA_OK, LUA_ERRSYNTAX, LUA_ERRMEM, LUA_ERRFILE
// Now we need to dump the compiled code into bytecode.
// This will then get loaded into worker threads.
lua_dump(L, _dump_helper, db, 0);
// 0 means no error.
// now we complete the data load by calling the function.
res = lua_pcall(L, 0, LUA_MULTRET, 0);
if (res != LUA_OK) {
fprintf(stderr, "ERROR: Failed to load data into lua config state: %s\n", lua_tostring(L, -1));
exit(EXIT_FAILURE);
}
db = db->next;
}
return 0;
}
int proxy_load_config(void *arg) {
proxy_ctx_t *ctx = arg;
lua_State *L = ctx->proxy_state;
int res = 0;
if (ctx->proxy_code == NULL) {
res = proxy_init_startfiles(ctx, settings.proxy_startfile);
if (res != 0) {
return res;
}
}
// load each of the data files in order.
res = proxy_load_files(ctx);
// call the mcp_config_pools function to get the central backends.
lua_getglobal(L, "mcp_config_pools");
if (lua_isnil(L, -1)) {
fprintf(stderr, "ERROR: Configuration file missing 'mcp_config_pools' function\n");
exit(EXIT_FAILURE);
}
lua_pushnil(L); // no "old" config yet.
if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
fprintf(stderr, "ERROR: Failed to execute mcp_config_pools: %s\n", lua_tostring(L, -1));
exit(EXIT_FAILURE);
}
// result is our main config.
return 0;
}
static int _copy_pool(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
// from, -3 should have the userdata.
mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool");
size_t size = sizeof(mcp_pool_proxy_t);
mcp_pool_proxy_t *pp = lua_newuserdatauv(to, size, 0);
luaL_setmetatable(to, "mcp.pool_proxy");
pp->main = p;
if (p->use_iothread) {
pp->pool = p->pool;
} else {
// allow 0 indexing for backends when unique to each worker thread
pp->pool = &p->pool[thr->thread_baseid * p->pool_size];
}
lua_pushvalue(from, -3); // dupe pool for referencing
mcp_gobj_ref(from, &p->g); // pops obj copy
return 0;
}
static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr);
// (from, -1) is the source value
// should end with (to, -1) being the new value.
static void _copy_config_table(lua_State *from, lua_State *to, LIBEVENT_THREAD *thr) {
int type = lua_type(from, -1);
bool found = false;
luaL_checkstack(from, 4, "configuration error: table recursion too deep");
luaL_checkstack(to, 4, "configuration error: table recursion too deep");
switch (type) {
case LUA_TNIL:
lua_pushnil(to);
break;
case LUA_TUSERDATA:
// see dump_stack() - check if it's something we handle.
if (lua_getmetatable(from, -1) != 0) {
lua_pushstring(from, "__name");
if (lua_rawget(from, -2) != LUA_TNIL) {
const char *name = lua_tostring(from, -1);
if (strcmp(name, "mcp.pool") == 0) {
_copy_pool(from, to, thr);
found = true;
} else if (strcmp(name, "mcp.ratelim_global_tbf") == 0) {
mcp_ratelim_proxy_tbf(from, to);
found = true;
}
}
lua_pop(from, 2);
}
if (!found) {
proxy_lua_error(from, "unhandled userdata type in configuration table\n");
}
break;
case LUA_TNUMBER:
if (lua_isinteger(from, -1)) {
lua_pushinteger(to, lua_tointeger(from, -1));
} else {
lua_pushnumber(to, lua_tonumber(from, -1));
}
break;
case LUA_TSTRING:
lua_pushlstring(to, lua_tostring(from, -1), lua_rawlen(from, -1));
break;
case LUA_TBOOLEAN:
lua_pushboolean(to, lua_toboolean(from, -1));
break;
case LUA_TTABLE:
// TODO (v2): copy the metatable first?
// TODO (v2): size narr/nrec from old table and use createtable to
// pre-allocate.
lua_newtable(to); // throw new table on worker
int t = lua_absindex(from, -1); // static index of table to copy.
int nt = lua_absindex(to, -1); // static index of new table.
lua_pushnil(from); // start iterator for main
while (lua_next(from, t) != 0) {
// (key, -2), (val, -1)
int keytype = lua_type(from, -2);
// to intentionally limit complexity and allow for future
// optimizations we restrict what types may be used as keys
// for sub-tables.
switch (keytype) {
case LUA_TSTRING:
// to[l]string converts the actual key in the table
// into a string, so we must not do that unless it
// already is one.
lua_pushlstring(to, lua_tostring(from, -2), lua_rawlen(from, -2));
break;
case LUA_TNUMBER:
if (lua_isinteger(from, -2)) {
lua_pushinteger(to, lua_tointeger(from, -2));
} else {
lua_pushnumber(to, lua_tonumber(from, -2));
}
break;
default:
proxy_lua_error(from, "configuration table keys must be strings or numbers");
}
// lua_settable(to, n) - n being the table
// takes -2 key -1 value, pops both.
// use lua_absindex(L, -1) and so to convert easier?
_copy_config_table(from, to, thr); // push next value.
lua_settable(to, nt);
lua_pop(from, 1); // drop value, keep key.
}
// top of from is now the original table.
// top of to should be the new table.
break;
default:
proxy_lua_error(from, "unhandled data type in configuration table\n");
}
}
// Run from proxy worker to coordinate code reload.
// config_lock must be held first.
void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) {
proxy_ctx_t *ctx = arg;
pthread_mutex_lock(&ctx->worker_lock);
if (proxy_thread_loadconf(ctx, thr) != 0) {
ctx->worker_failed = true;
}
ctx->worker_done = true;
pthread_cond_signal(&ctx->worker_cond);
pthread_mutex_unlock(&ctx->worker_lock);
}
// FIXME (v2): need to test how to recover from an actual error here. error message
// needs to go somewhere useful, counters added, etc.
int proxy_thread_loadconf(proxy_ctx_t *ctx, LIBEVENT_THREAD *thr) {
lua_State *L = thr->L;
// load the precompiled config functions.
struct _mcp_luafile *db = ctx->proxy_code;
while (db) {
db->loaded = false;
int res = lua_load(L, _load_helper, db, "config", NULL);
if (res != LUA_OK) {
fprintf(stderr, "Failed to load data into worker thread: %s\n", lua_tostring(L, -1));
return -1;
}
res = lua_pcall(L, 0, LUA_MULTRET, 0);
if (res != LUA_OK) {
// FIXME (v2): don't exit here!
fprintf(stderr, "Failed to load data into worker thread: %s\n", lua_tostring(L, -1));
return -1;
}
db = db->next;
}
lua_getglobal(L, "mcp_config_routes");
// create deepcopy of argument to pass into mcp_config_routes.
// FIXME (v2): to avoid lua SIGABRT'ing on errors we need to protect the call
// normal pattern:
// lua_pushcfunction(L, &_copy_config_table);
// lua_pushlightuserdata(L, &L2);
// res = la_pcall(L, etc);
// ... but since this is cross-VM we could get errors from not the
// protected VM, breaking setjmp/etc.
// for this part of the code we should override lua_atpanic(),
// allowing us to specifically recover and bail.
// However, again, this will require the next version of the config reload
// code since we are re-using the VM's and a panic can leave us in a
// broken state.
// If the setjump/longjump combos are compatible a pcall for from and
// atpanic for to might work best, since the config VM is/should be long
// running and worker VM's should be rotated.
_copy_config_table(ctx->proxy_state, L, thr);
// copied value is in front of route function, now call it.
if (lua_pcall(L, 1, 0, 0) != LUA_OK) {
fprintf(stderr, "Failed to execute mcp_config_routes: %s\n", lua_tostring(L, -1));
return -1;
}
// update user stats
STAT_L(ctx);
struct proxy_user_stats_entry *us = ctx->user_stats;
int stats_num = ctx->user_stats_num;
struct proxy_user_stats *tus = NULL;
if (stats_num != 0) {
pthread_mutex_lock(&thr->stats.mutex);
if (thr->proxy_user_stats == NULL) {
tus = calloc(1, sizeof(struct proxy_user_stats));
thr->proxy_user_stats = tus;
} else {
tus = thr->proxy_user_stats;
}
// originally this was a realloc routine but it felt fragile.
// that might still be a better idea; still need to zero out the end.
uint64_t *counters = calloc(stats_num, sizeof(uint64_t));
// note that num_stats can _only_ grow in size.
if (tus->counters) {
// pull in old counters, if the names didn't change.
for (int x = 0; x < tus->num_stats; x++) {
if (us[x].reset) {
counters[x] = 0;
} else {
counters[x] = tus->counters[x];
}
}
assert(tus->num_stats <= stats_num);
free(tus->counters);
}
tus->counters = counters;
tus->num_stats = stats_num;
pthread_mutex_unlock(&thr->stats.mutex);
}
// also grab the concurrent request limit
thr->proxy_active_req_limit = ctx->active_req_limit;
STAT_UL(ctx);
// update limit counter(s)
pthread_mutex_lock(&thr->proxy_limit_lock);
thr->proxy_buffer_memory_limit = ctx->buffer_memory_limit;
pthread_mutex_unlock(&thr->proxy_limit_lock);
return 0;
}
|