File: threading_test.py

package info (click to toggle)
python-shapely 2.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 2,528 kB
  • sloc: python: 18,648; ansic: 6,615; makefile: 88; sh: 62
file content (44 lines) | stat: -rw-r--r-- 1,004 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import threading
from binascii import b2a_hex


def main():
    num_threads = 10
    use_threads = True

    if not use_threads:
        # Run core code
        runShapelyBuilding()
    else:
        threads = [
            threading.Thread(target=runShapelyBuilding, name=str(i), args=(i,))
            for i in range(num_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()


def runShapelyBuilding(num):
    print(f"{num}: Running shapely tests on wkb")
    import shapely.geos

    print(f"{num} GEOS Handle: {shapely.geos.lgeos.geos_handle}")
    import shapely.wkb
    import shapely.wkt

    p = shapely.wkt.loads("POINT (0 0)")
    print(f"{num} WKT: {shapely.wkt.dumps(p)}")
    wkb = shapely.wkb.dumps(p)
    print(f"{num} WKB: {b2a_hex(wkb)}")

    for i in range(10):
        shapely.wkb.loads(wkb)

    print(f"{num} GEOS Handle: {shapely.geos.lgeos.geos_handle}")
    print(f"Done {num}")


if __name__ == "__main__":
    main()