File: test_threads.py

package info (click to toggle)
python-confluent-kafka 1.7.0-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid, trixie
  • size: 1,900 kB
  • sloc: python: 8,335; ansic: 6,065; sh: 1,203; makefile: 178
file content (68 lines) | stat: -rw-r--r-- 1,486 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python

from confluent_kafka import Producer
import threading
import time
try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty


class IntendedException (Exception):
    pass


def thread_run(myid, p, q):
    def do_crash(err, msg):
        raise IntendedException()

    for i in range(1, 3):
        cb = None
        if i == 2:
            cb = do_crash
        p.produce('mytopic', value='hi', callback=cb)
        t = time.time()
        try:
            p.flush()
            print(myid, 'Flush took %.3f' % (time.time() - t))
        except IntendedException:
            print(myid, "Intentional callback crash: ok")
            continue

    print(myid, 'Done')
    q.put(myid)


def test_thread_safety():
    """ Basic thread safety tests. """

    q = Queue()
    p = Producer({'socket.timeout.ms': 10,
                  'message.timeout.ms': 10})

    threads = list()
    for i in range(1, 5):
        thr = threading.Thread(target=thread_run, name=str(i), args=[i, p, q])
        thr.start()
        threads.append(thr)

    for thr in threads:
        thr.join()

    # Count the number of threads that exited cleanly
    cnt = 0
    try:
        for x in iter(q.get_nowait, None):
            cnt += 1
    except Empty:
        pass

    if cnt != len(threads):
        raise Exception('Only %d/%d threads succeeded' % (cnt, len(threads)))

    print('Done')


if __name__ == '__main__':
    test_thread_safety()