File: service_bus.cpp

package info (click to toggle)
qpid-proton 0.14.0-5
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 9,632 kB
  • ctags: 20,083
  • sloc: java: 39,624; ansic: 29,389; python: 16,581; cpp: 11,250; ruby: 6,618; perl: 2,641; php: 1,033; xml: 957; sh: 230; pascal: 52; makefile: 32
file content (329 lines) | stat: -rw-r--r-- 13,024 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

/*
 * Service Bus example.
 *
 * This is an example of using "Service Bus sessions" (not the same thing as an
 * AMQP session) to selectively retrieve messages from a queue.  The queue must
 * be configured within Service Bus to support sessions.  Service Bus uses the
 * AMQP group_id message property to associate messages with a particular
 * Service Bus session.  It uses AMQP filters to specify which session is
 * associated with a receiver.
 *
 * The mechanics for sending and receiving to other types of service bus queue
 * are broadly the same, as long as the step using the
 * receiver.source().filters() is omitted.
 *
 * Other Service Bus notes: There is no drain support, hence the need to to use
 * timeouts in this example to detect the end of the message stream.  There is
 * no browse support when setting the AMQP link distribution mode to COPY.
 * Service Bus claims to support browsing, but it is unclear how to manage that
 * with an AMQP client.  Maximum message sizes (for body and headers) vary
 * between queue types and fee tier ranging from 64KB to 1MB.  Due to the
 * distributed nature of Service Bus, queues do not automatically preserve FIFO
 * order of messages unless the user takes steps to force the message stream to
 * a single partition of the queue or creates the queue with partitioning disabled.
 *
 * This example shows use of the simpler SAS (Shared Access Signature)
 * authentication scheme where the credentials are supplied on the connection.
 * Service Bus does not actually check these credentials when setting up the
 * connection, it merely caches the SAS key and policy (AKA key name) for later
 * access authorization when creating senders and receivers.  There is a second
 * authentication scheme that allows for multiple tokens and even updating them
 * within a long-lived connection which uses special management request-response
 * queues in Service Bus.  The format of this exchange may be documented
 * somewhere but is also available by working through the CbsAsyncExample.cs
 * program in the Amqp.Net Lite project.
 *
 * The sample output for this program is:

   sent message: message 0 in service bus session "red"
   sent message: message 1 in service bus session "green"
   sent message: message 2 in service bus session "blue"
   sent message: message 3 in service bus session "red"
   sent message: message 4 in service bus session "black"
   sent message: message 5 in service bus session "blue"
   sent message: message 6 in service bus session "yellow"
receiving messages with session identifier "green" from queue ses_q1
   received message: message 1 in service bus session "green"
receiving messages with session identifier "red" from queue ses_q1
   received message: message 0 in service bus session "red"
   received message: message 3 in service bus session "red"
receiving messages with session identifier "blue" from queue ses_q1
   received message: message 2 in service bus session "blue"
   received message: message 5 in service bus session "blue"
receiving messages with session identifier "black" from queue ses_q1
   received message: message 4 in service bus session "black"
receiving messages with session identifier "yellow" from queue ses_q1
   received message: message 6 in service bus session "yellow"
Done. No more messages.

 *
 */

#include "options.hpp"

#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/default_container.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>
#include <proton/delivery.hpp>
#include <proton/url.hpp>
#include <proton/source_options.hpp>

#include <iostream>
#include <sstream>

#include "fake_cpp11.hpp"

using proton::source_options;
using proton::connection_options;
using proton::sender_options;
using proton::receiver_options;

void do_next_sequence();

namespace {
void check_arg(const std::string &value, const std::string &name) {
    if (value.empty())
        throw std::runtime_error("missing argument for \"" + name + "\"");
}
}

/// Connect to Service Bus queue and retrieve messages in a particular session.
class session_receiver : public proton::messaging_handler {
  private:
    const std::string &connection_url;
    const std::string &entity;
    proton::value session_identifier; // AMQP null type by default, matches any Service Bus sequence identifier
    int message_count;
    bool closed;
    proton::duration read_timeout;
    proton::timestamp last_read;
    proton::container *container;
    proton::receiver receiver;


    struct process_timeout_fn : public proton::void_function0 {
        session_receiver& parent;
        process_timeout_fn(session_receiver& sr) : parent(sr) {}
        void operator()() { parent.process_timeout(); }
    };

    process_timeout_fn do_process_timeout;


  public:
    session_receiver(const std::string &c, const std::string &e,
                     const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000),
                                               last_read(0), container(0), do_process_timeout(*this) {
        if (sid)
            session_identifier = std::string(sid);
        // session_identifier is now either empty/null or an AMQP string type.
        // If null, Service Bus will pick the first available message and create
        // a filter at its end with that message's session identifier.
        // Technically, an AMQP string is not a valid filter-set value unless it
        // is annotated as an AMQP described type, so this may change.

    }

    void run (proton::container &c) {
        message_count = 0;
        closed = false;
        c.connect(connection_url, connection_options().handler(*this));
        container = &c;
    }

    void on_connection_open(proton::connection &connection) OVERRIDE {
        proton::source::filter_map sb_filter_map;
        proton::symbol key("com.microsoft:session-filter");
        sb_filter_map.put(key, session_identifier);
        receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map)));

        // Start timeout processing here.  If Service Bus has no pending
        // messages, it may defer completing the receiver open until a message
        // becomes available (e.g. to be able to set the actual session
        // identifier if none was specified).
        last_read = proton::timestamp::now();
        // Call this->process_timeout after read_timeout.
        container->schedule(read_timeout, do_process_timeout);
    }

    void on_receiver_open(proton::receiver &r) OVERRIDE {
        if (closed) return; // PROTON-1264
        proton::value actual_session_id = r.source().filters().get("com.microsoft:session-filter");
        std::cout << "receiving messages with session identifier \"" << actual_session_id
                  << "\" from queue " << entity << std::endl;
        last_read = proton::timestamp::now();
    }

    void on_message(proton::delivery &, proton::message &m) OVERRIDE {
        message_count++;
        std::cout << "   received message: " << m.body() << std::endl;
        last_read = proton::timestamp::now();
    }

    void process_timeout() {
        proton::timestamp deadline = last_read + read_timeout;
        proton::timestamp now = proton::timestamp::now();
        if (now >= deadline) {
            receiver.close();
            closed = true;
            receiver.connection().close();
            if (message_count)
                do_next_sequence();
            else
                std::cout << "Done. No more messages." << std::endl;
        } else {
            proton::duration next = deadline - now;
            container->schedule(next, do_process_timeout);
        }
    }
};


/// Connect to Service Bus queue and send messages divided into different sessions.
class session_sender : public proton::messaging_handler {
  private:
    const std::string &connection_url;
    const std::string &entity;
    int msg_count;
    int total;
    int accepts;

  public:
    session_sender(const std::string &c, const std::string &e) : connection_url(c), entity(e),
                                                                 msg_count(0), total(7), accepts(0) {}

    void run(proton::container &c) {
        c.open_sender(connection_url + "/" + entity, sender_options(), connection_options().handler(*this));
    }

    void send_remaining_messages(proton::sender &s) {
        std::string gid;
        for (; msg_count < total && s.credit() > 0; msg_count++) {
            switch (msg_count) {
            case 0: gid = "red"; break;
            case 1: gid = "green"; break;
            case 2: gid = "blue"; break;
            case 3: gid = "red"; break;
            case 4: gid = "black"; break;
            case 5: gid = "blue"; break;
            case 6: gid = "yellow"; break;
            }

            std::ostringstream mbody;
            mbody << "message " << msg_count << " in service bus session \"" << gid << "\"";
            proton::message m(mbody.str());
            m.group_id(gid);  // Service Bus uses the group_id property to as the session identifier.
            s.send(m);
            std::cout << "   sent message: " << m.body() << std::endl;
        }
    }

    void on_sendable(proton::sender &s) OVERRIDE {
        send_remaining_messages(s);
    }

    void on_tracker_accept(proton::tracker &t) OVERRIDE {
        accepts++;
        if (accepts == total) {
            // upload complete
            t.sender().close();
            t.sender().connection().close();
            do_next_sequence();
        }
    }
};


/// Orchestrate the sequential actions of sending and receiving session-based messages.
class sequence : public proton::messaging_handler {
  private:
    proton::container *container;
    int sequence_no;
    session_sender snd;
    session_receiver rcv_red, rcv_green, rcv_null;

  public:
    static sequence *the_sequence;

    sequence (const std::string &c, const std::string &e) : sequence_no(0),
        snd(c, e), rcv_red(c, e, "red"), rcv_green(c, e, "green"), rcv_null(c, e, NULL) {
        the_sequence = this;
    }

    void on_container_start(proton::container &c) OVERRIDE {
        container = &c;
        next_sequence();
    }

    void next_sequence() {
        switch (sequence_no++) {
        // run these in order exactly once
        case 0: snd.run(*container); break;
        case 1: rcv_green.run(*container); break;
        case 2: rcv_red.run(*container); break;
        // Run this until the receiver decides there is no messages left to sequence through
        default: rcv_null.run(*container); break;
        }
    }
};

sequence *sequence::the_sequence = NULL;

void do_next_sequence() { sequence::the_sequence->next_sequence(); }


int main(int argc, char **argv) {
    std::string sb_namespace; // i.e. "foo.servicebus.windows.net"
    // Make sure the next two are urlencoded for Proton
    std::string sb_key_name;  // shared access key name for entity (AKA "Policy Name")
    std::string sb_key;       // shared access key
    std::string sb_entity;    // AKA the service bus queue.  Must enable
                              // sessions on it for this example.

    example::options opts(argc, argv);
    opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE");
    opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY");
    opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key");
    opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY");

    try {
        opts.parse();
        check_arg(sb_namespace, "namespace");
        check_arg(sb_key_name, "policy");
        check_arg(sb_key, "key");
        check_arg(sb_entity, "entity");
        std::string connection_string("amqps://" + sb_key_name + ":" + sb_key + "@" + sb_namespace);

        sequence seq(connection_string, sb_entity);
        proton::default_container(seq).run();
        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }

    return 1;
}