File: select_ipoll.py

package info (click to toggle)
micropython 1.25.0%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 48,944 kB
  • sloc: ansic: 317,850; python: 59,539; xml: 4,241; makefile: 3,530; sh: 1,421; javascript: 744; asm: 681; cpp: 45; exp: 11; pascal: 6
file content (55 lines) | stat: -rw-r--r-- 1,281 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# Test select.ipoll().

try:
    import socket, select
except ImportError:
    print("SKIP")
    raise SystemExit


def print_poll_output(lst):
    # Print the (object, flags) pairs from `lst` as a list, with each object
    # replaced by its type so the printed text does not depend on the
    # object's own repr.
    summary = []
    for entry in lst:
        obj, flags = entry
        summary.append((type(obj), flags))
    print(summary)


# One poll object is shared by every step of this test.
poller = select.poll()

# The first test socket is a fresh UDP socket, which should be writable but
# not readable.  If it cannot be created or bound, skip the whole test.
try:
    sock_a = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    addr_a = socket.getaddrinfo("127.0.0.1", 8000)[0][-1]
    sock_a.bind(addr_a)
except OSError:
    print("SKIP")
    raise SystemExit

poller.register(sock_a)

# Step 1: plain ipoll() with a zero timeout.
print_poll_output(poller.ipoll(0))

# Step 2: ipoll() with flags=1, which selects one-shot behaviour.
print_poll_output(poller.ipoll(0, 1))

# Step 3: after the one-shot poll the socket should be deregistered, so this
# poll should return nothing.
print_poll_output(poller.ipoll(0))

# Bring up a second UDP socket on the next port.
sock_b = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
addr_b = socket.getaddrinfo("127.0.0.1", 8001)[0][-1]
sock_b.bind(addr_b)

# Register both sockets; re-registering the first one resets it.
for sock in (sock_a, sock_b):
    poller.register(sock)

# Step 4: plain ipoll() with two registered sockets.
print_poll_output(poller.ipoll(0))

# Step 5: drop the first socket and poll the remaining one.
poller.unregister(sock_a)
print_poll_output(poller.ipoll(0))

# Step 6: drop the second socket as well and poll with nothing registered.
poller.unregister(sock_b)
print_poll_output(poller.ipoll(0))

# Release the sockets, second one first.
for sock in (sock_b, sock_a):
    sock.close()