File: zmq_sub_consumer.py

package info (click to toggle)
python-oslo.messaging 8.1.4-1+deb10u1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 2,108 kB
  • sloc: python: 17,845; sh: 454; makefile: 19
file content (128 lines) | stat: -rw-r--r-- 5,481 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#    Copyright 2015-2016 Mirantis, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import logging
import uuid

import six

from oslo_messaging._drivers.zmq_driver.server.consumers \
    import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._drivers.zmq_driver import zmq_version
from oslo_messaging._i18n import _LE, _LI

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# ZeroMQ module object as returned by the driver's zmq_async.import_zmq()
# helper; used below for constants such as zmq.SUB and zmq.SUBSCRIBE.
zmq = zmq_async.import_zmq()


class SubConsumer(zmq_consumer_base.ConsumerBase):
    """Consumer that receives messages from publishers over a SUB socket.

    Construction creates the SUB socket, subscribes it to the filter derived
    from the server's target, creates a connection updater for the socket
    and registers ``receive_request`` with the poller.
    """

    def __init__(self, conf, poller, server):
        """Set up the SUB socket and register it with the poller.

        :param conf: configuration object; its ``oslo_messaging_zmq`` group
            is read for the ``rpc_zmq_host`` option
        :param poller: poller the SUB socket is registered with
        :param server: server object providing ``matchmaker`` and ``target``
        """
        super(SubConsumer, self).__init__(conf, poller, server)
        self.matchmaker = SubscriptionMatchmakerWrapper(conf,
                                                        server.matchmaker)
        # NOTE: the target has to be assigned before the socket is created,
        # because _generate_identity() reads self.target.
        self.target = server.target
        self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB,
                                           immediate=False,
                                           identity=self._generate_identity())
        self.sockets.append(self.socket)
        # NOTE: self.host is used by the log messages emitted below.
        self.host = self.socket.handle.identity
        self._subscribe_to_topic()
        self._receive_request_versions = \
            zmq_version.get_method_versions(self, 'receive_request')
        self.connection_updater = SubscriberConnectionUpdater(
            conf, self.matchmaker, self.socket)
        self.poller.register(self.socket, self.receive_request)
        LOG.info(_LI("[%s] Run SUB consumer"), self.host)

    def _generate_identity(self):
        """Build the socket identity.

        The identity has the form
        ``<rpc_zmq_host>/<subscribe filter of the target>/<random uuid4>``.
        """
        host_prefix = six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + '/')
        unique_suffix = six.b('/' + str(uuid.uuid4()))
        return host_prefix + \
            zmq_address.target_to_subscribe_filter(self.target) + \
            unique_suffix

    def _subscribe_to_topic(self):
        """Subscribe the socket to the filter derived from ``self.target``."""
        topic_filter = zmq_address.target_to_subscribe_filter(self.target)
        self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
        LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
                  {"host": self.host, "filter": topic_filter})

    def _get_receive_request_version(self, version):
        """Return the ``receive_request`` handler registered for a version.

        :param version: message version string read from the socket
        :raises zmq_version.UnsupportedMessageVersionError: if no handler is
            registered for ``version``
        """
        receive_request_version = self._receive_request_versions.get(version)
        if receive_request_version is None:
            raise zmq_version.UnsupportedMessageVersionError(version)
        return receive_request_version

    def _receive_request_v_1_0(self, topic_filter, socket):
        """Read the remaining parts of a version 1.0 message.

        Reads the message type, the message id and then the loaded
        context/message pair from ``socket``, in that order.

        :param topic_filter: topic part already read by the caller; only
            used for logging here
        :param socket: socket to read the remaining message parts from
        :returns: ``(context, message)`` tuple
        :raises AssertionError: if the message type is not one of
            ``zmq_names.MULTISEND_TYPES``
        """
        message_type = int(socket.recv())
        # NOTE: checked explicitly instead of with an ``assert`` statement so
        # that the validation is not stripped when Python runs with -O. The
        # exception type and message are kept, so receive_request() handles
        # the failure exactly as before.
        if message_type not in zmq_names.MULTISEND_TYPES:
            raise AssertionError("Fanout expected!")
        message_id = socket.recv()
        context, message = socket.recv_loaded()
        LOG.debug("[%(host)s] Received on topic %(filter)s message %(msg_id)s "
                  "(v%(msg_version)s)",
                  {'host': self.host,
                   'filter': topic_filter,
                   'msg_id': message_id,
                   'msg_version': '1.0'})
        return context, message

    def receive_request(self, socket):
        """Receive one message from ``socket``.

        Reads the topic filter and the message version, then delegates the
        rest of the message to the handler registered for that version.

        :param socket: socket to read the message from
        :returns: a ``ZmqIncomingMessage`` on success, ``None`` if receiving
            failed (the error is logged and the rest of the message dropped)
        """
        try:
            topic_filter = socket.recv()
            message_version = socket.recv_string()
            receive_request_version = \
                self._get_receive_request_version(message_version)
            context, message = receive_request_version(topic_filter, socket)
            return zmq_incoming_message.ZmqIncomingMessage(context, message)
        except (zmq.ZMQError, AssertionError, ValueError,
                zmq_version.UnsupportedMessageVersionError) as e:
            LOG.error(_LE("Receiving message failed: %s"), str(e))
            # NOTE(gdavoian): drop the left parts of a broken message
            if socket.getsockopt(zmq.RCVMORE):
                socket.recv_multipart()
            return None

    def cleanup(self):
        """Clean up the connection updater, then the base consumer."""
        LOG.info(_LI("[%s] Destroy SUB consumer"), self.host)
        self.connection_updater.cleanup()
        super(SubConsumer, self).cleanup()


class SubscriptionMatchmakerWrapper(object):
    """Matchmaker wrapper that lets configuration override publishers.

    When the ``subscribe_on`` option of the ``oslo_messaging_zmq`` group is
    non-empty its entries are used as publishers; otherwise the wrapped
    matchmaker is asked.
    """

    def __init__(self, conf, matchmaker):
        """Remember the configuration and the matchmaker to fall back on."""
        self.matchmaker = matchmaker
        self.conf = conf

    def get_publishers(self):
        """Return the publishers to subscribe to.

        Publishers taken from configuration are returned as
        ``(publisher, None)`` pairs; otherwise the result of the wrapped
        matchmaker's ``get_publishers()`` is returned unchanged.
        """
        configured = self.conf.oslo_messaging_zmq.subscribe_on
        LOG.debug("Publishers taken from configuration %s", configured)
        if not configured:
            # Nothing configured explicitly: defer to the real matchmaker.
            return self.matchmaker.get_publishers()
        return [(address, None) for address in configured]


class SubscriberConnectionUpdater(zmq_updater.ConnectionUpdater):
    """Connection updater that connects a socket to the known publishers."""

    def _update_connection(self):
        """Connect the socket to every publisher the matchmaker reports.

        Each entry is a two-element pair; only the publisher address (the
        first element) is used for connecting.
        """
        known_publishers = self.matchmaker.get_publishers()
        for publisher_host, _router_host in known_publishers:
            self.socket.connect_to_host(publisher_host)
        LOG.debug("[%s] SUB consumer connected to publishers %s",
                  self.socket.handle.identity, known_publishers)