File: eventlet_skip_repeat_queue.py

package info (click to toggle)
python-watchdog 6.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 808 kB
  • sloc: python: 6,384; ansic: 609; xml: 155; makefile: 124; sh: 8
file content (33 lines) | stat: -rw-r--r-- 570 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
if __name__ == "__main__":
    import eventlet

    eventlet.monkey_patch()

    from watchdog.utils.bricks import SkipRepeatsQueue

    q = SkipRepeatsQueue(10)
    q.put("A")
    q.put("A")
    q.put("A")
    q.put("A")
    q.put("B")
    q.put("A")

    value = q.get()
    assert value == "A"
    q.task_done()

    assert q.unfinished_tasks == 2

    value = q.get()
    assert value == "B"
    q.task_done()

    assert q.unfinished_tasks == 1

    value = q.get()
    assert value == "A"
    q.task_done()

    assert q.empty()
    assert q.unfinished_tasks == 0