1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
#!/usr/bin/env python
import asyncio
import datetime
import time
import zoneinfo
from websockets.asyncio.router import route
from websockets.exceptions import ConnectionClosed
from werkzeug.routing import BaseConverter, Map, Rule, ValidationError
async def clock(websocket, tzinfo):
"""Send the current time in the given timezone every second."""
loop = asyncio.get_running_loop()
loop_offset = (loop.time() - time.time()) % 1
try:
while True:
# Sleep until the next second according to the wall clock.
await asyncio.sleep(1 - (loop.time() - loop_offset) % 1)
now = datetime.datetime.now(tzinfo).replace(microsecond=0)
await websocket.send(now.isoformat())
except ConnectionClosed:
return
async def alarm(websocket, alarm_at, tzinfo):
"""Send the alarm time in the given timezone when it is reached."""
alarm_at = alarm_at.replace(tzinfo=tzinfo)
now = datetime.datetime.now(tz=datetime.timezone.utc)
try:
async with asyncio.timeout((alarm_at - now).total_seconds()):
await websocket.wait_closed()
except asyncio.TimeoutError:
try:
await websocket.send(alarm_at.isoformat())
except ConnectionClosed:
return
async def timer(websocket, alarm_after):
"""Send the remaining time until the alarm time every second."""
alarm_at = datetime.datetime.now(tz=datetime.timezone.utc) + alarm_after
loop = asyncio.get_running_loop()
loop_offset = (loop.time() - time.time() + alarm_at.timestamp()) % 1
try:
while alarm_after.total_seconds() > 0:
# Sleep until the next second as a delta to the alarm time.
await asyncio.sleep(1 - (loop.time() - loop_offset) % 1)
alarm_after = alarm_at - datetime.datetime.now(tz=datetime.timezone.utc)
# Round up to the next second.
alarm_after += datetime.timedelta(
seconds=1,
microseconds=-alarm_after.microseconds,
)
await websocket.send(format_timedelta(alarm_after))
except ConnectionClosed:
return
class ZoneInfoConverter(BaseConverter):
regex = r"[A-Za-z0-9_/+-]+"
def to_python(self, value):
try:
return zoneinfo.ZoneInfo(value)
except zoneinfo.ZoneInfoNotFoundError:
raise ValidationError
def to_url(self, value):
return value.key
class DateTimeConverter(BaseConverter):
regex = r"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(?:\.[0-9]{3})?"
def to_python(self, value):
try:
return datetime.datetime.fromisoformat(value)
except ValueError:
raise ValidationError
def to_url(self, value):
return value.isoformat()
class TimeDeltaConverter(BaseConverter):
regex = r"[0-9]{2}:[0-9]{2}:[0-9]{2}(?:\.[0-9]{3}(?:[0-9]{3})?)?"
def to_python(self, value):
return datetime.timedelta(
hours=int(value[0:2]),
minutes=int(value[3:5]),
seconds=int(value[6:8]),
milliseconds=int(value[9:12]) if len(value) == 12 else 0,
microseconds=int(value[9:15]) if len(value) == 15 else 0,
)
def to_url(self, value):
return format_timedelta(value)
def format_timedelta(delta):
assert 0 <= delta.seconds < 86400
hours = delta.seconds // 3600
minutes = (delta.seconds % 3600) // 60
seconds = delta.seconds % 60
if delta.microseconds:
return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{delta.microseconds:06d}"
else:
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
url_map = Map(
[
Rule(
"/",
redirect_to="/clock",
),
Rule(
"/clock",
defaults={"tzinfo": datetime.timezone.utc},
endpoint=clock,
),
Rule(
"/clock/<tzinfo:tzinfo>",
endpoint=clock,
),
Rule(
"/alarm/<datetime:alarm_at>/<tzinfo:tzinfo>",
endpoint=alarm,
),
Rule(
"/timer/<timedelta:alarm_after>",
endpoint=timer,
),
],
converters={
"tzinfo": ZoneInfoConverter,
"datetime": DateTimeConverter,
"timedelta": TimeDeltaConverter,
},
)
async def main():
async with route(url_map, "localhost", 8888) as server:
await server.serve_forever()
if __name__ == "__main__":
asyncio.run(main())
|