File: simple.py

package info (click to toggle)
python-stompy 0.2.9-1
  • links: PTS
  • area: main
  • in suites: jessie, jessie-kfreebsd, wheezy
  • size: 100 kB
  • ctags: 78
  • sloc: python: 361; makefile: 2
file content (168 lines) | stat: -rw-r--r-- 5,699 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from stompy.stomp import Stomp
from Queue import Empty
from uuid import uuid4


class TransactionError(Exception):
    """Transaction related error."""


class Client(object):
    """Simple STOMP client.

    :keyword host: Hostname of the server to connect to (default:
        ``localhost``)
    :keyword port: Port of the server to connect to (default: ``61613``)

    Example

        >>> from stompy.simple import Client
        >>> stomp = Client()
        >>> stomp.connect()
        >>> stomp.put("The quick brown fox...", destination="/queue/test")
        >>> stomp.subscribe("/queue/test")
        >>> message = stomp.get_nowait()
        >>> message.body
        'The quick brown fox...'
        >>> stomp.ack(message)
        >>> stomp.unsubscribe("/queue/test")
        >>> stomp.disconnect()

    """
    Empty = Empty

    def __init__(self, host="localhost", port=61613):
        self.stomp = Stomp(host, port)
        self._current_transaction = None

    def get(self, block=True, callback=None):
        """Get message.

        :keyword block: Block if necessary until an item is available.
            If this is ``False``, return an item if one is immediately
            available, else raise the :exc:`Empty` exception.

        :keyword callback: Optional function to execute when message recieved.

        :raises Empty: If ``block`` is off and no message was receied.

        """
        frame = self.stomp.receive_frame(nonblocking=not block, callback=callback)
        if frame is None and not block:
            raise self.Empty()
        return frame

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the :exc:`Empty` exception.

        See :meth:`get`.

        """
        return self.get(block=False)

    def put(self, item, destination, persistent=True, conf=None):
        """Put an item into the queue.

        :param item: Body of the message.
        :param destination: Destination queue.
        :keyword persistent: Is message persistent? (store on disk).
        :keyword conf: Extra headers to send to the broker.

        :returns: The resulting :class:`stompy.frame.Frame` instance.

        """
        persistent = "true" if persistent else "false"
        conf = self._make_conf(conf, body=item, destination=destination,
                               persistent=persistent)

        return self.stomp.send(conf)

    def connect(self, username=None, password=None, clientid=None):
        """Connect to the broker.

        :keyword username: Username for connection
        :keyword password: Password for connection
        :keyword clientid: Client identification for persistent connections

        :raises :exc:`stompy.stomp.ConnectionError`:
            if the connection was unsuccessful.
        :raises :exc:`stompy.stomp.ConnectionTimeoutError`:
            if the connection timed out.

        """
        self.stomp.connect(username=username, password=password, clientid=clientid)

    def disconnect(self):
        """Disconnect from the broker."""
        self.stomp.disconnect()

    def subscribe(self, destination, ack="auto", conf=None):
        """Subscribe to topic/queue.

        :param destination: The destination queue/topic to subscribe to.
        :keyword ack: How to handle acknowledgment, either
            ``auto`` - ack is handled by the server automatically, or
            ``client`` - ack is handled by the client itself by calling
            :meth:`ack`.
        :keyword conf: Additional headers to send with the subscribe request.

        """
        conf = self._make_conf(conf, destination=destination, ack=ack)
        return self.stomp.subscribe(conf)

    def unsubscribe(self, destination, conf=None):
        """Unsubscribe from topic/queue previously subscribed to.

        :param destination: The destination queue/topic to unsubscribe from.
        :keyword conf: Additional headers to send with the unsubscribe
            request.

        """
        conf = self._make_conf(conf, destination=destination)
        return self.stomp.unsubscribe(conf)

    def begin(self, transaction):
        """Begin transaction.

        Every :meth:`ack` and :meth:`send` will be affected by this
        transaction and won't be real until a :meth:`commit` is issued.
        To roll-back any changes since the transaction started use
        :meth:`abort`.

        """
        if self._current_transaction:
            raise TransactionError(
                "Already in transaction. Please commit or abort first!")
        self._current_transaction = str(uuid4())
        return self.stomp.begin({"transaction": self._current_transaction})

    def commit(self, transaction):
        """Commit current transaction."""
        if not self._current_transaction:
            raise TransactionError("Not in transaction")
        self.stomp.commit({"transaction": self._current_transaction})
        self._current_transaction = None

    def abort(self):
        """Roll-back current transaction."""
        if not self._current_transaction:
            raise TransactionError("Not in transaction")
        self.stomp.abort({"transaction": self._current_transaction})
        self._current_transaction = None

    def ack(self, frame):
        """Acknowledge message.

        :param frame: The message to acknowledge.

        """
        return self.stomp.ack(frame)

    def _make_conf(self, conf, **kwargs):
        kwargs.update(dict(conf or {}))
        if self._current_transaction:
            conf["transaction"] = self._current_transaction
        return kwargs