1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
|
-- Copyright (C) Dejiang Zhu(doujiang24)
local setmetatable = setmetatable
local pairs = pairs
local next = next
local ok, new_tab = pcall(require, "table.new")
if not ok then
new_tab = function (narr, nrec) return {} end
end
local MAX_REUSE = 10000
local _M = {}
local mt = { __index = _M }
function _M.new(self, batch_num, batch_size)
local sendbuffer = {
topics = {},
queue_num = 0,
batch_num = batch_num * 2,
batch_size = batch_size,
}
return setmetatable(sendbuffer, mt)
end
function _M.add(self, topic, partition_id, key, msg)
local topics = self.topics
if not topics[topic] then
topics[topic] = {}
end
if not topics[topic][partition_id] then
topics[topic][partition_id] = {
queue = new_tab(self.batch_num, 0),
index = 0,
used = 0,
size = 0,
offset = 0,
retryable = true,
err = "",
}
end
local buffer = topics[topic][partition_id]
local index = buffer.index
local queue = buffer.queue
if index == 0 then
self.queue_num = self.queue_num + 1
buffer.retryable = true
end
queue[index + 1] = key
queue[index + 2] = msg
buffer.index = index + 2
buffer.size = buffer.size + #msg + (key and #key or 0)
if (buffer.size >= self.batch_size) or (buffer.index >= self.batch_num) then
return true
end
end
function _M.offset(self, topic, partition_id, offset)
local buffer = self.topics[topic][partition_id]
if not offset then
return buffer.offset
end
buffer.offset = offset + (buffer.index / 2)
end
function _M.clear(self, topic, partition_id)
local buffer = self.topics[topic][partition_id]
buffer.index = 0
buffer.size = 0
buffer.used = buffer.used + 1
if buffer.used >= MAX_REUSE then
buffer.queue = new_tab(self.batch_num, 0)
buffer.used = 0
end
self.queue_num = self.queue_num - 1
end
function _M.done(self)
return self.queue_num == 0
end
function _M.err(self, topic, partition_id, err, retryable)
local buffer = self.topics[topic][partition_id]
if err then
buffer.err = err
buffer.retryable = retryable
return buffer.index
else
return buffer.err, buffer.retryable
end
end
function _M.loop(self)
local topics, t, p = self.topics
return function ()
if t then
for partition_id, queue in next, topics[t], p do
p = partition_id
if queue.index > 0 then
return t, partition_id, queue
end
end
end
for topic, partitions in next, topics, t do
t = topic
p = nil
for partition_id, queue in next, partitions, p do
p = partition_id
if queue.index > 0 then
return topic, partition_id, queue
end
end
end
return
end
end
function _M.aggregator(self, client)
local num = 0
local sendbroker = {}
local brokers = {}
local i = 1
for topic, partition_id, queue in self:loop() do
if queue.retryable then
local broker_conf, err = client:choose_broker(topic, partition_id)
if not broker_conf then
self:err(topic, partition_id, err, true)
else
if not brokers[broker_conf] then
brokers[broker_conf] = {
topics = {},
topic_num = 0,
size = 0,
}
end
local broker = brokers[broker_conf]
if not broker.topics[topic] then
brokers[broker_conf].topics[topic] = {
partitions = {},
partition_num = 0,
}
broker.topic_num = broker.topic_num + 1
end
local broker_topic = broker.topics[topic]
broker_topic.partitions[partition_id] = queue
broker_topic.partition_num = broker_topic.partition_num + 1
broker.size = broker.size + queue.size
if broker.size >= self.batch_size then
sendbroker[num + 1] = broker_conf
sendbroker[num + 2] = brokers[broker_conf]
num = num + 2
brokers[broker_conf] = nil
end
end
end
end
for broker_conf, topic_partitions in pairs(brokers) do
sendbroker[num + 1] = broker_conf
sendbroker[num + 2] = brokers[broker_conf]
num = num + 2
end
return num, sendbroker
end
return _M
|