File: asyncweb.py

package info (click to toggle)
pyzmq 27.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,984 kB
  • sloc: python: 15,189; ansic: 285; makefile: 169; sh: 85
file content (76 lines) | stat: -rw-r--r-- 2,044 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""Async web request example with tornado.

Requests to localhost:8888 will be relayed via 0MQ to a slow responder,
who will take 1-5 seconds to respond.  The tornado app will remain responsive
duriung this time, and when the worker replies, the web request will finish.

A '.' is printed every 100ms to demonstrate that the zmq request is not blocking
the event loop.
"""

import asyncio
import random
import sys
import threading
import time

from tornado import ioloop, web

import zmq
from zmq.eventloop.future import Context as FutureContext


def slow_responder() -> None:
    """thread for slowly responding to replies."""
    ctx = zmq.Context()
    socket = ctx.socket(zmq.ROUTER)
    socket.linger = 0
    socket.bind('tcp://127.0.0.1:5555')
    i = 0
    while True:
        frame, msg = socket.recv_multipart()
        print(f"\nworker received {msg!r}\n", end='')
        time.sleep(random.randint(1, 5))
        socket.send_multipart([frame, msg + f" to you too, #{i}".encode()])
        i += 1


def dot() -> None:
    """callback for showing that IOLoop is still responsive while we wait"""
    sys.stdout.write('.')
    sys.stdout.flush()


class TestHandler(web.RequestHandler):
    async def get(self) -> None:
        ctx = FutureContext.instance()
        s = ctx.socket(zmq.DEALER)

        s.connect('tcp://127.0.0.1:5555')
        # send request to worker
        await s.send(b"hello")

        # finish web request with worker's reply
        reply = await s.recv_string()
        print(f"\nfinishing with {reply!r}\n")
        self.write(reply)


async def setup() -> None:
    worker = threading.Thread(target=slow_responder)
    worker.daemon = True
    worker.start()

    application = web.Application([(r"/", TestHandler)])
    beat = ioloop.PeriodicCallback(dot, 100)
    beat.start()
    application.listen(8888)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(setup())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print(' Interrupted')