1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833
|
-- SPDX-License-Identifier: GPL-3.0-or-later
local debug = require('debug')
local ffi = require('ffi')
local kluautil = require('kluautil')
local krprint = require("krprint")
-- Units
-- Byte-size multipliers; each one is 1024 times the previous one.
kB = 1024
MB = 1024*kB
GB = 1024*MB
-- Time
-- Duration multipliers; one second is 1000 base units, i.e. the base unit
-- is the millisecond.
sec = 1000
second = sec
minute = 60 * sec
min = minute
hour = 60 * minute
day = 24 * hour
-- Logging
-- from syslog.h
-- Severity levels passed to log_fmt(); smaller numbers are more severe.
LOG_CRIT = 2
LOG_ERR = 3
LOG_WARNING = 4
LOG_NOTICE = 5
LOG_INFO = 6
LOG_DEBUG = 7
-- Source location helpers for log_fmt().
-- Both inspect stack level 4 counted from the helper itself:
-- 1 = curr_file/curr_line, 2 = log_fmt, 3 = the function that called log_fmt,
-- 4 = the caller of that function. Adding or removing a call frame in
-- between shifts the reported location.
local function curr_file() return debug.getinfo(4,'S').source end
local function curr_line() return debug.getinfo(4,'l').currentline end
-- Format a message and pass it to the C logging function kr_log_fmt().
-- @param grp   log group (value accepted by ffi.C.kr_log_grp2name)
-- @param level severity, one of the LOG_* constants
-- @param fmt   string.format() template, followed by its arguments
-- The message is prefixed with the group name padded to six characters and
-- is accompanied by CODE_FILE/CODE_LINE fields; CODE_FUNC is left empty.
local function log_fmt(grp, level, fmt, ...)
ffi.C.kr_log_fmt(grp, level,
'CODE_FILE='..curr_file(), 'CODE_LINE='..curr_line(), 'CODE_FUNC=',
'[%-6s] %s\n', ffi.C.kr_log_grp2name(grp), string.format(fmt, ...))
end
-- Log a formatted message in the context of a request.
-- @param req     request handed over to ffi.C.kr_log_req1()
-- @param qry_uid query identifier forwarded unchanged
-- @param indent  indentation value forwarded unchanged
-- @param grp     log group; its name is looked up via kr_log_grp2name()
-- @param fmt     string.format() template, followed by its arguments
function log_req(req, qry_uid, indent, grp, fmt, ...)
	local grp_name = ffi.C.kr_log_grp2name(grp)
	local message = string.format(fmt, ...)
	ffi.C.kr_log_req1(req, qry_uid, indent, grp, grp_name, '%s\n', message)
end
-- Log a formatted message in the context of a single query.
-- @param qry query handed over to ffi.C.kr_log_q1()
-- @param grp log group; its name is looked up via kr_log_grp2name()
-- @param fmt string.format() template, followed by its arguments
function log_qry(qry, grp, fmt, ...)
	local grp_name = ffi.C.kr_log_grp2name(grp)
	local message = string.format(fmt, ...)
	ffi.C.kr_log_q1(qry, grp, grp_name, '%s\n', message)
end
-- Print a traceback and raise a formatted error.
-- @param fmt string.format() template, followed by its arguments
-- The traceback starts at the caller of panic() (level 2) and goes to stdout.
-- The raised message is 'ERROR: ' followed by the formatted text; level 0
-- keeps Lua from prepending position information to it.
function panic(fmt, ...)
	local header = 'error occurred here (config filename:lineno is '
		.. 'at the bottom, if config is involved):'
	print(debug.traceback(header, 2))
	local message = string.format('ERROR: '.. fmt, ...)
	error(message, 0)
end
-- Severity-specific logging helpers.
-- Each takes a log group, a string.format() template and its arguments, and
-- forwards them to log_fmt() with a fixed LOG_* level.
-- NOTE: log_fmt() resolves the reported file/line from a fixed stack level,
-- so these wrappers have to stay exactly one plain (non-tail) call away
-- from log_fmt().

-- Log at LOG_ERR severity.
function log_error(grp, fmt, ...)
log_fmt(grp, LOG_ERR, fmt, ...)
end
-- Log at LOG_WARNING severity.
function log_warn(grp, fmt, ...)
log_fmt(grp, LOG_WARNING, fmt, ...)
end
-- Log at LOG_NOTICE severity.
function log_notice(grp, fmt, ...)
log_fmt(grp, LOG_NOTICE, fmt, ...)
end
-- Log at LOG_INFO severity.
function log_info(grp, fmt, ...)
log_fmt(grp, LOG_INFO, fmt, ...)
end
-- Log at LOG_DEBUG severity.
function log_debug(grp, fmt, ...)
log_fmt(grp, LOG_DEBUG, fmt, ...)
end
-- Log a message at LOG_NOTICE severity in the LOG_GRP_MODULE group.
-- @param fmt string.format() template, followed by its arguments
function log(fmt, ...)
	-- Call log_fmt() directly instead of going through log_notice():
	-- curr_file()/curr_line() read stack level 4, which must be the caller of
	-- log(). With the extra log_notice() frame in between, level 4 was log()
	-- itself and CODE_FILE/CODE_LINE always pointed into this file.
	log_fmt(ffi.C.LOG_GRP_MODULE, LOG_NOTICE, fmt, ...)
end
-- Resolver bindings
-- Expose the 'kres' library as a global.
kres = require('kres')
-- Publish kres.str2dname under the global name `todname`, but only when the
-- kres table itself carries that field (rawget bypasses any metatable).
if rawget(kres, 'str2dname') ~= nil then
todname = kres.str2dname
end
-- Start resolution of an already constructed packet.
-- @param pkt     packet passed to worker_resolve_start()/worker_resolve_exec()
-- @param options query flags in any form accepted by kres.mk_qflags()
-- @param finish  optional function (answer, req) called through the request's
--                trace_finish hook
-- @param init    optional function (req) called before execution starts
-- @return true when worker_resolve_exec() returned 0, false otherwise
-- NOTE(review): `task` is dereferenced without a nil check; confirm whether
-- worker_resolve_start() can return NULL.
worker.resolve_pkt = function (pkt, options, finish, init)
options = kres.mk_qflags(options)
local task = ffi.C.worker_resolve_start(pkt, options)
-- Deal with finish and init callbacks
if finish ~= nil then
-- Declared before the cast so that the closure can refer to the callback
-- object and release it after its single use.
local finish_cb
finish_cb = ffi.cast('trace_callback_f',
function (req)
jit.off(true, true) -- JIT for (C -> lua)^2 nesting isn't allowed
finish(req.answer, req)
finish_cb:free()
end)
task.ctx.req.trace_finish = finish_cb
end
if init ~= nil then
init(task.ctx.req)
end
return ffi.C.worker_resolve_exec(task, pkt) == 0
end
-- Build a query packet and start resolving it.
-- Accepts either positional arguments or a single table with the fields
-- name, type, class, options, finish and init.
-- @param qname   name to query (or the table of named arguments)
-- @param qtype   record type, defaults to kres.type.A
-- @param qclass  record class, defaults to kres.class.IN
-- @param options query flags in any form accepted by kres.mk_qflags()
-- @param finish  optional completion callback, see worker.resolve_pkt()
-- @param init    optional initialisation callback, see worker.resolve_pkt()
-- @return the result of worker.resolve_pkt()
-- Panics when the packet cannot be created.
worker.resolve = function (qname, qtype, qclass, options, finish, init)
	-- Named-argument form: unpack the table into the positional variables
	if type(qname) == 'table' then
		local args = qname
		qname, qtype, qclass = args.name, args.type, args.class
		options, finish, init = args.options, args.finish, args.init
	end
	if not qtype then qtype = kres.type.A end
	if not qclass then qclass = kres.class.IN end
	options = kres.mk_qflags(options)
	-- LATER: nicer errors for rubbish in qname, qtype, qclass?
	local pkt = ffi.C.worker_resolve_mk_pkt(qname, qtype, qclass, options)
	if pkt == nil then
		panic('failure in worker.resolve(); probably invalid qname "%s"', qname)
	end
	local started = worker.resolve_pkt(pkt, options, finish, init)
	-- The packet is released here, after worker.resolve_pkt() has returned
	ffi.C.knot_pkt_free(pkt)
	return started
end
-- Global shorthand for worker.resolve
resolve = worker.resolve
-- Per-worker summary: the table returned by worker.stats() with the
-- worker's pid added under the key `pid`.
function worker.info()
	local info = worker.stats()
	info.pid = worker.pid
	return info
end
-- Resolver mode of operation
local current_mode = 'normal'
local mode_table = { normal=0, strict=1, permissive=2 }
-- Get or set the resolver mode.
-- @param m optional mode name: 'normal', 'strict' or 'permissive'
-- @return the current mode name when called without an argument,
--         true after the mode was changed
-- Raises an error for any other value of `m`.
function mode(m)
	if not m then return current_mode end
	if not mode_table[m] then
		-- tostring() keeps the message meaningful for values that cannot be
		-- concatenated (booleans, tables, ...); without it the concatenation
		-- itself would raise an unrelated error.
		error('unsupported mode: ' .. tostring(m))
	end
	-- Update current operation mode
	current_mode = m
	option('STRICT', current_mode == 'strict')
	option('PERMISSIVE', current_mode == 'permissive')
	return true
end
-- Alias for option('REORDER_RR', val).
-- @param val optional boolean; when nil the option is only read
-- @return the value of the REORDER_RR option after the call
function reorder_RR(val)
	local current = option('REORDER_RR', val)
	return current
end
-- Get/set resolver options via name (string)
-- @param name option name; it is upper-cased before use
-- @param val  optional boolean; when given, the option is set to it
-- @return the value of the option after the call
-- Panics when `val` is neither nil, true nor false.
function option(name, val)
	local flags = kres.context().options
	-- Note: no way to test existence of flags[name] but we want error anyway.
	name = string.upper(name) -- convenience
	if val ~= nil then
		if (val ~= true) and (val ~= false) then
			-- The offending value is passed as a format argument: panic()
			-- feeds its first parameter to string.format(), so a '%' inside
			-- the value must not become part of the template.
			panic('invalid option value: %s', tostring(val))
		end
		flags[name] = val
	end
	return flags[name]
end
-- Environment variable access:
-- `env.VAR` evaluates to os.getenv('VAR'), i.e. nil for unset variables.
-- Nothing is stored in the table itself, so every read queries the
-- environment again.
env = {}
do
	local function lookup(_, name)
		return os.getenv(name)
	end
	setmetatable(env, { __index = lookup })
end
-- Access to debugging switches kept in C variables:
-- `debugging.assertion_abort` <-> ffi.C.kr_dbg_assertion_abort
-- `debugging.assertion_fork`  <-> ffi.C.kr_dbg_assertion_fork
-- Reading or writing any other key panics.
debugging = {}
setmetatable(debugging, {
	__index = function(_, k)
		if k == 'assertion_abort' then return ffi.C.kr_dbg_assertion_abort
		elseif k == 'assertion_fork' then return ffi.C.kr_dbg_assertion_fork
		else
			-- The key goes in as a format argument; panic() uses its first
			-- parameter as a string.format() template, so a '%' inside the
			-- key must not be interpreted.
			panic('invalid debugging option: %s', tostring(k))
		end
	end,
	__newindex = function(_, k, v)
		if k == 'assertion_abort' then ffi.C.kr_dbg_assertion_abort = v
		elseif k == 'assertion_fork' then ffi.C.kr_dbg_assertion_fork = v
		else
			panic('invalid debugging option: %s', tostring(k))
		end
	end
})
-- Shortcuts on the `net` table:
-- reading `net.<iface>`        gives `net.interfaces()[iface]`
-- reading `net.ipv4`/`net.ipv6` gives the negated NO_IPV4/NO_IPV6 option
-- writing `net.ipv4`/`net.ipv6` sets NO_IPV4/NO_IPV6 to the negated value
-- writing any other key         calls net.listen() with the assigned value,
--                               or with the matching net.interfaces() entry
setmetatable(net, {
	__index = function (t, k)
		local own = rawget(t, k)
		if own then return own end
		if k == 'ipv6' then return not option('NO_IPV6') end
		if k == 'ipv4' then return not option('NO_IPV4') end
		return net.interfaces()[k]
	end,
	__newindex = function (t, k, v)
		if k == 'ipv6' then return option('NO_IPV6', not v) end
		if k == 'ipv4' then return option('NO_IPV4', not v) end
		-- Prefer the interface entry stored under the assigned value, if any
		local iname = rawget(net.interfaces(), v)
		t.listen(iname or v)
	end
})
-- Declarative module loading:
-- `modules.<name> = <config>` loads the module and passes <config> to its
-- config() function; an array-style entry (numeric key) carries the module
-- name as the value and no configuration.
setmetatable(modules, {
	__newindex = function (_, k, v)
		-- Array-style entry: the value is the module name
		if type(k) == 'number' then
			k, v = v, nil
		end
		-- Nothing to do when _G already holds a value under this name
		if rawget(_G, k) then return end
		modules.load(k)
		-- Keep only the leading run of word characters as the table name
		k = string.match(k, '[%w_]+')
		local mod = _G[k]
		local config = mod and rawget(mod, 'config')
		if mod == nil or config == nil then return end
		if k ~= v then
			config(v)
		else
			config()
		end
	end
})
-- Set up lua table for a C module. (Internal function.)
-- @param kr_module_ud userdata holding a pointer to `struct kr_module`
-- Creates a global table named after the module with one lua function per
-- property, a config() function when the module has one, and metamethods
-- that route unknown reads/writes to the module's get/set properties.
-- Nothing is created for a module with neither config nor properties.
function modules_create_table_for_c(kr_module_ud)
	local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0]
	if kr_module.config == nil and kr_module.props == nil then
		return
	end

	--- Set up the global table named according to the module.
	local module_name = ffi.string(kr_module.name)
	local module = {}
	_G[module_name] = module

	-- Convert a lua argument to the string form handed to the C callbacks:
	-- tables and booleans go through tojson(), other non-nil values through
	-- tostring(), and nil stays nil.
	local function convert_arg(arg)
		local kind = type(arg)
		if kind == 'table' or kind == 'boolean' then
			return tojson(arg)
		elseif arg ~= nil then
			return tostring(arg)
		end
		return nil
	end

	--- Construct lua functions for properties.
	if kr_module.props ~= nil then
		local idx = 0
		-- The property array ends at the first entry without a callback
		while kr_module.props[idx].cb ~= nil do
			local prop = kr_module.props[idx]
			local cb = prop.cb
			-- lua wrapper around kr_prop_cb function typedef
			module[ffi.string(prop.name)] = function (arg)
				local arg_conv = convert_arg(arg)
				local ret_cstr = cb(ffi.C.the_worker.engine, kr_module, arg_conv)
				if ret_cstr == nil then
					return nil
				end
				-- LATER(optim.): superfluous copying
				local ret_str = ffi.string(ret_cstr)
				-- This is a bit ugly, but the API is that invalid JSON
				-- should be just returned as string :-(
				local parsed, value = pcall(fromjson, ret_str)
				ffi.C.free(ret_cstr)
				if parsed then
					return value
				end
				return ret_str
			end
			idx = idx + 1
		end
	end

	--- Construct lua function for config().
	if kr_module.config ~= nil then
		module.config = function (arg)
			local arg_conv = convert_arg(arg)
			return kr_module.config(kr_module, arg_conv)
		end
	end

	--- Add syntactic sugar for get() and set() properties.
	--- That also "catches" any commands like `moduleName.foo = bar`.
	local m_index, m_newindex
	local get_f = rawget(module, 'get')
	local set_f = rawget(module, 'set')
	if get_f == nil then
		m_index = function ()
			error('module ' .. module_name .. ' does not support indexing syntax sugar')
		end
	else
		m_index = function (_, key)
			return get_f(key)
		end
	end
	if set_f == nil then
		m_newindex = function ()
			error('module ' .. module_name .. ' does not support assignment syntax sugar')
		end
	else
		m_newindex = function (_, key, value)
			-- This will produce a nasty error on some non-string parameters.
			-- Still, we already use it with integer values, e.g. in predict module :-/
			return set_f(key .. ' ' .. value)
		end
	end
	-- note: the two functions only get called for *missing* indices
	setmetatable(module, {
		__index = m_index,
		__newindex = m_newindex,
	})
end
-- Utilities internal for lua layer glue; see ../ffimodule.c
-- The layer wrappers read their arguments from the static C structure
-- kr_layer_t_static instead of receiving them as parameters.
local layer_ctx = ffi.C.kr_layer_t_static

-- Call a layer callback with (state, req).
function modules_ffi_layer_wrap1(layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req)
end

-- Call a layer callback with (state, req, pkt).
function modules_ffi_layer_wrap2(layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt)
end

-- Call a checkout layer callback with (state, req, pkt, dst, is_stream).
function modules_ffi_layer_wrap_checkout(layer_cb)
	return layer_cb(layer_ctx.state, layer_ctx.req, layer_ctx.pkt,
		layer_ctx.dst, layer_ctx.is_stream)
end

-- Call a module callback with the `struct kr_module` pointer taken out of
-- the userdata. (This one isn't for layers.)
function modules_ffi_wrap_modcb(cb, kr_module_ud)
	local kr_module = ffi.cast('struct kr_module **', kr_module_ud)[0]
	return cb(kr_module)
end
-- Return filesystem size where the cache resides.
-- The path comes from cache.current_storage ('.' when unset or empty), with
-- an optional leading 'lmdb://' removed.
-- @return the number reported by kr_fssize()
-- Panics with the knot_strerror() text when kr_fssize() reports a negative
-- value.
function cache.fssize()
	local path = cache.current_storage or '.'
	-- As it is now, `path` may or may not include the lmdb:// prefix.
	local has_prefix = string.sub(path, 1, 7) == 'lmdb://'
	if has_prefix then
		path = string.sub(path, 8)
	end
	if #path == 0 then
		path = '.'
	end
	local size = tonumber(ffi.C.kr_fssize(path))
	if size < 0 then
		panic('cache.fssize(): %s', ffi.string(ffi.C.knot_strerror(size)))
	end
	return size
end
-- Remove records from the cache.
-- @param name       domain name to clear; nil, or '.' without exact_name,
--                   clears the whole cache
-- @param exact_name boolean (default false), forwarded to
--                   kr_cache_remove_subtree()
-- @param rr_type    optional record type; only accepted with exact_name and
--                   without chunk_size/callback
-- @param chunk_size positive number (default 100) forwarded to
--                   kr_cache_remove_subtree()
-- @param callback   optional function invoked with (name, exact_name,
--                   rr_type, chunk_size, callback, prev_state, rettable);
--                   its result is returned to the caller
-- @param prev_state value handed over to the callback unchanged
-- @return a table; with the default callback it holds `count` and `round`,
--         plus `chunk_limit`, `not_apex` and `subtree` where applicable
-- Raises an error on invalid parameters or when a C call reports failure.
cache.clear = function (name, exact_name, rr_type, chunk_size, callback, prev_state)
if name == nil or (name == '.' and not exact_name) then
-- keep same output format as for 'standard' clear
local total_count = cache.count()
if not cache.clear_everything() then
error('unable to clear everything')
end
return {count = total_count}
end
-- Check parameters, in order, and set defaults if missing.
local dname = kres.str2dname(name)
if not dname then error('cache.clear(): incorrect name passed') end
if exact_name == nil then exact_name = false end
if type(exact_name) ~= 'boolean'
then error('cache.clear(): incorrect exact_name passed') end
local cach = kres.context().cache;
local rettable = {}
-- Apex warning. If the caller passes a custom callback,
-- we assume they are advanced enough not to need the check.
-- The point is to avoid repeating the check in each callback iteration.
if callback == nil then
local apex_array = ffi.new('knot_dname_t *[1]') -- C: dname **apex_array
local ret = ffi.C.kr_cache_closest_apex(cach, dname, false, apex_array)
if ret < 0 then
error(ffi.string(ffi.C.knot_strerror(ret))) end
-- The requested name differs from the apex that was found: tell the
-- caller which name to clear instead.
if not ffi.C.knot_dname_is_equal(apex_array[0], dname) then
local apex_str = kres.dname2str(apex_array[0])
rettable.not_apex = 'to clear proofs of non-existence call '
.. 'cache.clear(\'' .. tostring(apex_str) ..'\')'
rettable.subtree = apex_str
end
ffi.C.free(apex_array[0])
end
if rr_type ~= nil then
-- Special case, without any subtree searching.
if not exact_name
then error('cache.clear(): specifying rr_type only supported with exact_name') end
if chunk_size or callback
then error('cache.clear(): chunk_size and callback parameters not supported with rr_type') end
local ret = ffi.C.kr_cache_remove(cach, dname, rr_type)
if ret < 0 then error(ffi.string(ffi.C.knot_strerror(ret))) end
-- The reported count is fixed at 1 in this branch.
return {count = 1}
end
if chunk_size == nil then chunk_size = 100 end
if type(chunk_size) ~= 'number' or chunk_size <= 0
then error('cache.clear(): chunk_size has to be a positive integer') end
-- Do the C call, and add chunk_size warning.
rettable.count = ffi.C.kr_cache_remove_subtree(cach, dname, exact_name, chunk_size)
if rettable.count == chunk_size then
local msg_extra = ''
if callback == nil then
msg_extra = '; the default callback will continue asynchronously'
end
rettable.chunk_limit = 'chunk size limit reached' .. msg_extra
end
-- Default callback function: repeat after 1ms
if callback == nil then callback =
function (cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbprev_state, cbrettable)
-- A negative count is treated as an error code.
if cbrettable.count < 0 then error(ffi.string(ffi.C.knot_strerror(cbrettable.count))) end
if cbprev_state == nil then cbprev_state = { round = 0 } end
if type(cbprev_state) ~= 'table'
then error('cache.clear() callback: incorrect prev_state passed') end
cbrettable.round = cbprev_state.round + 1
-- A full chunk was removed, so schedule another cache.clear() call;
-- the current result table becomes the next call's prev_state and
-- carries the round counter.
if (cbrettable.count == cbchunk_size) then
event.after(1, function ()
cache.clear(cbname, cbexact_name, cbrr_type, cbchunk_size, cbself, cbrettable)
end)
elseif cbrettable.round > 1 then
log_info(ffi.C.LOG_GRP_CACHE, 'asynchronous cache.clear(\'' .. cbname .. '\', '
.. tostring(cbexact_name) .. ') finished')
end
return cbrettable
end
end
return callback(name, exact_name, rr_type, chunk_size, callback, prev_state, rettable)
end
-- Syntactic sugar for cache
-- `cache[x]` reads through cache.get(x)
-- `cache.size = value` and `cache.storage = value` call cache.open()
setmetatable(cache, {
	__index = function (t, k)
		local own = rawget(t, k)
		-- No lookup is attempted while `current_size` is not set
		if not own and not rawget(t, 'current_size') then
			return own
		end
		-- Beware: t.get returns empty table on failure to find.
		-- That would be confusing here (breaking kresc), so return nil instead.
		local found = t.get(k)
		if not found or next(found) == nil then
			return nil
		end
		return found
	end,
	__newindex = function (t, k, v)
		-- Defaults used for whichever of the two settings is not assigned
		local storage = rawget(t, 'current_storage') or 'lmdb://'
		local size = rawget(t, 'current_size') or 10*MB
		-- Declarative interface for cache
		if k == 'size' then
			t.open(v, storage)
		elseif k == 'storage' then
			t.open(size, v)
		end
	end
})
-- Make sandboxed environment
-- @param defined table holding the real definitions
-- @return an empty proxy table that reads from `defined` and writes into it.
-- Assigning to one of the protected names does not replace the table in
-- `defined`; the assigned table's fields are copied into it instead.
-- The proxy's own field `__orig_name_list` lists the keys of `defined`,
-- one per line.
local function make_sandbox(defined)
	local protected_names = {
		worker = true, env = true, debugging = true, modules = true,
		cache = true, net = true, trust_anchors = true
	}
	-- Compute and export the list of top-level names (hidden otherwise)
	local name_lines = {}
	for name in pairs(defined) do
		name_lines[#name_lines + 1] = name .. "\n"
	end
	local sandbox = { __orig_name_list = table.concat(name_lines) }
	local function assign(_, key, value)
		if not protected_names[key] then
			defined[key] = value
			return
		end
		-- Merge field by field into the existing protected table
		for field, field_value in pairs(value) do
			defined[key][field] = field_value
		end
	end
	return setmetatable(sandbox, { __index = defined, __newindex = assign })
end
-- Compatibility sandbox
-- Wrap the current global environment in a sandbox proxy and install the
-- proxy as the new global environment.
_G = make_sandbox(getfenv(0))
setfenv(0, _G)
-- Load default modules
-- trust_anchors is a lua library and is loaded with require(); the rest are
-- loaded through modules.load() in the order listed here.
trust_anchors = require('trust_anchors')
modules.load('ta_update')
modules.load('ta_signal_query')
modules.load('policy')
modules.load('priming')
modules.load('detect_time_skew')
modules.load('detect_time_jump')
modules.load('ta_sentinel')
modules.load('edns_keepalive')
modules.load('refuse_nord')
modules.load('watchdog')
modules.load('extended_error')
-- Load keyfile_default
-- NOTE(review): the @...@ tokens look like placeholders that are substituted
-- before this file is used - confirm against the build system.
trust_anchors.add_file('@keyfile_default@', @unmanaged@)
-- Compile a command line into a lua chunk.
-- @param line source text of the command
-- @param raw  when truthy the command is compiled as `return <line>`,
--             otherwise as `return table_print(<line>)`
-- @return chunk, err - the compiled function, or nil and a message
-- When the expression form does not compile, the line is compiled again
-- as-is, which admits plain statements.
local function eval_cmd_compile(line, raw)
	-- Compatibility sandbox code loading
	local function load_code(code)
		if not getfenv then -- Lua 5.2+
			return load(code, nil, 't', _ENV)
		end
		-- Lua 5.1
		return loadstring(code)
	end
	local wrapped
	if raw then
		wrapped = 'return '..line
	else
		wrapped = 'return table_print('..line..')'
	end
	local chunk, err = load_code(wrapped)
	if err then
		chunk, err = load_code(line)
	end
	return chunk, err
end
-- Interactive command evaluation
-- @param line command text
-- @param raw  passed on to eval_cmd_compile()
-- @return whatever the compiled command returns
-- Raises the compilation message as an error when the command does not
-- compile.
function eval_cmd(line, raw)
	local chunk, err = eval_cmd_compile(line, raw)
	if err then
		error(err)
	end
	return chunk()
end
-- Pretty printing
local pprint = require('krprint').pprint
-- Render every argument with pprint() and join the results with newlines.
-- With more than one argument each line gets a `-- result # N` suffix.
-- @return the joined string, or nil when called without arguments
function table_print(...)
	local nargs = select('#', ...)
	if nargs == 0 then
		return nil
	end
	local numbered = nargs > 1
	local lines = {}
	for n = 1, nargs do
		-- The extra parentheses keep only the n-th argument
		local text = pprint((select(n, ...)))
		if numbered then
			text = string.format("%s\t-- result # %d", text, n)
		end
		lines[n] = text
	end
	return table.concat(lines, '\n')
end
-- This extends the worker module to allow asynchronous execution of functions and nonblocking I/O.
-- The current implementation combines cqueues for Lua interface, and event.socket() in order to not
-- block resolver engine while waiting for I/O or timers.
--
-- cqueues is optional: the library is loaded through pcall() and, when it is
-- missing, the asynchronous API is replaced by functions that raise an error.
local has_cqueues, cqueues = pcall(require, 'cqueues')
if has_cqueues then
-- Export the asynchronous sleep function
worker.sleep = cqueues.sleep
-- Create metatable for workers to define the API
-- It can schedule multiple cqueues and yield execution when there's a wait for blocking I/O or timer
local asynchronous_worker_mt = {
-- Run one step of the controller and (re)arm the timer for its next deadline.
work = function (self)
local ok, err, _, co = self.cq:step(0)
if not ok then
log_warn(ffi.C.LOG_GRP_SYSTEM, '%s error: %s %s', self.name or 'worker', err, debug.traceback(co))
end
-- Reschedule timeout or create new one
local timeout = self.cq:timeout()
if timeout then
-- Throttle timeouts to avoid too frequent wakeups
if timeout == 0 then timeout = 0.00001 end
-- Convert from seconds to duration
timeout = timeout * sec
if not self.next_timeout then
self.next_timeout = event.after(timeout, self.on_step)
else
event.reschedule(self.next_timeout, timeout)
end
else -- Cancel running timeout when there is no next deadline
if self.next_timeout then
event.cancel(self.next_timeout)
self.next_timeout = nil
end
end
end,
-- Hand a function over to the controller for execution.
wrap = function (self, f)
self.cq:wrap(f)
end,
-- Register the controller's poll descriptor with event.socket() so that
-- work() is called through self.on_step.
loop = function (self)
self.on_step = function () self:work() end
self.event_fd = event.socket(self.cq:pollfd(), self.on_step)
end,
-- Cancel the event registered by loop(), if there is one.
close = function (self)
if self.event_fd then
event.cancel(self.event_fd)
self.event_fd = nil
end
end,
}
-- Implement the coroutine worker with cqueues
-- Each worker is a table with its name and its own cqueues controller.
local function worker_new (name)
return setmetatable({name = name, cq = cqueues.new()}, { __index = asynchronous_worker_mt })
end
-- Create a default background worker
worker.bg_worker = worker_new('worker.background')
worker.bg_worker:loop()
-- Wrap a function for asynchronous execution
function worker.coroutine (f)
worker.bg_worker:wrap(f)
end
else
-- Disable asynchronous execution
local function disabled ()
error('Lua library cqueues is required for asynchronous execution (luaJIT requires library for Lua 5.1)')
end
worker.sleep = disabled
worker.map = disabled
worker.coroutine = disabled
-- Any field access on the placeholder worker raises the same error.
worker.bg_worker = setmetatable({}, { __index = disabled })
end
-- Global commands for map()
-- must be public because it is called from eval_cmd()
-- when map() commands are read from control socket
-- @param cmd command text; compiled in raw mode and run under xpcall()
-- @return the xpcall() results (status first) packed into a table and
--         serialized by krprint.serialize_lua(); when they cannot be
--         serialized, a serialized (false, message) pair instead
function _map_luaobj_call_wrapper(cmd)
	local func = eval_cmd_compile(cmd, true)
	local ret = kluautil.kr_table_pack(xpcall(func, debug.traceback))
	local ok, serial = pcall(krprint.serialize_lua, ret, 'error')
	if ok then
		return serial
	end
	log_error(ffi.C.LOG_GRP_SYSTEM, 'failed to serialize map() response %s (%s)',
		table_print(ret), serial)
	local failure = kluautil.kr_table_pack(false,
		"returned values cannot be serialized: " .. serial)
	return krprint.serialize_lua(failure)
end
-- Build the common error message for map() socket failures.
-- @param path control socket path
-- @param desc description of the step that failed
-- @return the formatted message
local function _sock_errmsg(path, desc)
	local template = 'map() error while communicating with %s: %s'
	return template:format(path, desc)
end
-- Run one socket operation and turn every kind of failure into a lua error.
-- @param sock   socket whose error state is inspected after the call
-- @param call   function to run under pcall()
-- @param params array of arguments for `call`
-- @param path   control socket path, used in error messages
-- @param desc   description of the step, used in error messages
-- @return all values returned by `call`
-- Raises when `call` throws, when sock:error() reports a read or write
-- error, or when `call` returns nil as its first value.
local function _sock_check(sock, call, params, path, desc)
	local errprefix = _sock_errmsg(path, desc) .. ': '
	local packed = kluautil.kr_table_pack(pcall(call, unpack(params)))
	local succeeded, first = packed[1], packed[2]
	if not succeeded then
		-- on failure the second pcall() value is the error
		error(errprefix .. tostring(first))
	end
	local rerr, werr = sock:error()
	if rerr or werr then
		error(string.format('%sread error %s; write error %s', errprefix, rerr, werr))
	end
	if first == nil then
		error(errprefix .. 'unexpected nil result')
	end
	-- drop the pcall() status and return the rest
	return unpack(packed, 2, packed.n)
end
-- Raise the common map() socket error unless `condition` holds.
-- @param condition value to test
-- @param path      control socket path, used in the error message
-- @param desc      description of the failed expectation
local function _sock_assert(condition, path, desc)
	if condition then
		return
	end
	error(_sock_errmsg(path, desc))
end
-- Send one command to another instance over its control socket and return
-- the raw reply.
-- @param cmd  command text (without the trailing newline)
-- @param path path of the control socket
-- @return the reply string, or nil when the socket could not be connected
-- Raises a lua error when the exchange fails after the connection was
-- established. The socket is closed on every path, including failures.
local function map_send_recv(cmd, path)
	local bit = require('bit')
	local socket = require('cqueues.socket')
	local s = socket.connect({ path = path })
	s:setmaxerrs(0)
	s:setmode('bn', 'bn')
	local status, err = pcall(s.connect, s)
	if not status then
		log_error(ffi.C.LOG_GRP_NETWORK, 'map() error while connecting to control socket %s: '
			.. '%s (ignoring this socket)', path, err)
		-- do not leave the unusable socket open until it gets collected
		s:close()
		return nil
	end
	-- Run the whole exchange under pcall() so that the socket gets closed
	-- even when one of the steps raises.
	local ok, reply = pcall(function ()
		local ret = _sock_check(s, s.write, {s, '__binary\n'}, path,
			'write __binary')
		_sock_assert(ret, path,
			'write __binary result')
		local recv = _sock_check(s, s.read, {s, 2}, path,
			'read reply to __binary')
		_sock_assert(recv and recv == '> ', path,
			'unexpected reply to __binary')
		_sock_check(s, s.write, {s, cmd..'\n'}, path,
			'command write')
		recv = _sock_check(s, s.read, {s, 4}, path,
			'response length read')
		_sock_assert(recv and #recv == 4, path,
			'length of response length preamble does not match')
		-- the preamble is the response length as four bytes, first byte
		-- most significant
		local len = tonumber(recv:byte(1))
		for i=2,4 do
			len = bit.bor(bit.lshift(len, 8), tonumber(recv:byte(i)))
		end
		ret = _sock_check(s, s.read, {s, len}, path,
			'read response')
		_sock_assert(ret and #ret == len, path,
			'actual response length does not match length in preamble')
		return ret
	end)
	s:close()
	if not ok then
		-- level 0 re-raises the original message without adding a position
		error(reply, 0)
	end
	return reply
end
-- internal use only
-- Call cmd on each instance via control sockets.
-- @param cmd - command string; must be non-empty and contain no newline
-- @param format - "luaobj" if individual results should be Lua objects (default)
-- - "strings" for eval_cmd output for each instance
-- @returns table with results, one item per instance + key n=number of instances
-- (order of return values is undefined)
-- @throws Lua error if:
-- - communication failed in the middle of transaction
-- - a result is not serializable
-- - individual call throws an error
-- - number of return values != 1 per instance per call
-- After any of these errors the execution state of cmd is undefined.
-- Connection errors at the beginning are ignored to paper over leftover dead sockets.
function map(cmd, format)
local local_sockets = {}
local results = {}
if (type(cmd) ~= 'string') then
panic('map() command must be a string') end
if string.find(cmd, '\n', 1, true) then
panic('map() command cannot contain literal \\n, escape it with \\010') end
if (#cmd <= 0) then
panic('map() command must be non-empty') end
-- syntax check on input command to detect typos early
local chunk, err = eval_cmd_compile(cmd, false)
if not chunk then
panic('failure when compiling map() command: %s', err)
end
format = format or 'luaobj'
if (format ~= 'luaobj' and format ~= 'strings') then
panic('map() output format must be luaobj or strings') end
-- In luaobj mode the command is sent wrapped in a call to
-- _map_luaobj_call_wrapper(), quoted as a long-bracket string.
if format == 'luaobj' then
cmd = '_map_luaobj_call_wrapper([=====[' .. cmd .. ']=====])'
end
-- find out control socket paths
-- (only the file name part of each path is kept)
for _,v in pairs(net.list()) do
if (v['kind'] == 'control') and (v['transport']['family'] == 'unix') then
table.insert(local_sockets, string.match(v['transport']['path'], '^.*/([^/]+)$'))
end
end
local filetab = kluautil.list_dir(worker.control_path)
if next(filetab) == nil then
panic('no control sockets found in directory %s',
worker.control_path)
end
local result_count = 0
-- finally execute it on all instances
for _, file in ipairs(filetab) do
-- A socket file whose name matches one of this instance's own control
-- sockets is served by calling eval_cmd() directly.
local local_exec = false
for _, lsoc in ipairs(local_sockets) do
if file == lsoc then
local_exec = true
end
end
local path = worker.control_path..file
local path_name = (local_exec and 'this instance') or path
log_info(ffi.C.LOG_GRP_SYSTEM, 'executing map() on %s: command %s', path_name, cmd)
local ret
if local_exec then
ret = eval_cmd(cmd)
else
ret = map_send_recv(cmd, path)
-- skip dead sockets (leftovers from dead instances)
if ret == nil then
goto continue
end
end
result_count = result_count + 1
-- return value is output from eval_cmd
-- i.e. string including "quotes" and Lua escaping in between
assert(type(ret) == 'string', 'map() protocol error, '
.. 'string not retured by follower')
assert(#ret >= 2 and
string.sub(ret, 1, 1) == "'"
and string.sub(ret, -1, -1) == "'",
'map() protocol error, value returned by follower does '
.. 'not look like a string')
-- deserialize string: remove "quotes" and de-escape bytes
ret = krprint.deserialize_lua(ret)
if format == 'luaobj' then
-- ret should be table with xpcall results serialized into string
ret = krprint.deserialize_lua(ret)
assert(type(ret) == 'table', 'map() protocol error, '
.. 'table with results not retured by follower')
-- exactly two packed values are expected: the xpcall status and a
-- single return value
if (ret.n ~= 2) then
log_error(ffi.C.LOG_GRP_SYSTEM, 'got unsupported map() response: %s', table_print(ret))
panic('unexpected number of return values in map() response: '
.. 'only single return value is allowed, '
.. 'use kluautil.kr_table_pack() helper')
end
local ok, retval = ret[1], ret[2]
if ok == false then
panic('error when executing map() command on control socket %s: '
.. '%s. command execution state is now undefined!',
path, retval)
end
-- drop wrapper table and return only the actual return value
ret = retval
end
results[result_count] = ret
::continue::
end
results.n = result_count
return results
end
|