File: AsyncIterator.pm

package info (click to toggle)
libgraphql-perl 0.54-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 712 kB
  • sloc: perl: 5,094; makefile: 2
file content (153 lines) | stat: -rw-r--r-- 3,838 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package GraphQL::AsyncIterator;

use 5.014;
use strict;
use warnings;
use Moo;
use GraphQL::Debug qw(_debug);
use Types::Standard -all;
use Types::TypeTiny -all;
use GraphQL::Type::Library -all;
use GraphQL::PubSub;
use GraphQL::MaybeTypeCheck;
use curry;

use constant DEBUG => $ENV{GRAPHQL_DEBUG};

=head1 NAME

GraphQL::AsyncIterator - iterator objects that return a promise of the next result

=head1 SYNOPSIS

  use GraphQL::AsyncIterator;
  my $i = GraphQL::AsyncIterator->new(
    promise_code => $pc,
  );
  # also works when publish happens before next_p called
  my $promised_value = $i->next_p;
  $i->publish('hi'); # now $promised_value will be fulfilled

  $i->close_tap; # now next_p will return undef

=head1 DESCRIPTION

Encapsulates the asynchronous event-handling needed for the
publish/subscribe behaviour needed by L<GraphQL::Subscription>.

=head1 ATTRIBUTES

=head2 promise_code

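A hash-ref matching L<GraphQL::Type::Library/PromiseCode>. It must
provide the C<new> key, which L</next_p> uses to create a pending promise
when no value is queued yet, as well as the C<resolve> and C<reject> keys,
which are used to turn already-queued values into promises.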

=cut

# Table of promise-implementation callbacks; fixed at construction time.
has promise_code => (
  is  => 'ro',
  isa => PromiseCode,
);

=head1 METHODS

=head2 publish(@values)

Resolves the relevant promise with C<@values>.

=cut

# Events emitted while nobody was waiting on a promise. Each entry is a
# hash-ref with "data" and "method" keys, consumed in order by next_p.
has _values_queue => (
  is      => 'ro',
  isa     => ArrayRef,
  default => sub { [] },
);

# The promise most recently handed out by next_p that is still waiting
# for an event, or undef when there is none.
has _next_promise => (
  is  => 'rw',
  isa => Maybe[Promise],
);

# Emit @values as a successful ("resolve") event.
method publish(@values) {
  my $payload = \@values;
  return $self->_emit(resolve => $payload);
}

# Turn $data into a promise.
#
# $method - 'resolve' or 'reject': which promise_code entry to call.
# $data   - either something that is already a promise (returned as-is),
#           or an array-ref whose elements become the arguments of the
#           chosen promise_code entry.
method _promisify((Enum[qw(resolve reject)]) $method, $data) {
  if (is_Promise($data)) {
    return $data;
  }
  my $settle = $self->promise_code->{$method};
  return $settle->(@$data);
}

# Attach one pair of handlers to $data.
#
# $then, $catch - success / failure handlers; either may be undef.
# $method       - 'resolve' or 'reject', used if $data must first be
#                 converted to a promise.
# $data         - a promise, or an array-ref of values to promisify.
#
# When neither handler is supplied $data is returned untouched; otherwise
# the result of calling ->then($then, $catch) on the promise is returned.
method _thenify(Maybe[CodeLike] $then, Maybe[CodeLike] $catch, (Enum[qw(resolve reject)]) $method, $data) {
  if (!$then && !$catch) {
    return $data;
  }
  my $promise = $self->_promisify($method, $data);
  return $promise->then($then, $catch);
}

# Deliver an event: settle the promise that is waiting for the next value,
# or queue the event if nothing is waiting yet.
#
# $method - 'resolve' or 'reject': the promise method to invoke.
# $data   - the payload. An array-ref is flattened into the arguments of
#           the promise method; anything else is passed as one argument.
#
# Dies if the iterator has already been closed off.
method _emit((Enum[qw(resolve reject)]) $method, $data) {
  if ($self->_exhausted) {
    die "Tried to emit to closed-off AsyncIterator\n";
  }
  if (my $next_promise = $self->_next_promise) {
    # Forget the waiting promise *before* settling it. If settling runs
    # handlers synchronously and one of them calls next_p, that call
    # stores a fresh pending promise; clearing the slot afterwards would
    # throw that new promise away and leave it unsettled forever.
    $self->_next_promise(undef);
    $next_promise->$method(ref $data eq 'ARRAY' ? @$data : $data);
    return;
  } else {
    # Nobody is waiting: remember the event for a later next_p call.
    push @{$self->_values_queue}, { data => $data, method => $method };
  }
}

=head2 error(@values)

Rejects the relevant promise with C<@values>.

=cut

# Emit @values as a failure ("reject") event.
method error(@values) {
  my $payload = \@values;
  return $self->_emit(reject => $payload);
}

=head2 next_p

Returns either a L<GraphQL::Type::Library/Promise> of the next value,
or C<undef> when closed off. Do not call this if a previous promised next
value has not been settled, as a queue is not maintained.

The promise will have each of the sets of handlers added by L</map_then>
appended.

=cut

# Produce the promise for the next event, with every handler pair
# registered so far chained onto it in registration order.
#
# Returns undef once the iterator is closed off and the queue is empty.
method next_p() :ReturnType(Maybe[Promise]) {
  my $queue = $self->_values_queue;
  return undef if !@$queue and $self->_exhausted;

  my $promise;
  if (my $queued = shift @$queue) {
    # An event arrived before anyone asked for it: wrap it right away.
    $promise = $self->_promisify($queued->{method}, $queued->{data});
  } else {
    # Nothing queued: create a pending promise and remember it so the
    # next emitted event can settle it.
    my $pending = $self->promise_code->{new}->();
    $promise = $self->_next_promise($pending);
  }

  for my $frame (@{ $self->_handler_frames }) {
    $promise = $self->_thenify(@$frame, 'resolve', $promise);
  }
  return $promise;
}

=head2 close_tap

Switch to being closed off. L</next_p> will return C<undef> as soon as
it runs out of L</publish>ed values. L</publish> and L</error> will then
throw an exception.
B<NB> This will not cause the settling of any outstanding promise returned
by L</next_p>.

=cut

# True once the iterator has been closed off; starts out false.
has _exhausted => (
  is      => 'rw',
  isa     => Bool,
  default => sub { 0 },
);

# Close off the iterator so that no further events can be emitted.
#
# Returns nothing (undef in scalar context) on every path. The rw
# accessor hands back the value it was just given (1), which is not a
# Maybe[Promise]; letting it fall out as the implicit return value would
# contradict the declared :ReturnType, so return explicitly instead.
method close_tap() :ReturnType(Maybe[Promise]) {
  return if $self->_exhausted; # already done - no need to redo
  $self->_exhausted(1);
  return;
}

=head2 map_then($then, $catch)

Adds the handlers to this object's list of handlers, which will be
attached to promises returned by L</next_p>. Returns self.

=cut

# List of [ $then, $catch ] pairs, in the order they were registered.
# Either member of a pair may be undef (map_then accepts Maybe[CodeLike]
# for both), so the inner element type has to allow undef as well.
has _handler_frames => (
  is      => 'ro',
  isa     => ArrayRef[ArrayRef[Maybe[CodeLike]]],
  default => sub { [] },
);

# Register a ($then, $catch) handler pair for promises that next_p
# returns from now on. Returns the iterator itself.
method map_then(Maybe[CodeLike] $then, Maybe[CodeLike] $catch = undef) {
  my $frames = $self->_handler_frames;
  push @$frames, [ $then, $catch ];
  return $self;
}

# Finalise the class definition now that all attributes and methods exist.
# NOTE(review): under Moo this call is presumably a cheap no-op kept for
# Moose compatibility - confirm against the Moo documentation.
__PACKAGE__->meta->make_immutable();

# A module file must end with a true value for "use"/"require" to succeed.
1;