File: consumer.t

package info (click to toggle)
libmojo-rabbitmq-client-perl 0.3.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 308 kB
  • sloc: perl: 2,165; xml: 489; makefile: 2
file content (122 lines) | stat: -rw-r--r-- 3,541 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use Test::More skip_all => "TODO";

BEGIN { use_ok 'Mojo::RabbitMQ::Client' }

sub failure {
  my ($test, $details) = @_;
  fail($test);
  diag("Details: " . $details) if $details;
  Mojo::IOLoop->stop;
}

sub handle_error {
  my $desc = $_[0] // 'Error';
  return sub {
    failure($desc, $_[1]->method_frame->reply_text);
  }
}

my $run_id        = time();
my $exchange_name = 'mrc_test_' . $run_id;
my $queue_name    = 'mrc_test_queue' . $run_id;

my $url = $ENV{MOJO_RABBITMQ_URL} || 'rabbitmq://guest:guest@127.0.0.1:5672/?exchange=' . $exchange_name . '&queue=' . $queue_name;

Mojo::IOLoop->timer(    # Global test timeout
  10 => sub {
    failure('Test timeout');
  }
);

my $client = Mojo::RabbitMQ::Client->new(url => $url);
$client->catch(handle_error('Connection or other server errors'));
$client->on(
  open => sub {
    pass('Client connected');

    my $channel = Mojo::RabbitMQ::Client::Channel->new();
    $channel->catch(handle_error("Channel error"));
    $channel->on(close => handle_error("Channel error"));
    $channel->on(
      open => sub {
        pass('Channel opened');

        my $exchange = $channel->declare_exchange(
          exchange    => $exchange_name,
          type        => 'topic',
          auto_delete => 1,
        );
        $exchange->catch(handle_error('Failed to declare exchange'));
        $exchange->on(
          success => sub {
            pass('Exchange declared');

            my $queue = $channel->declare_queue(queue => $queue_name,
              auto_delete => 1,);
            $queue->catch(handle_error('Failed to declare queue'));
            $queue->on(
              success => sub {
                pass('Queue declared');

                my $bind = $channel->bind_queue(
                  exchange    => $exchange_name,
                  queue       => $queue_name,
                  routing_key => $queue_name,
                );
                $bind->catch(handle_error('Failed to bind queue'));
                $bind->on(
                  success => sub {
                    pass('Queue bound');

                    my $publish = $channel->publish(
                      exchange    => $exchange_name,
                      routing_key => $queue_name,
                      body        => 'Test message'
                    );
                    $publish->on(success => sub {
                      pass('Message published');
                      start_consumer();
                      $client->close();
                    });
                    $publish->deliver();
                  }
                );
                $bind->deliver();
              }
            );
            $queue->deliver();
          }
        );
        $exchange->deliver();
      }
    );

    $client->open_channel($channel);
  }
);
$client->connect();

sub start_consumer {
  my $consumer = Mojo::RabbitMQ::Client->consumer(
    url      => $url,
    defaults => {
      qos      => {prefetch_count => 1},
      queue    => {auto_delete    => 1},
      consumer => {no_ack         => 0},
    }
  );

  $consumer->catch(sub { failure('Consumer: Connection or other server errors') });
  $consumer->on(connect => sub { pass('Consumer: Connected to server') });
  $consumer->on(
    'message' => sub {
      my ($consumer, $message) = @_;
      pass('Consumer: Got message');
      $consumer->close();
    }
  );
  $consumer->on(close => sub { pass('Consumer: Disconnected'); Mojo::IOLoop->stop });
  $consumer->start();
}

Mojo::IOLoop->start unless Mojo::IOLoop->is_running;