File: controller.py

package info (click to toggle)
pyzmq 27.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,984 kB
  • sloc: python: 15,189; ansic: 285; makefile: 169; sh: 85
file content (99 lines) | stat: -rw-r--r-- 3,248 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -----------------------------------------------------------------------------
#  Copyright (c) 2010 Justin Riley
#
#  Distributed under the terms of the New BSD License.  The full license is in
#  the file LICENSE.BSD, distributed as part of this software.
# -----------------------------------------------------------------------------

import json
from typing import Any, Dict, Optional, Union

import pymongo
from bson import json_util

import zmq


class MongoZMQ:
    """
    ZMQ server that adds/fetches documents (ie dictionaries) to a MongoDB.

    Requests arrive on a ROUTER socket as three frames,
    [client_id, operation, json_payload], where operation is b'add' or
    b'get'.  Every request is answered with [client_id, reply].

    NOTE: mongod must be started before using this class
    """

    def __init__(
        self, db_name: str, table_name: str, bind_addr: str = "tcp://127.0.0.1:5000"
    ):
        """
        bind_addr: address to bind zmq socket on
        db_name: name of database to write to (created if doesn't exist)
        table_name: name of mongodb 'table' in the db to write to (created if doesn't exist)
        """
        self._bind_addr = bind_addr
        self._db_name = db_name
        self._table_name = table_name
        # MongoClient() with no arguments uses pymongo's default host/port.
        self._conn: pymongo.MongoClient = pymongo.MongoClient()
        self._db = self._conn[self._db_name]
        self._table = self._db[self._table_name]

    def _doc_to_json(self, doc: Any) -> str:
        """
        Serializes a document to a JSON string, using bson's json_util to
        encode values the json module cannot handle by itself.
        """
        return json.dumps(doc, default=json_util.default)

    def add_document(self, doc: Dict) -> Optional[str]:
        """
        Inserts a document (dictionary) into mongo database table

        Returns None on success, or an 'Error: ...' string on failure.
        """
        print(f'adding document {doc}')
        try:
            # Collection.insert() was removed in PyMongo 4; insert_one() is
            # the supported single-document insert.
            self._table.insert_one(doc)
        except Exception as e:
            # Service boundary: report the failure to the caller instead of
            # letting one bad document take the server loop down.
            return f'Error: {e}'
        return None

    def get_document_by_keys(self, keys: Dict[str, Any]) -> Union[Dict, str, None]:
        """
        Attempts to return a single document from database table that matches
        each key/value in keys dictionary.

        Returns the matching document, None when nothing matches, or an
        'Error: ...' string when the lookup itself fails.
        """
        print(f'attempting to retrieve document using keys: {keys}')
        try:
            return self._table.find_one(keys)
        except Exception as e:
            return f'Error: {e}'

    def _handle_request(self, msg: list) -> list:
        """
        Builds the multipart reply for one received multipart message.

        msg: frames as returned by recv_multipart(); a valid request is
             [client_id, operation, json_payload], each frame being bytes

        Returns [client_id, reply] with every frame as bytes, which is what
        send_multipart() requires.
        """
        # always send back the id with ROUTER
        client_id = msg[0]
        if len(msg) != 3:
            error_msg = f'invalid message received: {msg}'
            print(error_msg)
            return [client_id, error_msg.encode('utf-8')]

        operation, payload = msg[1], msg[2]
        try:
            contents = json.loads(payload)
        except ValueError as e:
            # Covers both malformed JSON and undecodable bytes.
            error_msg = f'Error: invalid JSON payload: {e}'
            print(error_msg)
            return [client_id, error_msg.encode('utf-8')]

        # Frames arrive as bytes, so the operation must be compared against
        # bytes literals; comparing with str never matches on Python 3.
        if operation == b'add':
            error = self.add_document(contents)
            body = "success" if error is None else error
        elif operation == b'get':
            doc = self.get_document_by_keys(contents)
            try:
                body = self._doc_to_json(doc)
            except (TypeError, ValueError) as e:
                body = f'Error: {e}'
        else:
            print('unknown request')
            # Give the client a payload frame so it is not left waiting.
            body = 'unknown request'
        return [client_id, body.encode('utf-8')]

    def start(self) -> None:
        """
        Binds a ROUTER socket on the configured address and answers requests
        in an endless loop.  The loop is only left through an exception
        (e.g. KeyboardInterrupt); the socket and context are released then.
        """
        context = zmq.Context()
        socket = context.socket(zmq.ROUTER)
        try:
            socket.bind(self._bind_addr)
            while True:
                msg = socket.recv_multipart()
                print("Received msg: ", msg)
                socket.send_multipart(self._handle_request(msg))
        finally:
            # linger=0 drops unsent replies so context.term() cannot block.
            socket.close(linger=0)
            context.term()


def main() -> None:
    """
    Creates a MongoZMQ server for the 'jobs' table of the 'ipcontroller'
    database and starts it.
    """
    server = MongoZMQ('ipcontroller', 'jobs')
    server.start()


# Start the server only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()