File: test_multiprocessing.py

package info (click to toggle)
construct 2.10.58+dfsg1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,780 kB
  • sloc: python: 11,135; makefile: 132
file content (25 lines) | stat: -rw-r--r-- 478 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from declarativeunittest import *
from construct import *
from construct.lib import *


def _worker(q):
    """Child-process entry point: take one object off *q* and print it.

    This lives at module level (rather than nested inside the test) because
    the ``spawn`` and ``forkserver`` start methods pickle the process target
    by reference, and a function defined inside another function cannot be
    pickled.
    """
    obj = q.get()
    print(obj)


def test_multiprocessing():
    """Send a ``Container`` to a child process through a multiprocessing queue.

    The object is pickled by the queue in this process and unpickled by
    ``q.get()`` in the child. The test fails if the child does not finish
    in time or exits with a non-zero status (for example because it raised
    while receiving the object).
    """
    import multiprocessing

    # Upper bound on how long to wait for the child, so that a worker stuck
    # in q.get() fails the test instead of hanging the whole run.
    join_timeout = 30

    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=_worker, args=(queue,))
    p.start()

    try:
        obj = Container(name="test")
        print(obj)
        queue.put(obj)

        # Close our end and wait for the feeder thread to flush the object
        # to the underlying pipe before waiting on the child.
        queue.close()
        queue.join_thread()

        # Wait for the worker to finish
        p.join(join_timeout)
        assert not p.is_alive(), "worker did not finish within %d seconds" % join_timeout
        assert p.exitcode == 0, "worker exited with code %r" % (p.exitcode,)
    finally:
        # Never leave a stray child process behind, whatever went wrong above.
        if p.is_alive():
            p.terminate()
            p.join()