File: ws.py

package info (click to toggle)
python-zaqarclient 4.1.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 832 kB
  • sloc: python: 4,417; makefile: 18; sh: 2
file content (138 lines) | stat: -rw-r--r-- 4,919 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#   Copyright 2016 Red Hat, Inc.
#
#   Licensed under the Apache License, Version 2.0 (the "License"); you may
#   not use this file except in compliance with the License. You may obtain
#   a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#   License for the specific language governing permissions and limitations
#   under the License.
#
import json

from oslo_log import log as logging
from oslo_utils import importutils
from oslo_utils import uuidutils

from zaqarclient.transport import base
from zaqarclient.transport import request
from zaqarclient.transport import response

websocket = importutils.try_import('websocket')

LOG = logging.getLogger(__name__)


class WebsocketTransport(base.Transport):
    """Zaqar websocket transport.

    Each request is serialized to a single JSON message, written to the
    websocket, and answered by the next JSON message read from it. The
    connection is opened and authenticated lazily on the first call to
    :meth:`send`, and closed by :meth:`cleanup` (or on leaving a ``with``
    block).

    *NOTE:* Zaqar's websocket interface does not yet appear to work
    well with parameters. Until it does, the websocket transport may not
    integrate with all of zaqarclient's higher-level requests. Even so,
    websockets are still quite usable today, and using the transport
    through zaqarclient's lower-level APIs works quite nicely. Example::

        conf = {
            'auth_opts': {
                'backend': 'keystone',
                'options': {
                    'os_auth_token': ks.auth_token,
                    'os_project_id': CONF.zaqar.project_id
                }
            }
        }

        endpoint = 'ws://172.19.0.3:9000'

        with transport.get_transport_for(endpoint, options=conf) as ws:
            req = request.Request(endpoint, 'queue_create',
                                  content=json.dumps({'queue_name': 'foo'}))
            resp = ws.send(req)

    """
    def __init__(self, options):
        """Store the credentials later used to authenticate the websocket.

        No connection is opened here; see :meth:`send`.

        :param options: Transport options. ``options['auth_opts']['options']``
                        must provide ``os_auth_token`` and may provide
                        ``os_project_id`` or ``project_id``.
        :type options: dict
        :raises KeyError: if ``auth_opts``, its ``options`` entry or
                          ``os_auth_token`` is missing.
        """
        super(WebsocketTransport, self).__init__(options)
        auth_options = options['auth_opts']['options']
        # TODO(wangxiyuan): To keep backwards compatibility, we leave
        # "os_project_id" here. Remove it in the next release.
        self._project_id = auth_options.get('os_project_id',
                                            auth_options.get('project_id'))
        self._token = auth_options['os_auth_token']
        # Both are populated by _init_client() on first use.
        self._websocket_client_id = None
        self._ws = None

    def _init_client(self, endpoint):
        """Initialize a websocket transport client.

        Generates a fresh client id, opens the connection and sends an
        ``authenticate`` request carrying the stored token. If that request
        fails, the connection is closed again so that the next :meth:`send`
        starts from scratch instead of reusing an unauthenticated socket.

        :param endpoint: The websocket endpoint. Example: ws://127.0.0.1:9000/.
                         Required.
        :type endpoint: string
        """
        self._websocket_client_id = uuidutils.generate_uuid()

        LOG.debug('Instantiating messaging websocket client: %s', endpoint)
        self._ws = self._create_connection(endpoint)

        auth_req = request.Request(endpoint, 'authenticate',
                                   headers={'X-Auth-Token': self._token})
        try:
            # self._ws is already set, so send() does not recurse into
            # _init_client() here.
            self.send(auth_req)
        except Exception:
            # Do not keep a half-initialized connection cached: it would
            # leak the socket and make later send() calls skip
            # authentication entirely.
            try:
                self.cleanup()
            except Exception:
                # A failure to close must not hide the original error.
                LOG.warning('Failed to close websocket after an '
                            'authentication error.', exc_info=True)
            raise

    def _create_connection(self, endpoint):
        """Open and return a websocket connection to ``endpoint``.

        :param endpoint: The websocket endpoint to connect to.
        :type endpoint: string
        :raises ImportError: if the optional ``websocket`` module could not
                             be imported when this module was loaded.
        """
        if websocket is None:
            # try_import() yields None when the module is unavailable; fail
            # with an explicit message rather than an AttributeError on None.
            raise ImportError("The 'websocket' module is required to use "
                              "the websocket transport.")
        return websocket.create_connection(endpoint)

    def send(self, request):
        """Send a request over the websocket and return its response.

        The connection is created and authenticated on first use. The
        ``Client-ID`` and ``X-Project-ID`` headers are added to a copy of
        the request headers, so the caller's request is not modified.

        :param request: The request to send. Its ``endpoint``,
                        ``operation``, ``headers``, ``content`` (a JSON
                        string, if set) and ``params`` attributes are read.
        :returns: A response built from the reply's ``body`` and
                  ``headers``; its status code is taken from the reply's
                  ``status`` header.
        :raises: the exception class that ``self.http_to_zaqar`` maps the
                 reply's status code to, when the code is in that mapping.
        """
        if not self._ws:
            self._init_client(request.endpoint)

        headers = request.headers.copy()
        headers.update({
            'Client-ID': self._websocket_client_id,
            'X-Project-ID': self._project_id
        })

        msg = {'action': request.operation, 'headers': headers}
        if request.content:
            msg['body'] = json.loads(request.content)
        # NOTE(dprince): Zaqar websockets do not yet seem to support params?!
        # Users of this protocol will need to send everything in the body.
        if request.params:
            LOG.warning('Websocket transport does not yet support params.')
        self._ws.send(json.dumps(msg))
        ret = self.recv()

        # The body is re-serialized so the response content is a JSON string.
        resp = response.Response(request, json.dumps(ret.get('body', '')),
                                 headers=ret['headers'],
                                 status_code=int(ret['headers']['status']))

        if resp.status_code in self.http_to_zaqar:
            kwargs = {}
            try:
                error_body = json.loads(resp.content)
                kwargs['title'] = 'Websocket Transport Error'
                kwargs['description'] = error_body['error']
            except (ValueError, KeyError, TypeError):
                # The body is not JSON, is not a mapping, or carries no
                # 'error' entry: fall back to reporting the raw content.
                kwargs['text'] = resp.content
            raise self.http_to_zaqar[resp.status_code](**kwargs)

        return resp

    def recv(self):
        """Read the next message from the websocket and decode it as JSON.

        :returns: The decoded message.
        """
        return json.loads(self._ws.recv())

    def cleanup(self):
        """Close the websocket connection, if one is open.

        The stored connection is always reset, even when closing it raises,
        so a later :meth:`send` opens a new connection.
        """
        if self._ws:
            try:
                self._ws.close()
            finally:
                self._ws = None

    def __enter__(self):
        """Return self to allow usage as a context manager"""
        return self

    def __exit__(self, *exc):
        """Call cleanup when exiting the context manager"""
        self.cleanup()