File: async.pm

package info (click to toggle)
qpid-proton 0.14.0-5
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 9,632 kB
  • ctags: 20,083
  • sloc: java: 39,624; ansic: 29,389; python: 16,581; cpp: 11,250; ruby: 6,618; perl: 2,641; php: 1,033; xml: 957; sh: 230; pascal: 52; makefile: 32
file content (120 lines) | stat: -rw-r--r-- 2,855 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

use qpid_proton;

package async::CallbackAdapter;

# Constructor: wrap an existing messenger in a callback adapter.
#
# Arguments:
#   $class     - class the new adapter is blessed into
#   $messenger - messenger object the adapter will drive; it is switched to
#                non-blocking mode and both its incoming and outgoing
#                windows are set to 1024
#
# Returns the new adapter. It holds the messenger, a single
# qpid::proton::Message stored under both the _message and _incoming keys,
# and an empty _tracked table.
sub new {
    my ($class, $messenger) = @_;

    my $adapter = { _messenger => $messenger };

    # Configure the messenger before anything else is set up.
    $messenger->set_blocking(0);
    $messenger->set_incoming_window(1024);
    $messenger->set_outgoing_window(1024);

    # One message object is shared by both slots.
    my $scratch = qpid::proton::Message->new();
    @{$adapter}{qw(_message _incoming)} = ($scratch, $scratch);
    $adapter->{_tracked} = {};

    return bless $adapter, $class;
}

# Drive the messenger until stop() is called, then shut it down cleanly.
#
# Sequence:
#   1. mark the adapter as running, start the messenger, call on_start()
#   2. pump (work + process_outgoing + process_incoming) at least once and
#      keep pumping for as long as the _running flag stays true
#   3. ask the messenger to stop and keep pumping until it reports stopped
#   4. call on_stop()
#
# on_start() and on_stop() are not defined in this file; they are expected
# to be supplied elsewhere (presumably by a subclass -- confirm).
sub run {
    my ($self) = @_;
    my $messenger = $self->{_messenger};

    # One pass of the event loop; used by both phases below.
    my $pump = sub {
        $messenger->work;
        $self->process_outgoing;
        $self->process_incoming;
    };

    $self->{_running} = 1;
    $messenger->start();
    $self->on_start();

    # The flag is tested after each pass, so one pass always happens even
    # if on_start() already requested a stop.
    while (1) {
        $pump->();
        last unless $self->{_running};
    }

    $messenger->stop();
    $pump->() until $messenger->stopped();

    $self->on_stop();
}

# Request that run() leave its main loop: clears the _running flag, which
# run() checks after each pass of the loop.
sub stop {
    my $adapter = shift;

    $adapter->{_running} = 0;
}

# Check every tracked outgoing delivery and fire its status callback once
# the delivery is no longer pending.
#
# Each entry in $self->{_tracked} is a two-element array reference
# [ $tracker, $on_status ] stored by send(). For every entry whose status
# is not PENDING, the callback is invoked as $self->$on_status($status),
# the tracker is settled on the messenger, and the entry is removed.
#
# Earlier versions compared the callback itself against PENDING and passed
# never-assigned variables to both the callback and settle(), so the real
# delivery status was never consulted and the tracker never settled.
sub process_outgoing {
    my ($self) = @_;
    my $messenger = $self->{_messenger};
    my $tracked   = $self->{_tracked};

    # NOTE(review): the original code referred to
    # qpid::proton::Tracker::PENDING. Use it when the binding defines it,
    # otherwise fall back to the SWIG-level constant -- confirm which one
    # the installed binding actually provides.
    my $pending = defined(&qpid::proton::Tracker::PENDING)
        ? qpid::proton::Tracker::PENDING()
        : $cproton_perl::PN_STATUS_PENDING;

    # keys() returns a snapshot, so deleting inside the loop is safe.
    foreach my $key (keys %{$tracked}) {
        my $entry = $tracked->{$key};

        # Anything that is not a [tracker, callback] pair cannot be mapped
        # back to a tracker, so it is dropped rather than rescanned forever.
        if (ref($entry) ne 'ARRAY') {
            delete $tracked->{$key};
            next;
        }

        my ($tracker, $on_status) = @{$entry};

        # NOTE(review): assumes the messenger exposes get_status(TRACKER)
        # -- confirm against the qpid::proton::Messenger API.
        my $status = $messenger->get_status($tracker);
        next if defined($pending) && defined($status) && $status == $pending;

        $self->$on_status($status);
        $messenger->settle($tracker);

        # Remove the key itself (not just its value) so the table does not
        # grow without bound.
        delete $tracked->{$key};
    }
}

# Drain the messenger's incoming queue.
#
# While the messenger reports incoming messages, each one is fetched into
# the adapter's single reusable message object ($self->{_message}), handed
# to on_receive(), and then accepted. Because the same message object is
# reused for every fetch, on_receive() must copy anything it wants to keep.
# on_receive() is not defined in this file (presumably a subclass hook --
# confirm).
sub process_incoming {
    my ($self) = @_;
    my $messenger = $self->{_messenger};

    while ($messenger->incoming > 0) {
        my $message = $self->{_message};
        my $tracker = $messenger->get($message);

        $self->on_receive($message);
        $messenger->accept($tracker);
    }
}

# Queue a message for sending and optionally track its delivery status.
#
# Arguments:
#   $message   - message handed to the messenger's put()
#   $on_status - optional callback (method name or code reference); when
#                given, it is later invoked by process_outgoing() as
#                $self->$on_status($status). A false value means "do not
#                track".
#
# Returns the tracker produced by put().
sub send {
    my ($self, $message, $on_status) = @_;
    my $messenger = $self->{_messenger};

    my $tracker = $messenger->put($message);

    # Keep the tracker object next to the callback: as a hash key it is
    # stringified and could not be used to query or settle the delivery.
    if ($on_status && defined($tracker)) {
        $self->{_tracked}->{$tracker} = [ $tracker, $on_status ];
    }

    return $tracker;
}


1;