File: frame.py

package info (click to toggle)
python-stompy 0.2.5-1
  • links: PTS
  • area: main
  • in suites: squeeze
  • size: 100 kB
  • ctags: 77
  • sloc: python: 356; makefile: 2
file content (283 lines) | stat: -rw-r--r-- 8,721 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import socket
import random
from pprint import pformat
from errno import EAGAIN
from Queue import Queue
from Queue import Empty as QueueEmpty


class UnknownBrokerResponseError(Exception):
    """An unexpected response was received from the broker."""


class BrokerErrorResponse(Exception):
    """Received error from the broker."""


class IntermediateMessageQueue(object):
    """Internal message queue that holds messages received by the server.

    This to make sure a message isn't received instead of a command response
    after issuing a receipt request.

    """

    def __init__(self):
        self._queue = Queue()

    def put(self, frame):
        """Put a new frame onto the message queue.
        :param frame: A :class:`Frame` instance.

        """
        if "destination" not in frame.headers:
            return
        self._queue.put(frame)

    def get(self, frame, nb=False):
        """Get a new frame from the message queue.
        If no frame is available it try to get the next frame
        from the socket.

        :param frame: A :class:`Frame` instance.
        :keyword nb: Non-blocking.

        """
        try:
            return self._queue.get_nowait()
        except QueueEmpty:
            return frame.parse_frame(nb=nb)


class Frame(object):
    """Build and manage a STOMP Frame.

    :keyword sock: An open socket to the STOMP server.

    """

    def __init__(self, sock=None):
        self.command = None
        self.headers = {}
        self.body = None
        self.session = None
        self.my_name = socket.gethostbyname(socket.gethostname())
        self.sock = sock
        self.iqueue = IntermediateMessageQueue()
        self.rqueue = Queue()

    def connect(self, sock, username=None, password=None):
        """Connect to the STOMP server and get the session id.

        :param sock: Socket object from stompy.stomp.Stomp.
        :keyword username: Username for connection.
        :keyword password: Password for connection.

        """
        self.sock = sock
        if username and password:
            frame = self.build_frame({"command": "CONNECT",
                                      "headers": {'login': username,
                                                  'passcode': password}})
        else:
            frame = self.build_frame({"command": "CONNECT", "headers": {}})

        self.send_frame(frame.as_string())

        # Get session from the next reply from the server.
        next_frame = self.get_reply()
        self.session = next_frame.headers

    def build_frame(self, args, want_receipt=False):
        """Build a frame based on a :class:`dict` of arguments.

        :param args: A :class:`dict` of arguments for the frame.

        :keyword want_receipt: Optional argument to get a receipt from
            the sever that the frame was received.

        Example

            >>> frame = frameobj.build_frame({"command": 'CONNECT',
                                              "headers": {},
                                              want_receipt=True)
        """
        self.command = args.get('command')
        self.headers = args.get('headers')
        self.body = args.get('body')
        if want_receipt:
            receipt_stamp = str(random.randint(0, 10000000))
            self.headers["receipt"] = "%s-%s" % (
                    self.session.get("session"), receipt_stamp)
        return self

    def as_string(self):
        """Raw string representation of this frame
        Suitable for passing over a socket to the STOMP server.

        Example

            >>> stomp.send(frameobj.as_string())

        """
        command = self.command
        headers = self.headers
        body = self.body

        bytes_message = False
        if 'bytes_message' in headers:
            bytes_message = True
            del headers['bytes_message']
            headers['content-length'] = len(body)
        headers['x-client'] = self.my_name

        # Convert and append any existing headers to a string as the
        # protocol describes.
        headerparts = ("%s:%s\n" % (key, value)
                            for key, value in headers.iteritems())

        # Frame is Command + Header + EOF marker.
        frame = "%s\n%s\n%s\x00" % (command, "".join(headerparts), body)

        return frame

    def get_message(self, nb=False):
        """Get next message frame.

        :keyword nb: Non-blocking: If this is set and there is no
            messages currently waiting, this functions returns ``None``
            instead of waiting for more data.

        """
        while True:
            frame = self.iqueue.get(self, nb=nb)
            if not frame and nb:
                return None
            if frame.command == "MESSAGE":
                return frame
            else:
                self.rqueue.put(frame)

    def get_reply(self, nb=False):
        """Get command reply frame.

        :keyword nb: Non-blocking: If this is set and there is no
            messages currently waiting, this functions returns ``None``
            instead of waiting for more data.

        """
        while True:
            try:
                return self.rqueue.get_nowait()
            except QueueEmpty:
                frame = self.parse_frame(nb=nb)
                if not frame and nb:
                    return None
                if frame.command == "MESSAGE":
                    self.iqueue.put(frame)
                else:
                    self.rqueue.put(frame)

    def parse_frame(self, nb=False):
        """Parse data from socket

        :keyword nb: Non-blocking: If this is set and there is no
            messages currently waiting, this functions returns ``None``
            instead of waiting for more data.

        Example

            >>> frameobj.parse_frame()

        """
        line = self._getline(nb=nb)
        if not line:
            return

        command = self.parse_command(line)
        line = line[len(command)+1:]
        headers_str, _, body = line.partition("\n\n")
        if not headers_str:
            raise UnknownBrokerResponseError(
                    "Received: (%s)" % line)
        headers = self.parse_headers(headers_str)

        if 'content-length' in headers:
            headers['bytes_message'] = True

        if command == 'ERROR':
            raise BrokerErrorResponse(
                    "Broker Returned Error: %s" % body)

        frame = Frame(self.sock)
        return frame.build_frame({'command': command,
                                  'headers': headers,
                                  'body': body})

    def parse_command(self, command_str):
        """Parse command received from the server.

        :param command_str: String to parse command from

        """
        command = command_str.split('\n', 1)[0]
        return command

    def parse_headers(self, headers_str):
        """Parse headers received from the servers and convert
        to a :class:`dict`.i

        :param headers_str: String to parse headers from

        """
        # george:constanza\nelaine:benes
        # -> {"george": "constanza", "elaine": "benes"}
        return dict(line.split(":", 1) for line in headers_str.split("\n"))

    def send_frame(self, frame):
        """Send frame to server, get receipt if needed.

        :param frame: :class:`Frame` instance to pass across the socket

        """
        self.sock.sendall(frame)

        if 'receipt' in self.headers:
            return self.get_reply()

    def _getline(self, nb=False):
        """Get a single line from socket

        :keyword nb: Non-blocking: If this is set, and there are no
            messages to receive, this function returns ``None``.

        """
        self.sock.setblocking(not nb)
        try:
            buffer = ''
            partial = ''
            while not buffer.endswith('\x00'):
                try:
                    partial = self.sock.recv(1)
                except socket.error, exc:
                    if exc[0] == EAGAIN:
                        if not buffer or buffer == '\n':
                            return None
                        continue
                buffer += partial
        finally:
            self.sock.setblocking(nb)

        # ** Nasty Alert **
        # There may be a left over newline
        # RabbitMQ doesn't have a newline after \x00
        # ActiveMQ does.  This is a hack around that.
        # http://stomp.codehaus.org/Protocol mentions
        # nothing about a newline following the NULL (^@)
        if buffer[:1] == '\n':
            return buffer[1:-1]

        return buffer[:-1]

    def __repr__(self):
        return "<Frame %s>" % pformat(self.headers)