1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
"""
The automcomplete example rewritten for bottle / gevent.
- Requires besides bottle and gevent also the geventwebsocket pip package
- Instead of a future we create the inner stream for flat_map_latest manually
"""
import json
import gevent
import requests
from bottle import Bottle, abort, request
from geventwebsocket import WebSocketError
from geventwebsocket.handler import WebSocketHandler
from reactivex.scheduler.eventloop import GEventScheduler
from reactivex.subject import Subject
class WikiFinder:
tmpl = "http://en.wikipedia.org/w/api.php"
tmpl += "?action=opensearch&search=%s&format=json"
def __init__(self, term):
self.res = r = gevent.event.AsyncResult()
gevent.spawn(lambda: requests.get(self.tmpl % term).text).link(r)
def subscribe(self, on_next, on_err, on_compl):
try:
self.res.get()
on_next(self.res.value)
except Exception as ex:
on_err(ex.args)
on_compl()
app, PORT = Bottle(), 8081
scheduler = GEventScheduler(gevent)
@app.route("/ws")
def handle_websocket():
wsock = request.environ.get("wsgi.websocket")
if not wsock:
abort(400, "Expected WebSocket request.")
stream = Subject()
query = (
stream.map(lambda x: x["term"])
.filter(lambda text: len(text) > 2) # Only if text is longer than 2 characters
.debounce(0.750, scheduler=scheduler) # Pause for 750ms
.distinct_until_changed()
) # Only if the value has changed
searcher = query.flat_map_latest(lambda term: WikiFinder(term))
def send_response(x):
wsock.on_next(x)
def on_error(ex):
print(ex)
searcher.subscribe(send_response, on_error)
while True:
try:
message = wsock.receive()
# like {'term': '<current textbox val>'}
obj = json.loads(message)
stream.on_next(obj)
except WebSocketError:
break
@app.route("/static/autocomplete.js")
def get_js():
# blatantly ignoring bottle's template engine:
return open("autocomplete.js").read().replace("8080", str(PORT))
@app.route("/")
def get_index():
return open("index.html").read()
if __name__ == "__main__":
h = ("0.0.0.0", PORT)
server = gevent.pywsgi.WSGIServer(h, app, handler_class=WebSocketHandler)
server.serve_forever()
|