File: routing.py

package info (click to toggle)
python-websockets 15.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,948 kB
  • sloc: python: 25,105; javascript: 350; ansic: 148; makefile: 43
file content (154 lines) | stat: -rw-r--r-- 4,526 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python

import asyncio
import datetime
import time
import zoneinfo

from websockets.asyncio.router import route
from websockets.exceptions import ConnectionClosed
from werkzeug.routing import BaseConverter, Map, Rule, ValidationError


async def clock(websocket, tzinfo):
    """Send the current time in the given timezone every second."""
    loop = asyncio.get_running_loop()
    loop_offset = (loop.time() - time.time()) % 1
    try:
        while True:
            # Sleep until the next second according to the wall clock.
            await asyncio.sleep(1 - (loop.time() - loop_offset) % 1)
            now = datetime.datetime.now(tzinfo).replace(microsecond=0)
            await websocket.send(now.isoformat())
    except ConnectionClosed:
        return


async def alarm(websocket, alarm_at, tzinfo):
    """Send the alarm time in the given timezone when it is reached."""
    alarm_at = alarm_at.replace(tzinfo=tzinfo)
    now = datetime.datetime.now(tz=datetime.timezone.utc)

    try:
        async with asyncio.timeout((alarm_at - now).total_seconds()):
            await websocket.wait_closed()
    except asyncio.TimeoutError:
        try:
            await websocket.send(alarm_at.isoformat())
        except ConnectionClosed:
            return


async def timer(websocket, alarm_after):
    """Send the remaining time until the alarm time every second."""
    alarm_at = datetime.datetime.now(tz=datetime.timezone.utc) + alarm_after
    loop = asyncio.get_running_loop()
    loop_offset = (loop.time() - time.time() + alarm_at.timestamp()) % 1

    try:
        while alarm_after.total_seconds() > 0:
            # Sleep until the next second as a delta to the alarm time.
            await asyncio.sleep(1 - (loop.time() - loop_offset) % 1)
            alarm_after = alarm_at - datetime.datetime.now(tz=datetime.timezone.utc)
            # Round up to the next second.
            alarm_after += datetime.timedelta(
                seconds=1,
                microseconds=-alarm_after.microseconds,
            )
            await websocket.send(format_timedelta(alarm_after))
    except ConnectionClosed:
        return


class ZoneInfoConverter(BaseConverter):
    regex = r"[A-Za-z0-9_/+-]+"

    def to_python(self, value):
        try:
            return zoneinfo.ZoneInfo(value)
        except zoneinfo.ZoneInfoNotFoundError:
            raise ValidationError

    def to_url(self, value):
        return value.key


class DateTimeConverter(BaseConverter):
    regex = r"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(?:\.[0-9]{3})?"

    def to_python(self, value):
        try:
            return datetime.datetime.fromisoformat(value)
        except ValueError:
            raise ValidationError

    def to_url(self, value):
        return value.isoformat()


class TimeDeltaConverter(BaseConverter):
    regex = r"[0-9]{2}:[0-9]{2}:[0-9]{2}(?:\.[0-9]{3}(?:[0-9]{3})?)?"

    def to_python(self, value):
        return datetime.timedelta(
            hours=int(value[0:2]),
            minutes=int(value[3:5]),
            seconds=int(value[6:8]),
            milliseconds=int(value[9:12]) if len(value) == 12 else 0,
            microseconds=int(value[9:15]) if len(value) == 15 else 0,
        )

    def to_url(self, value):
        return format_timedelta(value)


def format_timedelta(delta):
    assert 0 <= delta.seconds < 86400
    hours = delta.seconds // 3600
    minutes = (delta.seconds % 3600) // 60
    seconds = delta.seconds % 60
    if delta.microseconds:
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{delta.microseconds:06d}"
    else:
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"


url_map = Map(
    [
        Rule(
            "/",
            redirect_to="/clock",
        ),
        Rule(
            "/clock",
            defaults={"tzinfo": datetime.timezone.utc},
            endpoint=clock,
        ),
        Rule(
            "/clock/<tzinfo:tzinfo>",
            endpoint=clock,
        ),
        Rule(
            "/alarm/<datetime:alarm_at>/<tzinfo:tzinfo>",
            endpoint=alarm,
        ),
        Rule(
            "/timer/<timedelta:alarm_after>",
            endpoint=timer,
        ),
    ],
    converters={
        "tzinfo": ZoneInfoConverter,
        "datetime": DateTimeConverter,
        "timedelta": TimeDeltaConverter,
    },
)


async def main():
    async with route(url_map, "localhost", 8888) as server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())