File: test_re.py

package info (click to toggle)
python3.14 3.14.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 128,000 kB
  • sloc: python: 752,614; ansic: 717,151; xml: 31,250; sh: 5,989; cpp: 4,063; makefile: 1,996; objc: 787; lisp: 502; javascript: 136; asm: 75; csh: 12
file content (62 lines) | stat: -rw-r--r-- 2,087 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import re
import unittest

from test.support import threading_helper
from test.support.threading_helper import run_concurrently


NTHREADS = 10


@threading_helper.requires_working_threading()
class TestRe(unittest.TestCase):
    def test_pattern_sub(self):
        """Pattern substitution should work across threads"""
        pattern = re.compile(r"\w+@\w+\.\w+")
        text = "e-mail: test@python.org or user@pycon.org. " * 5
        results = []

        def worker():
            substituted = pattern.sub("(redacted)", text)
            results.append(substituted.count("(redacted)"))

        run_concurrently(worker_func=worker, nthreads=NTHREADS)
        self.assertEqual(results, [2 * 5] * NTHREADS)

    def test_pattern_search(self):
        """Pattern search should work across threads."""
        emails = ["alice@python.org", "bob@pycon.org"] * 10
        pattern = re.compile(r"\w+@\w+\.\w+")
        results = []

        def worker():
            matches = [pattern.search(e).group() for e in emails]
            results.append(len(matches))

        run_concurrently(worker_func=worker, nthreads=NTHREADS)
        self.assertEqual(results, [2 * 10] * NTHREADS)

    def test_scanner_concurrent_access(self):
        """Shared scanner should reject concurrent access."""
        pattern = re.compile(r"\w+")
        scanner = pattern.scanner("word " * 10)

        def worker():
            for _ in range(100):
                try:
                    scanner.search()
                except ValueError as e:
                    if "already executing" in str(e):
                        pass
                    else:
                        raise

        run_concurrently(worker_func=worker, nthreads=NTHREADS)
        # This test has no assertions. Its purpose is to catch crashes and
        # enable thread sanitizer to detect race conditions. While "already
        # executing" errors are very likely, they're not guaranteed due to
        # non-deterministic thread scheduling, so we can't assert errors > 0.


if __name__ == "__main__":
    unittest.main()