File: rpc.py

package info (click to toggle)
python-neutron-lib 1.29.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 6,092 kB
  • sloc: python: 20,031; sh: 98; makefile: 22
file content (358 lines) | stat: -rw-r--r-- 13,104 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# Copyright (c) 2012 OpenStack Foundation.
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
import random
import time

from neutron_lib._i18n import _
from neutron_lib import context
from neutron_lib import exceptions
from neutron_lib.utils import runtime
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_messaging import exceptions as oslomsg_exc
from oslo_messaging.rpc import dispatcher
from oslo_messaging import serializer as om_serializer
from oslo_service import service
from oslo_utils import excutils
from osprofiler import profiler


# Module-level logger for this file.
LOG = logging.getLogger(__name__)
# Global RPC transport, notification transport and notifier objects.
# They are populated by init() and torn down by cleanup(); most helpers in
# this module assert that init() has been called before using them.
TRANSPORT = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None

# Names of all neutron_lib.exceptions submodules; these are always exposed
# to RPC peers as allowed remote exception modules.
_DFT_EXMODS = runtime.list_package_modules(exceptions.__name__)


def init(conf, rpc_ext_mods=None):
    """Initialize the global RPC objects.

    Builds the module-level RPC transport, notification transport and
    notifier used by the rest of this module.

    :param conf: The oslo conf to use for initialization.
    :param rpc_ext_mods: Exception modules to expose via RPC; the default
        neutron_lib exception modules are always included.
    :returns: None.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER

    # Merge caller-supplied exception modules with the defaults, dropping
    # duplicates.
    if rpc_ext_mods is None:
        exmods = _DFT_EXMODS
    else:
        exmods = list(set(rpc_ext_mods) | set(_DFT_EXMODS))

    TRANSPORT = oslo_messaging.get_rpc_transport(
        conf, allowed_remote_exmods=exmods)
    NOTIFICATION_TRANSPORT = oslo_messaging.get_notification_transport(
        conf, allowed_remote_exmods=exmods)
    NOTIFIER = oslo_messaging.Notifier(
        NOTIFICATION_TRANSPORT, serializer=RequestContextSerializer())


def cleanup():
    """Deactivate and cleanup the global RPC objects.

    Requires that init() has been called first; each global is checked and
    an AssertionError is raised if it was never initialized.

    :returns: None.
    """
    global TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    if NOTIFICATION_TRANSPORT is None:
        raise AssertionError(_("'NOTIFICATION_TRANSPORT' must not be None"))
    if NOTIFIER is None:
        raise AssertionError(_("'NOTIFIER' must not be None"))
    # Release transport resources before dropping the references.
    TRANSPORT.cleanup()
    NOTIFICATION_TRANSPORT.cleanup()
    # Discard any adaptive per-method timeouts accumulated by the
    # backing-off RPC clients so a later init() starts fresh.
    _BackingOffContextWrapper.reset_timeouts()
    TRANSPORT = None
    NOTIFICATION_TRANSPORT = None
    NOTIFIER = None


def _get_default_method_timeout():
    # Default per-method RPC timeout, taken from the transport's
    # rpc_response_timeout option; requires init() to have been called.
    return TRANSPORT.conf.rpc_response_timeout


def _get_default_method_timeouts():
    # Fresh method-name -> timeout map; unseen methods lazily get the
    # configured default timeout.
    return collections.defaultdict(_get_default_method_timeout)


def _get_rpc_response_max_timeout():
    # Configured ceiling for the exponential timeout backoff; requires
    # init() to have been called.
    return TRANSPORT.conf.rpc_response_max_timeout


class _ContextWrapper(object):
    def __init__(self, original_context):
        self._original_context = original_context

    def __getattr__(self, name):
        return getattr(self._original_context, name)

    def cast(self, ctxt, method, **kwargs):
        try:
            self._original_context.cast(ctxt, method, **kwargs)
        except oslomsg_exc.MessageDeliveryFailure as e:
            LOG.debug("Ignored exception during cast: %s", str(e))


class _BackingOffContextWrapper(_ContextWrapper):
    """Wraps oslo messaging contexts to set the timeout for calls.

    This intercepts RPC calls and sets the timeout value to the globally
    adapting value for each method. An oslo messaging timeout results in
    a doubling of the timeout value for the method on which it timed out.
    There currently is no logic to reduce the timeout since busy Neutron
    servers are more frequently the cause of timeouts rather than lost
    messages.
    """
    # Class-level (shared across all wrapper instances) map of scoped
    # method name -> current timeout; missing entries default to the
    # configured rpc_response_timeout via the defaultdict factory.
    _METHOD_TIMEOUTS = _get_default_method_timeouts()
    # Optional ceiling set by set_max_timeout(); None means "use the
    # configured rpc_response_max_timeout".
    _max_timeout = None

    @classmethod
    def reset_timeouts(cls):
        # restore the original default timeout factory
        cls._METHOD_TIMEOUTS = _get_default_method_timeouts()
        cls._max_timeout = None

    @classmethod
    def get_max_timeout(cls):
        # Explicitly-set ceiling wins; otherwise fall back to config.
        return cls._max_timeout or _get_rpc_response_max_timeout()

    @classmethod
    def set_max_timeout(cls, max_timeout):
        # Only lowering the ceiling has any effect; raising it is ignored.
        if max_timeout < cls.get_max_timeout():
            # Clamp every already-tracked method to the new ceiling and
            # make it the default for methods seen in the future.
            cls._METHOD_TIMEOUTS = collections.defaultdict(
                lambda: max_timeout, **{
                    k: min(v, max_timeout)
                    for k, v in cls._METHOD_TIMEOUTS.items()
                })
            cls._max_timeout = max_timeout

    def call(self, ctxt, method, **kwargs):
        # two methods with the same name in different namespaces should
        # be tracked independently
        if self._original_context.target.namespace:
            scoped_method = '%s.%s' % (self._original_context.target.namespace,
                                       method)
        else:
            scoped_method = method
        # set the timeout from the global method timeout tracker for this
        # method
        self._original_context.timeout = self._METHOD_TIMEOUTS[scoped_method]
        try:
            return self._original_context.call(ctxt, method, **kwargs)
        except oslo_messaging.MessagingTimeout:
            with excutils.save_and_reraise_exception():
                # Jittered wait bounded by both the current per-method
                # timeout and the configured default, to spread retries
                # from many agents hitting an overloaded server.
                wait = random.uniform(
                    0,
                    min(self._METHOD_TIMEOUTS[scoped_method],
                        TRANSPORT.conf.rpc_response_timeout)
                )
                LOG.error("Timeout in RPC method %(method)s. Waiting for "
                          "%(wait)s seconds before next attempt. If the "
                          "server is not down, consider increasing the "
                          "rpc_response_timeout option as Neutron "
                          "server(s) may be overloaded and unable to "
                          "respond quickly enough.",
                          {'wait': int(round(wait)), 'method': scoped_method})
                # Double the timeout, capped at the ceiling; only record
                # and warn when it actually grows.
                new_timeout = min(
                    self._original_context.timeout * 2, self.get_max_timeout())
                if new_timeout > self._METHOD_TIMEOUTS[scoped_method]:
                    LOG.warning("Increasing timeout for %(method)s calls "
                                "to %(new)s seconds. Restart the agent to "
                                "restore it to the default value.",
                                {'method': scoped_method, 'new': new_timeout})
                    self._METHOD_TIMEOUTS[scoped_method] = new_timeout
                # Sleep before the save_and_reraise context re-raises the
                # MessagingTimeout to the caller.
                time.sleep(wait)


class BackingOffClient(oslo_messaging.RPCClient):
    """An oslo messaging RPC Client that implements a timeout backoff.

    This has all of the same interfaces as oslo_messaging.RPCClient but
    if the timeout parameter is not specified, the _BackingOffContextWrapper
    returned will track when call timeout exceptions occur and exponentially
    increase the timeout for the given call method.
    """

    def prepare(self, *args, **kwargs):
        """Prepare a call context, wrapping it for timeout backoff.

        Contexts prepared with an explicit ``timeout`` keyword are not
        backed off; they get the plain wrapper instead.
        """
        prepared = super(BackingOffClient, self).prepare(*args, **kwargs)
        wrapper_cls = (_ContextWrapper if 'timeout' in kwargs
                       else _BackingOffContextWrapper)
        return wrapper_cls(prepared)

    @staticmethod
    def set_max_timeout(max_timeout):
        """Set RPC timeout ceiling for all backing-off RPC clients."""
        _BackingOffContextWrapper.set_max_timeout(max_timeout)


def get_client(target, version_cap=None, serializer=None):
    """Get an RPC client for the said target.

    The init() function must be called prior to calling this.
    :param target: The RPC target for the client.
    :param version_cap: The optional version cap for the RPC client.
    :param serializer: The optional serializer to use for the RPC client.
    :returns: A new RPC client.
    """
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    # Always wrap the caller's serializer so contexts round-trip as
    # Neutron contexts.
    return BackingOffClient(TRANSPORT, target,
                            version_cap=version_cap,
                            serializer=RequestContextSerializer(serializer))


def get_server(target, endpoints, serializer=None):
    """Get a new RPC server reference.

    The init() function must be called prior to calling this.
    :param target: The target for the new RPC server.
    :param endpoints: The endpoints for the RPC server.
    :param serializer: The optional serializer to use for the RPC server.
    :returns: A new RPC server reference.
    """
    if TRANSPORT is None:
        raise AssertionError(_("'TRANSPORT' must not be None"))
    wrapped_serializer = RequestContextSerializer(serializer)
    return oslo_messaging.get_rpc_server(
        TRANSPORT, target, endpoints, 'eventlet', wrapped_serializer,
        access_policy=dispatcher.DefaultRPCAccessPolicy)


def get_notifier(service=None, host=None, publisher_id=None):
    """Get a new notifier reference.

    The init() function must be called prior to calling this.
    :param service: The optional service for the notifier.
    :param host: The optional host for the notifier. If not given the host
        will be taken from the global CONF.
    :param publisher_id: The optional publisher ID for the notifier. When
        omitted, one is derived from the service and host.
    :returns: A new RPC notifier reference.
    """
    if NOTIFIER is None:
        raise AssertionError(_("'NOTIFIER' must not be None"))
    publisher_id = publisher_id or (
        "%s.%s" % (service, host or cfg.CONF.host))
    return NOTIFIER.prepare(publisher_id=publisher_id)


class RequestContextSerializer(om_serializer.Serializer):
    """Convert RPC common context into Neutron Context.

    Entity (de)serialization is delegated to an optional base serializer;
    context (de)serialization additionally threads osprofiler trace
    information through the RPC boundary.
    """

    def __init__(self, base=None):
        super(RequestContextSerializer, self).__init__()
        # Base serializer for entities; may be None, meaning pass-through.
        self._base = base

    def serialize_entity(self, ctxt, entity):
        """Serialize an entity via the base serializer, if any."""
        return self._base.serialize_entity(ctxt, entity) if self._base \
            else entity

    def deserialize_entity(self, ctxt, entity):
        """Deserialize an entity via the base serializer, if any."""
        return self._base.deserialize_entity(ctxt, entity) if self._base \
            else entity

    def serialize_context(self, ctxt):
        """Dump the context to a dict, attaching profiler trace info."""
        ctx_dict = ctxt.to_dict()
        prof = profiler.get()
        if prof:
            ctx_dict['trace_info'] = {
                "hmac_key": prof.hmac_key,
                "base_id": prof.get_base_id(),
                "parent_id": prof.get_id(),
            }
        return ctx_dict

    def deserialize_context(self, ctxt):
        """Rebuild a Neutron Context, re-initializing the profiler."""
        ctx_dict = ctxt.copy()
        trace_info = ctx_dict.pop("trace_info", None)
        if trace_info:
            profiler.init(**trace_info)
        return context.Context.from_dict(ctx_dict)


@profiler.trace_cls("rpc")
class Service(service.Service):
    """Service object for binaries running on hosts.

    A service enables rpc by listening to queues based on topic and host.
    """

    def __init__(self, host, topic, manager=None, serializer=None):
        super(Service, self).__init__()
        self.host = host
        self.topic = topic
        self.serializer = serializer
        # When no manager is supplied, the service itself is the RPC
        # endpoint.
        self.manager = self if manager is None else manager

    def start(self):
        """Start the service and begin consuming RPC messages."""
        super(Service, self).start()

        self.conn = Connection()
        LOG.debug("Creating Consumer connection for Service %s",
                  self.topic)

        self.conn.create_consumer(self.topic, [self.manager])

        # Hook to allow the manager to do other initializations after
        # the rpc connection is created.
        hook = getattr(self.manager, 'initialize_service_hook', None)
        if callable(hook):
            hook(self)

        # Consume from all consumers in threads
        self.conn.consume_in_threads()

    def stop(self):
        """Stop the service, ignoring errors while closing the connection."""
        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway
        try:
            self.conn.close()
        except Exception:  # nosec
            pass
        super(Service, self).stop()


class Connection(object):
    """A utility class that manages a collection of RPC servers."""

    def __init__(self):
        super(Connection, self).__init__()
        # RPC servers created by create_consumer(), in creation order.
        self.servers = []

    def create_consumer(self, topic, endpoints, fanout=False):
        """Create and track an RPC server consuming the given topic."""
        new_server = get_server(
            oslo_messaging.Target(topic=topic, server=cfg.CONF.host,
                                  fanout=fanout),
            endpoints)
        self.servers.append(new_server)

    def consume_in_threads(self):
        """Start every tracked server and return the list of servers."""
        for srv in self.servers:
            srv.start()
        return self.servers

    def close(self):
        """Stop all servers, then wait for each one to finish."""
        for srv in self.servers:
            srv.stop()
        for srv in self.servers:
            srv.wait()