File: LocalQueue.pm

package info (click to toggle)
libanyevent-rabbitmq-perl 1.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster
  • size: 300 kB
  • sloc: perl: 2,777; sh: 6; makefile: 5
file content (60 lines) | stat: -rw-r--r-- 1,146 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package AnyEvent::RabbitMQ::LocalQueue;

use strict;
use warnings;

our $VERSION = '1.16';

sub new {
    my $class = shift;
    return bless {
        _message_queue    => [],
        _drain_code_queue => [],
    }, $class;
}

sub push {
    my $self = shift;

    CORE::push @{$self->{_message_queue}}, @_;
    return $self->_drain_queue();
}

sub get {
    my $self = shift;

    CORE::push @{$self->{_drain_code_queue}}, @_;
    return $self->_drain_queue();
}

sub _drain_queue {
    my $self = shift;

    my $message_count = scalar @{$self->{_message_queue}};
    my $drain_code_count = scalar @{$self->{_drain_code_queue}};

    my $count = $message_count < $drain_code_count
              ? $message_count : $drain_code_count;

    for (1 .. $count) {
        &{shift @{$self->{_drain_code_queue}}}(
            shift @{$self->{_message_queue}}
        );
    }

    return $self;
}

sub _flush {
    my ($self, $frame) = @_;

    $self->_drain_queue;

    while (my $cb = shift @{$self->{_drain_code_queue}}) {
        local $@; # Flush frames immediately, throwing away errors for on-close
        eval { $cb->($frame) };
    }
}

1;