File: autocomplete_asyncio.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (105 lines) | stat: -rw-r--r-- 3,180 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""
RxPY example running a Tornado server doing search queries against
Wikipedia to populate the autocomplete dropdown in the web UI. Start
using `python autocomplete.py` and navigate your web browser to
http://localhost:8080

Uses the RxPY AsyncIOScheduler (Python 3.4 is required)
"""

import asyncio
import os
from asyncio import Future
from typing import Dict, Union

from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient, HTTPResponse
from tornado.httputil import url_concat
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.websocket import WebSocketHandler

from reactivex import operators as ops
from reactivex.scheduler.eventloop import AsyncIOScheduler
from reactivex.subject import Subject


def search_wikipedia(term: str) -> Future[HTTPResponse]:
    """Search Wikipedia for a given term"""
    url = "http://en.wikipedia.org/w/api.php"

    params = {"action": "opensearch", "search": term, "format": "json"}
    # Must set a user agent for non-browser requests to Wikipedia
    user_agent = (
        "RxPY/1.0 (https://github.com/dbrattli/RxPY; dag@brattli.net) Tornado/4.0.1"
    )

    url = url_concat(url, params)

    http_client = AsyncHTTPClient()
    return http_client.fetch(url, method="GET", user_agent=user_agent)


class WSHandler(WebSocketHandler):
    def open(self):
        scheduler = AsyncIOScheduler(asyncio.get_event_loop())

        print("WebSocket opened")

        # A Subject is both an observable and observer, so we can both subscribe
        # to it and also feed (send) it with new values
        self.subject: Subject[Dict[str, str]] = Subject()

        # Get all distinct key up events from the input and only fire if long enough and distinct
        searcher = self.subject.pipe(
            ops.map(lambda x: x["term"]),
            ops.filter(
                lambda text: len(text) > 2
            ),  # Only if the text is longer than 2 characters
            ops.debounce(0.750),  # Pause for 750ms
            ops.distinct_until_changed(),  # Only if the value has changed
            ops.flat_map_latest(search_wikipedia),
        )

        def send_response(x: HTTPResponse) -> None:
            self.write_message(x.body)

        def on_error(ex: Exception):
            print(ex)

        searcher.subscribe(
            on_next=send_response, on_error=on_error, scheduler=scheduler
        )

    def on_message(self, message: Union[bytes, str]):
        obj = json_decode(message)
        self.subject.on_next(obj)

    def on_close(self):
        print("WebSocket closed")


class MainHandler(RequestHandler):
    def get(self):
        self.render("index.html")


def main():
    AsyncIOMainLoop().make_current()

    port = os.environ.get("PORT", 8080)
    app = Application(
        [
            url(r"/", MainHandler),
            (r"/ws", WSHandler),
            (r"/static/(.*)", StaticFileHandler, {"path": "."}),
        ]
    )

    print("Starting server at port: %s" % port)
    app.listen(port)
    asyncio.get_event_loop().run_forever()


if __name__ == "__main__":
    main()