1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
-- Test-run handle; used below to switch the console statement delimiter.
test_run = require('test_run').new()
fiber = require('fiber')
net_box = require('net.box')
-- The connections select from _space later in the test, so 'guest' needs
-- read access to it.
box.schema.user.grant('guest', 'read', 'space', '_space')
-- Register do_long_f and let 'guest' execute it: the workers invoke it
-- remotely via c:call('do_long_f', ...).
box.schema.func.create('do_long_f')
box.schema.user.grant('guest', 'execute', 'function', 'do_long_f')
-- Two client connections to this instance's own listen address.
conn = net_box.connect(box.cfg.listen)
conn2 = net_box.connect(box.cfg.listen)
-- State shared by the helpers below: 'active' counts do_long_f calls that
-- are currently running, 'finished' counts completed ones, and 'continue'
-- is the flag the running calls poll before they return.
active = 0
finished = 0
continue = false
-- Remember the configured message limit; it is restored at the very end.
limit = box.cfg.net_msg_max
-- Number of workers started by one run_workers() call. Started on two
-- connections this gives limit - 100 requests in total.
run_max = (limit - 100) / 2
-- Save readahead so that it can be restored at the end of the test.
old_readahead = box.cfg.readahead
-- NOTE(review): the value 9000 is not explained here; presumably it is
-- chosen relative to the 1000-byte payload below - confirm.
box.cfg{readahead = 9000}
-- 1000-character payload passed as the argument of every do_long_f call.
long_str = string.rep('a', 1000)
-- The definitions below span several lines, so statements are terminated
-- with an explicit delimiter from here on.
test_run:cmd("setopt delimiter ';'")
-- Server-side body of a long request. Registers itself in 'active',
-- stays there polling every 0.01 sec until 'continue' becomes true,
-- then moves itself from 'active' to 'finished'. Arguments are ignored.
function do_long_f(...)
    active = active + 1
    while true do
        if continue then
            break
        end
        fiber.sleep(0.01)
    end
    -- The request is over: it is not active anymore.
    active = active - 1
    finished = finished + 1
end;
-- Worker body: call 'do_long_f' via connection 'c', passing long_str as
-- the only argument. The result of the call is discarded.
function do_long(c)
    local args = {long_str}
    c.call(c, 'do_long_f', args)
end;
-- Start run_max fibers, each of them executing do_long(c). Before that
-- the 'finished' counter is zeroed and the 'continue' flag is dropped, so
-- the new calls stay active until wait_finished() releases them.
function run_workers(c)
    continue = false
    finished = 0
    for _ = 1, run_max do
        fiber.create(do_long, c)
    end
end;
-- Block until exactly 'value' calls of do_long_f are active, then check
-- that the number is still the same after one more pause. A count
-- that stopped growing means that the input is blocked.
function wait_active(value)
    local pause = 0.01
    while true do
        if active == value then
            break
        end
        fiber.sleep(pause)
    end
    -- Give pending messages, if there are any, one more chance to arrive.
    fiber.sleep(pause)
    assert(active == value)
end;
-- Let the active do_long_f calls return by raising the 'continue' flag,
-- then poll every 0.01 sec until 'finished' is equal to 'needed'.
function wait_finished(needed)
    continue = true
    while true do
        if finished == needed then
            break
        end
        fiber.sleep(0.01)
    end
end;
test_run:cmd("setopt delimiter ''");
--
-- Test that message count limit is reachable.
--
-- Start run_max workers on each of the two connections. Together that is
-- limit - 100 requests, and all of them are expected to become active.
run_workers(conn)
run_workers(conn2)
wait_active(run_max * 2)
-- Release the workers and wait until as many calls have finished as were
-- active at the moment of this call.
wait_finished(active)
--
-- Test that each message in a batch is checked. When the limit is
-- reached, the remaining messages must be processed later.
--
-- Send five times more requests than the limit over a single connection.
run_max = limit * 5
run_workers(conn)
-- The test expects exactly limit + 1 calls to be running at once while
-- the rest of the input is blocked.
wait_active(limit + 1)
-- Once released, every one of the run_max calls must complete.
wait_finished(run_max)
--
-- gh-3320: allow changing the maximal count of messages.
--
--
-- Test minimal iproto msg count.
--
box.cfg{net_msg_max = 2}
-- Plain requests issued under the new limit: a ping and a select from
-- _space (readable thanks to the grant made during the setup).
conn:ping()
#conn.space._space:select{} > 0
run_max = 15
run_workers(conn)
-- With net_msg_max = 2 the test expects 3 calls to be active at once.
wait_active(3)
-- Bare expression; presumably the console echoes the current value of
-- 'active' into the test output - confirm against the result file.
active
-- Release the workers; all 15 calls must complete.
wait_finished(run_max)
--
-- Increase maximal message count when nothing is blocked.
--
box.cfg{net_msg_max = limit * 2}
-- Start more workers than the original limit allowed, but still 100 fewer
-- than the doubled one; all of them are expected to become active.
run_max = limit * 2 - 100
run_workers(conn)
wait_active(run_max)
-- Max can be decreased back even if the limit is now violated.
-- But new input is blocked in this case.
box.cfg{net_msg_max = limit}
old_active = active
-- Try to start 300 more calls over the same connection.
for i = 1, 300 do fiber.create(do_long, conn) end
-- After some time the active count is unchanged - the input is blocked.
wait_active(old_active)
-- Release everything: the calls that were already active plus the 300
-- extra ones must all complete.
wait_finished(active + 300)
--
-- Check that changing net_msg_max can resume stopped
-- connections.
--
-- Two connections with limit / 2 + 100 workers each give limit + 200
-- requests in total, which is more than the current limit.
run_max = limit / 2 + 100
run_workers(conn)
run_workers(conn2)
-- The test expects limit + 1 calls to be active while the rest is blocked.
wait_active(limit + 1)
-- Doubling the limit must let the remaining requests in.
box.cfg{net_msg_max = limit * 2}
wait_active(run_max * 2)
wait_finished(active)
--
-- Test TX fiber pool size limit. It is equal to net_msg_max * 5.
--
run_max = 5
box.cfg{net_msg_max = 10}
run_workers(conn)
run_workers(conn2)
-- Now tx fiber pool size is 50, and 10 requests are active.
wait_active(10)
box.cfg{net_msg_max = 2}
-- Now tx fiber pool size is equal to active request count.
-- More workers can be run, but they will be blocked until older
-- requests are finished.
run_max = 100
run_workers(conn)
-- run_workers() has zeroed 'finished' while the 10 older calls were still
-- waiting, so the expected total is those 10 plus the 100 new calls.
wait_finished(110)
-- Cleanup: close both connections, then undo the function registration,
-- the grant on _space and the configuration changes made by the test.
conn2:close()
conn:close()
box.schema.func.drop('do_long_f')
box.schema.user.revoke('guest', 'read', 'space', '_space')
-- Restore the values saved during the setup.
box.cfg{readahead = old_readahead, net_msg_max = limit}
|