File: test_contextvars.py

package info (click to toggle)
sentry-python 2.18.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,004 kB
  • sloc: python: 55,908; makefile: 114; sh: 111; xml: 2
file content (41 lines) | stat: -rw-r--r-- 794 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import pytest
import random
import time


@pytest.mark.forked
def test_leaks(maybe_monkeypatched_threading):
    """Check that a ContextVar value set in one thread is not seen by others.

    Each worker thread stores its own random value in a shared ContextVar,
    then repeatedly yields and reads the variable back, expecting to see
    exactly the value it stored. A worker only records completion when every
    read matched, so the final count reveals any failed worker.

    The ``maybe_monkeypatched_threading`` fixture is defined outside this
    file and is not referenced in the body; presumably it prepares the
    threading environment before the test runs -- confirm in conftest.
    """
    import threading

    # The SDK already settled on gevent when it was imported, so the
    # ContextVar implementation has to be requested explicitly here via
    # _get_contextvars.
    from sentry_sdk import utils

    _, ContextVar = utils._get_contextvars()  # noqa: N806

    thread_count = 20
    checks_per_thread = 100

    leak_var = ContextVar("test_contextvar_leaks")
    finished = []

    def worker():
        expected = int(random.random() * 1000)
        leak_var.set(expected)

        for _ in range(checks_per_thread):
            # Yield so the other workers get a chance to run between reads.
            time.sleep(0)
            observed = leak_var.get(None)
            assert observed == expected

        # Reached only when no read above returned a different value.
        finished.append(1)

    workers = [threading.Thread(target=worker) for _ in range(thread_count)]

    for thread in workers:
        thread.start()

    for thread in workers:
        thread.join()

    assert len(finished) == thread_count