File: proxyfuncgen.lua (memcached 1.6.39-2)

-- New style request factories and backend request handling.
--
-- First, this API adds a "request func generation" step when a new request
-- starts: if there is not already a cached function to use, the "generator"
-- function is called, and the function it returns is used to run the
-- request. The generated function is reused until the parent generator is
-- swapped out during a reload.
-- This allows the user to pre-allocate and pre-calculate objects and data,
-- offering both safety and performance.
-- Future API revisions (such as stats) will build on this generation step
-- to be more user friendly while retaining performance.
--
-- For backend IOs this unifies what were once two APIs:
--  - result = pool(request): the non-async API
--  - table = mcp.await(etc)
--
-- It is now a single system governed by a request context object (rctx).
-- This new system allows queueing a nearly arbitrary set of requests,
-- "blocking" a client on any individual response, and using callbacks to
-- decide whether a response is "good", to resume processing early, or to
-- post-process once all responses are received.
--
-- The queueing system is now recursive: an fgen can new_handle() another
-- fgen, meaning configurations can be assembled as call graphs. For example,
-- if you have a route function A and want to "shadow" some of its requests
-- onto route function B, instead of making A more complex you can create a
-- third function C which splits the traffic.
--
-- API docs: https://github.com/memcached/memcached/wiki/Proxy
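
-- A minimal sketch of the pattern described above (illustrative only: this
-- function is not referenced by the tests in this file, and the name is
-- arbitrary). The factory body runs once per generated slot; the closure it
-- returns runs once per request.
function example_sketch_factory(pool)
    local fgen = mcp.funcgen_new()
    local h = fgen:new_handle(pool)
    fgen:ready({ n = "examplesketch", f = function(rctx)
        -- per-slot setup happens here: pre-allocate objects, cache upvalues.
        return function(r)
            -- per-request path: relay the request and return its response.
            return rctx:enqueue_and_wait(r, h)
        end
    end})
    return fgen
end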

verbose = true
-- global for an error handling test
failgen_armed = false
failgenret_armed = false

function say(...)
    if verbose then
        print(...)
    end
end

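-- build the backends and pools; the table returned here is passed to
-- mcp_config_routes() below.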
function mcp_config_pools()
    local srv = mcp.backend

    local b1 = srv('b1', '127.0.0.1', 12011)
    local b2 = srv('b2', '127.0.0.1', 12012)
    local b3 = srv('b3', '127.0.0.1', 12013)
    local b4 = srv('b4', '127.0.0.1', 12014)
    local b1z = mcp.pool({b1})
    local b2z = mcp.pool({b2})
    local b3z = mcp.pool({b3})
    local b4z = mcp.pool({b4})

    local blog = srv({
        label = "blog",
        host = "127.0.0.1",
        port = "12015",
        log = {
            rate = 5,
            errors = true,
            deadline = 250,
            tag = "fastlog"
        }
    })
    local blogz = mcp.pool({blog})

    local p = {p = {b1z, b2z, b3z}, b = b4z, pl = blogz}

    --return mcp.pool(b1z, { iothread = false })
    return p
end

-- many of these factories have the same basic init pattern, so we can save
-- some code.
function new_basic_factory(arg, func)
    local fgen = mcp.funcgen_new()
    local o = { t = {}, c = 0 }

    -- some of them have a wait, some don't.
    -- here would be a good place to do bounds checking on arguments in
    -- similar functions.
    o.wait = arg.wait
    o.timeout = arg.timeout
    o.mode = arg.mode
    for _, v in pairs(arg.list) do
        table.insert(o.t, fgen:new_handle(v))
        o.c = o.c + 1
    end

    fgen:ready({ f = func, a = o, n = arg.name})
    return fgen
end

function new_prefix_factory(arg)
    local fgen = mcp.funcgen_new()
    local o = {}
    o.pattern = arg.pattern
    o.default = fgen:new_handle(arg.default)

    o.map = {}
    -- get handler ids for each sub-route value
    -- convert the map.
    for k, v in pairs(arg.list) do
        o.map[k] = fgen:new_handle(v)
    end

    fgen:ready({ f = prefix_factory_gen, a = o, n = arg.name })
    return fgen
end

function prefix_factory_gen(rctx, arg)
    local p = arg.pattern
    local map = arg.map
    local d = arg.default

    say("generating a prefix factory function")

    return function(r)
        local key = r:key()

        local handle = map[string.match(key, p)]
        if handle == nil then
            return rctx:enqueue_and_wait(r, d)
        end
        return rctx:enqueue_and_wait(r, handle)
    end
end

function new_direct_factory(arg)
    local fgen = mcp.funcgen_new()
    local h = fgen:new_handle(arg.p)
    fgen:ready({ f = direct_factory_gen, a = h, n = arg.name })
    return fgen
end

function direct_factory_gen(rctx, h)
    say("generating direct factory function")

    return function(r)
        say("waiting on a single pool")
        return rctx:enqueue_and_wait(r, h)
    end
end

function new_locality_factory(arg)
    local fgen = mcp.funcgen_new()
    local h = fgen:new_handle(arg.p)
    fgen:ready({ f = locality_factory_gen, a = h, n = arg.name })
    return fgen
end

-- factory for proving slots have unique environmental memory.
-- we need to wait on a backend to allow the test to pipeline N requests in
-- parallel, to prove that each parallel slot has a unique lua environment.
function locality_factory_gen(rctx, h)
    say("generating locality factory function")
    local x = 0

    return function(r)
        x = x + 1
        say("returning from locality: " .. x)
        local res = rctx:enqueue_and_wait(r, h)
        return "HD t" .. x .. "\r\n"
    end
end

-- waits for only the _first_ queued handle to return.
-- ie; position 1 in the table.
-- we do a numeric for loop in the returned function to avoid allocations done
-- by a call to pairs()
function first_factory_gen(rctx, arg)
    say("generating first factory function")
    local t = arg.t
    local count = arg.c

    return function(r)
        say("waiting on first of " .. count .. " pools")
        for x=1, count do
            rctx:enqueue(r, t[x])
        end

        return rctx:wait_handle(t[1])
    end
end

-- wait on x out of y
function partial_factory_gen(rctx, arg)
    say("generating partial factory function")
    local t = arg.t
    local count = arg.c
    local wait = arg.wait

    return function(r)
        say("waiting on first " .. wait .. " out of " .. count)
        for x=1, count do
            rctx:enqueue(r, t[x])
        end

        local done = rctx:wait_cond(wait)
        for x=1, count do
            -- :good will only return the result object if the handle's
            -- response was considered "good"
            local res = rctx:res_good(t[x])
            if res ~= nil then
                say("found a result")
                return res
            end
            -- TODO: tally up responses and send summary for test.
        end
        say("found nothing")
        -- didn't return anything good, so return one at random.
        for x=1, count do
            local res = rctx:res_any(t[x])
            if res ~= nil then
                return res
            end
        end
    end
end

-- wait on all pool arguments
function all_factory_gen(rctx, arg)
    say("generating all factory function")
    local t = arg.t
    local count = arg.c
    if arg.wait ~= nil then
        count = arg.wait
    end
    -- should be a minor speedup avoiding the table lookup.
    local mode = mcp.WAIT_ANY

    return function(r)
        say("all_factory waiting on " .. count)

        rctx:enqueue(r, t)
        local done = rctx:wait_cond(count, mode)
        -- :any will give us the result object for that handle, regardless
        -- of return code/status.
        local res = rctx:res_any(t[1])

        -- TODO: tally up the responses and return summary for test.
        return res
    end
end

-- wait on the first good or N of total
function fastgood_factory_gen(rctx, arg)
    say("generating fastgood factory function")
    local t = arg.t
    local count = arg.c
    local wait = arg.wait

    local cb = function(res)
        say("running in a callback!")
        if res:hit() then
            say("was a hit!")
            -- return an extra arg telling us to shortcut the wait count
            return mcp.WAIT_GOOD, mcp.WAIT_RESUME
        end
        -- default return code is mcp.WAIT_ANY
    end

    for _, v in pairs(t) do
        rctx:handle_set_cb(v, cb)
    end

    return function(r)
        say("first good or wait for N")

        rctx:enqueue(r, t)
        local done = rctx:wait_cond(wait, mcp.WAIT_GOOD)
        say("fastgood done:", done)

        if done == 1 then
            -- if we just got one "good", we're probably happy.
            for x=1, count do
                -- loop to find the good handle.
                local res = rctx:res_good(t[x])
                if res ~= nil then
                    return res
                end
            end
        else
            -- else we had to wait and now need to decide if it was a miss or
            -- network error.
            -- but for this test we'll just return the first result.
            for x=1, count do
                local res = rctx:res_any(t[x])
                if res ~= nil then
                    return res
                end
            end
        end
    end
end

-- fastgood implemented using internal fastgood state
function fastgoodint_factory_gen(rctx, arg)
    local t = arg.t
    local count = arg.c
    local wait = arg.wait

    return function(r)
        rctx:enqueue(r, t)
        say("enqueing fastgood:", wait)
        local done = rctx:wait_cond(wait, mcp.WAIT_FASTGOOD)
        say("fastgoodint done:", done)

        local final = nil
        for x=1, count do
            local res, mode = rctx:result(t[x])
            if mode == mcp.WAIT_GOOD then
                return res
            elseif res ~= nil then
                final = res
            end
        end
        -- if no good found, return anything.
        return final
    end
end

function new_blocker_factory(arg)
    local fgen = mcp.funcgen_new()
    local o = { c = 0, t = {} }
    o.b = fgen:new_handle(arg.blocker)

    for _, v in pairs(arg.list) do
        table.insert(o.t, fgen:new_handle(v))
        o.c = o.c + 1
    end

    fgen:ready({ f = blocker_factory_gen, a = o, n = arg.name })
    return fgen
end

-- queue a bunch, but shortcut if a special auxiliary handle fails
function blocker_factory_gen(rctx, arg)
    say("generating blocker factory function")
    local t = arg.t
    local count = arg.c
    local blocker = arg.b
    local was_blocked = false

    local cb = function(res)
        -- check the response or tokens or anything special to indicate
        -- success.
        -- for this test we just check if it was a hit.
        if res:hit() then
            was_blocked = false
            return mcp.WAIT_GOOD
        else
            was_blocked = true
            return mcp.WAIT_ANY
        end
    end

    rctx:handle_set_cb(blocker, cb)

    return function(r)
        say("function blocker test")

        -- queue up the real queries we wanted to run.
        rctx:enqueue(r, t)

        -- any wait command will execute all queued queries at once, but here
        -- we only wait for the blocker to complete.
        local bres = rctx:enqueue_and_wait(r, blocker)

        -- another way of doing this is to ask:
        -- local res = rctx:res_good(blocker)
        -- if a result was returned, the callback had returned WAIT_GOOD
        if was_blocked == false then
            -- our blocker is happy...
            -- wait for the rest of the handles to come in and make a decision
            -- on what to return to the client.
            local done = rctx:wait_cond(count, mcp.WAIT_ANY)
            return rctx:res_any(t[1])
        else
            return "SERVER_ERROR blocked\r\n"
        end
    end
end

-- log on all callbacks, even if waiting for 1
function logall_factory_gen(rctx, arg)
    say("generating logall factory function")
    local t = arg.t

    local cb = function(res, req)
        say("received a response, logging...")
        mcp.log("received a response: " .. tostring(res:ok()))
        mcp.log_req(req, res, "even more logs", rctx:cfd())
        return mcp.WAIT_ANY
    end

    for _, v in pairs(t) do
        rctx:handle_set_cb(v, cb)
    end

    return function(r)
        rctx:enqueue(r, t)
        return rctx:wait_handle(t[1])
    end
end

-- log a summary after all callbacks run
function summary_factory_gen(rctx, arg)
    say("generating summary factory function")
    local t = arg.t
    local count = arg.c

    local todo = 0
    local cb = function(res)
        say("responses TODO: " .. todo)
        todo = todo - 1
        if todo == 0 then
            mcp.log("received all responses")
        end
    end

    for _, v in pairs(t) do
        rctx:handle_set_cb(v, cb)
    end

    return function(r)
        -- re-seed the todo value that the callback uses
        todo = count

        rctx:enqueue(r, t)
        -- we're just waiting for a single response, but we queue all of the
        -- handles. the callback uses data from the shared environment and a
        -- summary is logged.
        return rctx:wait_handle(t[1])
    end
end

-- testing various waitfor conditions.
function waitfor_factory_gen(rctx, arg)
    say("generating background factory function")
    local t = arg.t
    local count = arg.c

    return function(r)
        local key = r:key()
        if key == "waitfor/a" then
            rctx:enqueue(r, t)
            rctx:wait_cond(0) -- issue the requests in the background
            return "HD t1\r\n" -- return whatever to the client
        elseif key == "waitfor/b" then
            rctx:enqueue(r, t)
            rctx:wait_cond(0) -- issue requests and resume
            -- now go back into wait mode, but we've already dispatched
            local done = rctx:wait_cond(2)
            if done ~= 2 then
                return "SERVER_ERROR invalid wait"
            end
            -- TODO: bonus points, count the goods or check that everyone's t
            -- flag is right.
            for x=1, count do
                local res = rctx:res_good(t[x])
                if res ~= nil then
                    return res
                end
            end
            return "SERVER_ERROR no good response\r\n"
        elseif key == "waitfor/c" then
            rctx:enqueue(r, t[1])
            rctx:wait_cond(0) -- issue the first queued request
            -- queue two more
            rctx:enqueue(r, t[2])
            rctx:enqueue(r, t[3])
            -- wait explicitly for the first queued one.
            return rctx:wait_handle(t[1])
        elseif key == "waitfor/d" then
            -- queue two then wait on each individually
            rctx:enqueue(r, t[1])
            rctx:enqueue(r, t[2])
            rctx:wait_handle(t[1])
            return rctx:wait_handle(t[2])
        end
    end
end

-- try "primary zone" and then fail over to secondary zones.
-- using simplified code that just treats the first pool as the primary zone.
function failover_factory_gen(rctx, arg)
    say("generating failover factory function")
    local t = {}
    local count = arg.c
    local first = arg.t[1]

    for x=2, count do
        table.insert(t, arg.t[x])
    end

    return function(r)
        -- first try local
        local fres = rctx:enqueue_and_wait(r, first)

        if fres == nil or fres:hit() == false then
            -- failed to get a local hit, queue all "far" zones.
            rctx:enqueue(r, t)
            -- wait for one.
            local done = rctx:wait_cond(1, mcp.WAIT_GOOD)
            -- find the good from the second set.
            for x=1, count-1 do
                local res = rctx:res_good(t[x])
                if res ~= nil then
                    say("found a result")
                    return res
                end
            end
            -- got nothing from second set, just return anything.
            return rctx:res_any(first)
        else
            return fres
        end
    end
end

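-- factory that ignores the request and always returns a fixed message.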
function new_msg_factory(msg, name)
    local fgen = mcp.funcgen_new()
    fgen:ready({ n = name, f = function(rctx)
        return function(r)
            return msg
        end
    end})
    return fgen
end

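-- wrap a bare generator function in an fgen with no handles attached.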
function new_error_factory(func, name)
    local fgen = mcp.funcgen_new()
    fgen:ready({ f = func, n = name })
    return fgen
end

function errors_factory_gen(rctx)
    say("generating errors factory")

    return function(r)
        local key = r:key()
        -- failure scenarios that require a top-level request context
        if key == "errors/reterror" then
            error("test error")
        elseif key == "errors/retnil" then
            return nil
        elseif key == "errors/retint" then
            return 5
        elseif key == "errors/retnone" then
            return
        end
    end
end

function suberrors_factory_gen(rctx)
    say("generating suberrors factory function")

    return function(r)
        local key = r:key()
        if key == "suberrors/error" then
            error("test error")
        elseif key == "suberrors/nil" then
            return nil
        elseif key == "suberrors/int" then
            return 5
        elseif key == "suberrors/none" then
            return
        elseif key == "suberrors/resume" then
            rctx:sleep(0.25)
            error("error after resuming rctx")
        elseif key == "suberrors/string" then
            -- non error but immediate return scenario.
            return "SERVER_ERROR suberror/string\r\n"
        end

    end
end

function new_split_factory(arg)
    local fgen = mcp.funcgen_new()
    local o = {}
    o.a = fgen:new_handle(arg.a)
    o.b = fgen:new_handle(arg.b)
    fgen:ready({ f = split_factory_gen, a = o, n = arg.name })
    return fgen
end

-- example of a factory that takes two other factories and copies traffic
-- across them.
-- If additional APIs for hashing keys to numerics are added, keys can be
-- hashed so that "1/n" of them copy to one of the splits. This allows
-- shadowing traffic to new/experimental pools, slow-warming traffic, etc.
function split_factory_gen(rctx, arg)
    say("generating split factory function")
    local a = arg.a
    local b = arg.b

    return function(r)
        say("splitting traffic")
        -- b is the split path.
        rctx:enqueue(r, b)

        -- a is the main path. so we only explicitly wait on and return a.
        return rctx:enqueue_and_wait(r, a)
    end
end

-- test handling of failure to generate a function slot
function failgen_factory_gen(rctx)
    if failgen_armed then
        say("throwing failgen error")
        error("failgen")
    end
    say("arming failgen")
    failgen_armed = true

    return function(r)
        return "NF\r\n"
    end
end

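-- like failgen above, but fails generation by returning nil instead of
-- throwing an error.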
function failgenret_factory_gen(rctx)
    if failgenret_armed then
        return nil
    end
    failgenret_armed = true

    return function(r)
        return "NF\r\n"
    end
end

function badreturn_gen(rctx)
    -- returning a userdata that isn't the correct kind of userdata.
    -- shouldn't crash the daemon!
    return function(r)
        return rctx
    end
end

-- TODO: should the wait time be ignored if timeout is nil? passing a nil
-- timeout throws an error, but handling it means more lua to deal with
-- optional waits...
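-- queue all handles, wait for every response (optionally bounded by a
-- timeout), then return whichever result rctx:best_result() considers best.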
function bestres_factory_gen(rctx, arg)
    local handles = arg.t
    local timeout = arg.timeout
    return function(r)
        rctx:enqueue(r, handles)
        if timeout then
            rctx:wait_cond(#handles, mcp.WAIT_ANY, timeout)
        else
            rctx:wait_cond(#handles)
        end
        local res, tag = rctx:best_result(handles)
        return res
    end
end

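-- as above, but return the result rctx:worst_result() considers worst.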
function worstres_factory_gen(rctx, arg)
    local handles = arg.t
    return function(r)
        rctx:enqueue(r, handles)
        rctx:wait_cond(#handles)
        local res, tag = rctx:worst_result(handles)
        return res
    end
end

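-- generic wait_cond() tester: the wait count, wait mode, and timeout are
-- all configurable through the factory arguments, with defaults below.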
function timeout_factory_gen(rctx, arg)
    local handles = arg.t
    local wait = #handles
    local timeout = nil
    local mode = mcp.WAIT_GOOD
    if arg.wait then
        wait = arg.wait
    end
    if arg.timeout then
        timeout = arg.timeout
    end
    if arg.mode then
        mode = arg.mode
    end
    return function(r)
        rctx:enqueue(r, handles)
        if timeout then
            rctx:wait_cond(wait, mode, timeout)
        else
            rctx:wait_cond(wait, mode)
        end
        local res, tag = rctx:best_result(handles)
        return res
    end
end

-- TODO: this might be supported only in a later update.
-- new queue after parent return
-- - do an immediate return + cb queue, queue from that callback
-- - should still work but requires worker lua vm
-- requires removing the need for an active client socket object when
-- queueing new requests for processing.
function postreturn_factory(rctx, arg)

end

-- TODO: demonstrate a split call graph
-- ie; an all split into two single

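-- assemble the route factories from the pools built in mcp_config_pools()
-- and attach a prefix router for the test keys.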
function mcp_config_routes(p)
    local b_pool = p.b
    local pl = p.pl
    p = p.p
    local single = new_direct_factory({ p = p[1], name = "single" })
    -- use the typically unused backend.
    local singletwo = new_direct_factory({ p = b_pool, name = "singletwo" })

    local first = new_basic_factory({ list = p, name = "first" }, first_factory_gen)
    local partial = new_basic_factory({ list = p, wait = 2, name = "partial" }, partial_factory_gen)
    local all = new_basic_factory({ list = p, name = "all" }, all_factory_gen)
    local fastgood = new_basic_factory({ list = p, wait = 2, name = "fastgood" }, fastgood_factory_gen)
    local fastgoodint = new_basic_factory({ list = p, wait = 2, name = "fastgoodint" }, fastgoodint_factory_gen)
    local blocker = new_blocker_factory({ blocker = b_pool, list = p, name = "blocker" })
    local logall = new_basic_factory({ list = p, name = "logall" }, logall_factory_gen)
    local fastlog = new_direct_factory({ p = pl, name = "fastlog" })
    local subfastlog = new_direct_factory({ p = fastlog, name = "subfastlog" })
    local summary = new_basic_factory({ list = p, name = "summary" }, summary_factory_gen)
    local waitfor = new_basic_factory({ list = p, name = "waitfor" }, waitfor_factory_gen)
    local failover = new_basic_factory({ list = p, name = "failover" }, failover_factory_gen)
    local locality = new_locality_factory({ p = p[1], name = "locality" })

    local errors = new_error_factory(errors_factory_gen, "errors")
    local suberrors = new_error_factory(suberrors_factory_gen, "suberrors")
    local suberr_wrap = new_direct_factory({ p = suberrors, name = "suberrwrap" })
    local badreturn = new_error_factory(badreturn_gen, "badreturn")

    local bestres = new_basic_factory({ list = p, name = "bestres" }, bestres_factory_gen)
    local bestrestime = new_basic_factory({ list = p, timeout = 0.5, name = "bestrestime" }, bestres_factory_gen)
    local worstres = new_basic_factory({ list = p, name = "worstres" }, worstres_factory_gen)

    -- for testing traffic splitting.
    local split = new_split_factory({ a = single, b = singletwo, name = "split" })
    local splitfailover = new_split_factory({ a = failover, b = singletwo, name = "splitfailover" })

    -- test timeout via subrctx's that themselves timeout
    local timesubone = new_basic_factory({ list = { p[1] }, timeout = 0.25, name = "timesubone" }, timeout_factory_gen)
    local timesubtwo = new_basic_factory({ list = { p[2] }, timeout = 0.25, name = "timesubtwo" }, timeout_factory_gen)
    local timesubthr = new_basic_factory({ list = { p[3] }, timeout = 0.25, name = "timesubthr" }, timeout_factory_gen)
    local timetop = new_basic_factory({ list = { timesubone, timesubtwo, timesubthr }, wait = 1, name = "timetop" }, timeout_factory_gen)
    local timefgtop = new_basic_factory({ list = { timesubone, timesubtwo, timesubthr }, wait = 2, mode = mcp.WAIT_FASTGOOD, name = "timefgtop" }, timeout_factory_gen)

    -- complex stacking:
    -- - parent
    -- - sub with direct child
    -- - child that splits requests, waits
    -- - one sub-child with 3 subrctx children, each with a pool
    -- - the sub-mid child throws fatal error after enqueueing requests.
    -- the bug was the final child's pending-request refcount increasing
    -- before dispatch instead of after dispatching.
    local complex_childa = new_direct_factory({ p = p[1], name = "cmpchilda" })
    local complex_childb = new_direct_factory({ p = p[2], name = "cmpchildb" })
    local complex_childc = new_direct_factory({ p = p[3], name = "cmpchildc" })
    local complex_msg = new_msg_factory("SERVER_ERROR toast\r\n", "cmpmsg")
    local complex_fastgoodint = new_basic_factory({ list = { complex_childa, complex_childb, complex_childc }, name = "cmpfastgoodint" }, fastgoodint_factory_gen)
    local complex_mid = new_basic_factory({ list = { complex_childa, complex_childb, complex_fastgoodint }, wait = 0, name = "cmpmid" }, all_factory_gen)
    local complex_top = new_direct_factory({ p = complex_mid, name = "complex" })

    local map = {
        ["single"] = single,
        ["first"] = first,
        ["partial"] = partial,
        ["all"] = all,
        ["fastgood"] = fastgood,
        ["fastgoodint"] = fastgoodint,
        ["blocker"] = blocker,
        ["logall"] = logall,
        ["fastlog"] = fastlog,
        ["subfastlog"] = subfastlog,
        ["summary"] = summary,
        ["waitfor"] = waitfor,
        ["failover"] = failover,
        ["suberrors"] = suberr_wrap,
        ["errors"] = errors,
        ["split"] = split,
        ["splitfailover"] = splitfailover,
        ["locality"] = locality,
        ["badreturn"] = badreturn,
        ["bestres"] = bestres,
        ["bestrestime"] = bestrestime,
        ["worstres"] = worstres,
        ["timetop"] = timetop,
        ["timefgtop"] = timefgtop,
        ["complex"] = complex_top,
    }

    local parg = {
        default = single,
        list = map,
        pattern = "^/(%a+)/"
    }

    local failgen = new_error_factory(failgen_factory_gen, "failgen")
    local failgenret = new_error_factory(failgenret_factory_gen, "failgenret")

    local mapfail = {
        ["failgen"] = failgen,
        ["failgenret"] = failgenret,
    }
    local farg = {
        default = single,
        list = mapfail,
        pattern = "^(%a+)/",
        name = "prefixfail"
    }

    local pfx = mcp.router_new({ map = map })
    local pfxfail = new_prefix_factory(farg)

    mcp.attach(mcp.CMD_ANY_STORAGE, pfx)
    -- TODO: might need to move this fail stuff to another test file.
    mcp.attach(mcp.CMD_MS, pfxfail)
    mcp.attach(mcp.CMD_MD, pfxfail)
end