1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
# Test the Windows-only _winapi module
import os
import pathlib
import random
import re
import threading
import time
import unittest
from test.support import import_helper, os_helper
_winapi = import_helper.import_module('_winapi', required_on=['win'])
MAXIMUM_WAIT_OBJECTS = 64
MAXIMUM_BATCHED_WAIT_OBJECTS = (MAXIMUM_WAIT_OBJECTS - 1) ** 2
class WinAPIBatchedWaitForMultipleObjectsTests(unittest.TestCase):
def _events_waitall_test(self, n):
evts = [_winapi.CreateEventW(0, False, False, None) for _ in range(n)]
with self.assertRaises(TimeoutError):
_winapi.BatchedWaitForMultipleObjects(evts, True, 100)
# Ensure no errors raised when all are triggered
for e in evts:
_winapi.SetEvent(e)
try:
_winapi.BatchedWaitForMultipleObjects(evts, True, 100)
except TimeoutError:
self.fail("expected wait to complete immediately")
# Choose 8 events to set, distributed throughout the list, to make sure
# we don't always have them in the first chunk
chosen = [i * (len(evts) // 8) for i in range(8)]
# Replace events with invalid handles to make sure we fail
for i in chosen:
old_evt = evts[i]
evts[i] = -1
with self.assertRaises(OSError):
_winapi.BatchedWaitForMultipleObjects(evts, True, 100)
evts[i] = old_evt
def _events_waitany_test(self, n):
evts = [_winapi.CreateEventW(0, False, False, None) for _ in range(n)]
with self.assertRaises(TimeoutError):
_winapi.BatchedWaitForMultipleObjects(evts, False, 100)
# Choose 8 events to set, distributed throughout the list, to make sure
# we don't always have them in the first chunk
chosen = [i * (len(evts) // 8) for i in range(8)]
# Trigger one by one. They are auto-reset events, so will only trigger once
for i in chosen:
with self.subTest(f"trigger event {i} of {len(evts)}"):
_winapi.SetEvent(evts[i])
triggered = _winapi.BatchedWaitForMultipleObjects(evts, False, 10000)
self.assertSetEqual(set(triggered), {i})
# Trigger all at once. This may require multiple calls
for i in chosen:
_winapi.SetEvent(evts[i])
triggered = set()
while len(triggered) < len(chosen):
triggered.update(_winapi.BatchedWaitForMultipleObjects(evts, False, 10000))
self.assertSetEqual(triggered, set(chosen))
# Replace events with invalid handles to make sure we fail
for i in chosen:
with self.subTest(f"corrupt event {i} of {len(evts)}"):
old_evt = evts[i]
evts[i] = -1
with self.assertRaises(OSError):
_winapi.BatchedWaitForMultipleObjects(evts, False, 100)
evts[i] = old_evt
def test_few_events_waitall(self):
self._events_waitall_test(16)
def test_many_events_waitall(self):
self._events_waitall_test(256)
def test_max_events_waitall(self):
self._events_waitall_test(MAXIMUM_BATCHED_WAIT_OBJECTS)
def test_few_events_waitany(self):
self._events_waitany_test(16)
def test_many_events_waitany(self):
self._events_waitany_test(256)
def test_max_events_waitany(self):
self._events_waitany_test(MAXIMUM_BATCHED_WAIT_OBJECTS)
class WinAPITests(unittest.TestCase):
def test_getlongpathname(self):
testfn = pathlib.Path(os.getenv("ProgramFiles")).parents[-1] / "PROGRA~1"
if not os.path.isdir(testfn):
raise unittest.SkipTest("require x:\\PROGRA~1 to test")
# pathlib.Path will be rejected - only str is accepted
with self.assertRaises(TypeError):
_winapi.GetLongPathName(testfn)
actual = _winapi.GetLongPathName(os.fsdecode(testfn))
# Can't assume that PROGRA~1 expands to any particular variation, so
# ensure it matches any one of them.
candidates = set(testfn.parent.glob("Progra*"))
self.assertIn(pathlib.Path(actual), candidates)
def test_getshortpathname(self):
testfn = pathlib.Path(os.getenv("ProgramFiles"))
if not os.path.isdir(testfn):
raise unittest.SkipTest("require '%ProgramFiles%' to test")
# pathlib.Path will be rejected - only str is accepted
with self.assertRaises(TypeError):
_winapi.GetShortPathName(testfn)
actual = _winapi.GetShortPathName(os.fsdecode(testfn))
# Should contain "PROGRA~" but we can't predict the number
self.assertIsNotNone(re.match(r".\:\\PROGRA~\d", actual.upper()), actual)
def test_namedpipe(self):
pipe_name = rf"\\.\pipe\LOCAL\{os_helper.TESTFN}"
# Pipe does not exist, so this raises
with self.assertRaises(FileNotFoundError):
_winapi.WaitNamedPipe(pipe_name, 0)
pipe = _winapi.CreateNamedPipe(
pipe_name,
_winapi.PIPE_ACCESS_DUPLEX,
8, # 8=PIPE_REJECT_REMOTE_CLIENTS
2, # two instances available
32, 32, 0, 0)
self.addCleanup(_winapi.CloseHandle, pipe)
# Pipe instance is available, so this passes
_winapi.WaitNamedPipe(pipe_name, 0)
with open(pipe_name, 'w+b') as pipe2:
# No instances available, so this times out
# (WinError 121 does not get mapped to TimeoutError)
with self.assertRaises(OSError):
_winapi.WaitNamedPipe(pipe_name, 0)
_winapi.WriteFile(pipe, b'testdata')
self.assertEqual(b'testdata', pipe2.read(8))
self.assertEqual((b'', 0), _winapi.PeekNamedPipe(pipe, 8)[:2])
pipe2.write(b'testdata')
pipe2.flush()
self.assertEqual((b'testdata', 8), _winapi.PeekNamedPipe(pipe, 8)[:2])
|