File: konamicode.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (67 lines) | stat: -rw-r--r-- 2,048 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import os
from typing import Dict, Union

from tornado import ioloop
from tornado.escape import json_decode
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.websocket import WebSocketHandler

from reactivex import operators as ops
from reactivex.subject import Subject

UP, DOWN, LEFT, RIGHT, B, A = 38, 40, 37, 39, 66, 65
codes = [UP, UP, DOWN, DOWN, LEFT, RIGHT, LEFT, RIGHT, B, A]


class WSHandler(WebSocketHandler):
    def open(self):
        print("WebSocket opened")

        # A Subject is both an observable and observer, so we can both subscribe
        # to it and also feed (on_next) it with new values
        self.subject: Subject[Dict[str, int]] = Subject()

        # Now we take on our magic glasses and project the stream of bytes into
        # a ...
        query = self.subject.pipe(
            # 1. stream of keycodes
            ops.map(lambda obj: obj["keycode"]),
            # 2. stream of windows (10 ints long)
            ops.window_with_count(10, 1),
            # 3. stream of booleans, True or False
            ops.flat_map(lambda win: win.pipe(ops.sequence_equal(codes))),
            # 4. stream of Trues
            ops.filter(lambda equal: equal),
        )
        # 4. we then subscribe to the Trues, and signal Konami! if we see any
        query.subscribe(on_next=lambda x: self.write_message("Konami!"))

    def on_message(self, message: Union[str, bytes]):
        obj = json_decode(message)
        self.subject.on_next(obj)

    def on_close(self):
        print("WebSocket closed")


class MainHandler(RequestHandler):
    def get(self):
        self.render("index.html")


def main():
    port = os.environ.get("PORT", 8080)
    app = Application(
        [
            url(r"/", MainHandler),
            (r"/ws", WSHandler),
            (r"/static/(.*)", StaticFileHandler, {"path": "."}),
        ]
    )
    print("Starting server at port: %s" % port)
    app.listen(int(port))
    ioloop.IOLoop.current().start()


if __name__ == "__main__":
    main()