File: autocomplete.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (100 lines) | stat: -rw-r--r-- 3,098 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
RxPY example running a Tornado server doing search queries against Wikipedia to
populate the autocomplete dropdown in the web UI. Start using
`python autocomplete.py` and navigate your web browser to http://localhost:8080

Uses the RxPY IOLoopScheduler.
"""

import os
from asyncio import Future
from typing import Any, Dict, Union

from tornado import ioloop
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient, HTTPResponse
from tornado.httputil import url_concat
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.websocket import WebSocketHandler

from reactivex import operators as ops
from reactivex.scheduler.eventloop import IOLoopScheduler
from reactivex.subject import Subject

scheduler = IOLoopScheduler(ioloop.IOLoop.current())


def search_wikipedia(term: str) -> Future[HTTPResponse]:
    """Search Wikipedia for a given term"""
    url = "http://en.wikipedia.org/w/api.php"

    params: Dict[str, str] = {"action": "opensearch", "search": term, "format": "json"}
    # Must set a user agent for non-browser requests to Wikipedia
    user_agent = (
        "RxPY/3.0 (https://github.com/dbrattli/RxPY; dag@brattli.net) Tornado/4.0.1"
    )

    url = url_concat(url, params)

    http_client = AsyncHTTPClient()
    return http_client.fetch(url, method="GET", user_agent=user_agent)


class WSHandler(WebSocketHandler):
    def open(self, *args: Any):
        print("WebSocket opened")

        # A Subject is both an observable and observer, so we can both subscribe
        # to it and also feed (send) it with new values
        self.subject: Subject[Dict[str, str]] = Subject()

        # Get all distinct key up events from the input and only fire if long enough and distinct
        searcher = self.subject.pipe(
            ops.map(lambda x: x["term"]),
            ops.filter(
                lambda text: len(text) > 2
            ),  # Only if the text is longer than 2 characters
            ops.debounce(0.750),  # Pause for 750ms
            ops.distinct_until_changed(),  # Only if the value has changed
            ops.flat_map_latest(search_wikipedia),
        )

        def send_response(x: HTTPResponse) -> None:
            self.write_message(x.body)

        def on_error(ex: Exception) -> None:
            print(ex)

        searcher.subscribe(
            on_next=send_response, on_error=on_error, scheduler=scheduler
        )

    def on_message(self, message: Union[bytes, str]):
        obj = json_decode(message)
        self.subject.on_next(obj)

    def on_close(self):
        print("WebSocket closed")


class MainHandler(RequestHandler):
    def get(self):
        self.render("index.html")


def main():
    port = os.environ.get("PORT", 8080)
    app = Application(
        [
            url(r"/", MainHandler),
            (r"/ws", WSHandler),
            (r"/static/(.*)", StaticFileHandler, {"path": "."}),
        ]
    )
    print("Starting server at port: %s" % port)
    app.listen(port)
    ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()