File: tornado_change_stream_example.py

package info (click to toggle)
python-motor 3.7.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,572 kB
  • sloc: python: 12,252; javascript: 137; makefile: 74; sh: 8
file content (117 lines) | stat: -rw-r--r-- 3,445 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import logging
import os
import sys
from base64 import urlsafe_b64encode
from pprint import pformat

import tornado.escape
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.websocket
from bson import json_util  # Installed with PyMongo.
from tornado.options import define, options

from motor.motor_tornado import MotorClient

define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="reload on source changes")
define("mongo", default="mongodb://localhost", help="MongoDB URI")
define("ns", default="test.test", help="database and collection name")


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [(r"/", MainHandler), (r"/socket", ChangesHandler)]

        templates = os.path.join(os.path.dirname(__file__), "tornado_change_stream_templates")

        super().__init__(
            handlers, template_path=templates, template_whitespace="all", debug=options.debug
        )


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("index.html", changes=ChangesHandler.cache)


class ChangesHandler(tornado.websocket.WebSocketHandler):
    waiters = set()
    cache = []
    cache_size = 5

    def open(self):
        ChangesHandler.waiters.add(self)

    def on_close(self):
        ChangesHandler.waiters.remove(self)

    @classmethod
    def update_cache(cls, change):
        cls.cache.append(change)
        if len(cls.cache) > cls.cache_size:
            cls.cache = cls.cache[-cls.cache_size :]

    @classmethod
    def send_change(cls, change):
        change_json = json_util.dumps(change)
        for waiter in cls.waiters:
            try:
                waiter.write_message(change_json)
            except Exception as exc:
                logging.exception(exc)

    @classmethod
    def on_change(cls, change):
        logging.info("got change of type '%s'", change.get("operationType"))

        # Each change notification has a binary _id. Use it to make an HTML
        # element id, then remove it.
        data = change["_id"]["_data"]
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        html_id = urlsafe_b64encode(data).decode().rstrip("=")
        change.pop("_id")
        change_pre = tornado.escape.xhtml_escape(pformat(change))
        change["html"] = f'<div id="change-{html_id}"><pre>{change_pre}</pre></div>'
        change["html_id"] = html_id
        ChangesHandler.send_change(change)
        ChangesHandler.update_cache(change)


change_stream = None


async def watch(collection):
    global change_stream

    async with collection.watch() as change_stream:
        async for change in change_stream:
            ChangesHandler.on_change(change)


def main():
    tornado.options.parse_command_line()
    if "." not in options.ns:
        sys.stderr.write(f'Invalid ns "{options.ns}", must contain a "."')
        sys.exit(1)

    db_name, collection_name = options.ns.split(".", 1)
    client = MotorClient(options.mongo)
    collection = client[db_name][collection_name]

    app = Application()
    app.listen(options.port)
    loop = tornado.ioloop.IOLoop.current()

    # Start watching collection for changes.
    try:
        loop.run_sync(lambda: watch(collection))
    except KeyboardInterrupt:
        if change_stream:
            loop.run_sync(change_stream.close)


if __name__ == "__main__":
    main()