File: test_sandbox_consumer.lua

package info (click to toggle)
lua-sandbox-extensions 0~git20161128-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 2,596 kB
  • ctags: 1,458
  • sloc: ansic: 4,402; cpp: 2,102; makefile: 8
file content (29 lines) | stat: -rw-r--r-- 1,072 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

require "kafka"
require "string"

local consumer = kafka.consumer("localhost:9092", {"test"}, {["group.id"] = "integration_testing"}, {["auto.offset.reset"] = "smallest"})
local consumer1 = kafka.consumer("localhost:9092", {"test:1"}, {["group.id"] = "other"})
local pb, topic, partition, key = consumer1:receive()
assert(not pb)

local payloads = {"one", "two", "three"}

function process_message()
    local cnt = 0
    for i=1, 10 do
        pb, topic, partition, key = consumer:receive()
        if pb then
            cnt = cnt + 1
            local msg = decode_message(pb)
            if msg.Payload ~= payloads[cnt] then
                return -1, string.format("expected: %s received: %s", payloads[cnt], msg.Payload)
            end
            if cnt == 3 then return 0 end
        end
    end
    return -1, string.format("received %d/3 messages", cnt)
end