File: LocalQueue.pm

package info (click to toggle)
libmojo-rabbitmq-client-perl 0.3.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 308 kB
  • sloc: perl: 2,165; xml: 489; makefile: 2
file content (93 lines) | stat: -rw-r--r-- 2,077 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package Mojo::RabbitMQ::Client::LocalQueue;
use Mojo::Base -base;

has message_queue    => sub { [] };
has drain_code_queue => sub { [] };

sub push {
  my $self = shift;

  CORE::push @{$self->message_queue}, @_;
  return $self->_drain_queue();
}

sub get {
  my $self = shift;

  CORE::push @{$self->drain_code_queue}, @_;
  return $self->_drain_queue();
}

sub _drain_queue {
  my $self = shift;

  my $message_count    = scalar @{$self->message_queue};
  my $drain_code_count = scalar @{$self->drain_code_queue};

  my $count
    = $message_count < $drain_code_count ? $message_count : $drain_code_count;

  for (1 .. $count) {
    &{shift @{$self->drain_code_queue}}(shift @{$self->message_queue});
  }

  return $self;
}

1;

=encoding utf8

=head1 NAME

Mojo::RabbitMQ::Client::LocalQueue - Callback queue

=head1 SYNOPSIS

  use Mojo::RabbitMQ::Client::LocalQueue

  my $queue = Mojo::RabbitMQ::Client::LocalQueue->new();

  # Register callback when content appears
  $queue->get(sub { say "got expected content: " . $_[0] });

  # Push some content to consume
  $queue->push("It Works!");

  # This prints:
  # got expected content: It Works!

=head1 DESCRIPTION

L<Mojo::RabbitMQ::Client::LocalQueue> is a queue for callbacks expecting some content to be received.

=head1 METHODS

L<Mojo::RabbitMQ::Client::LocalQueue> implements following methods:

=head2 get

  $queue->get(sub { process_message($_[0]) })

Registers a callback which is executed when new message is pushed to queue.

=head2 push

  $queue->push("Some content");
  $queue->push({objects => 'are also welcome});

Pushes content to queue and also drains all declared callbacks.

=head1 SEE ALSO

L<Mojo::RabbitMQ::Client>, L<Mojo::RabbitMQ::Client::Channel>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2015-2017, Sebastian Podjasek and others

Based on L<AnyEvent::RabbitMQ::LocalQueue> - Copyright (C) 2010 Masahito Ikuta, maintained by C<< bobtfish@bobtfish.net >>

This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0.

=cut