# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2019-2024 -- leonerd@leonerd.org.uk
package Future::IO::Impl::IOAsync 0.804;
use v5.14;
use warnings;
use base qw( Future::IO::ImplBase );

=head1 NAME

C<Future::IO::Impl::IOAsync> - implement C<Future::IO> using C<IO::Async>

=head1 DESCRIPTION

This module provides an implementation for L<Future::IO> which uses
L<IO::Async>.

There are no additional methods to use in this module; it simply has to be
loaded, and will provide the C<Future::IO> implementation methods.

   use Future::IO;
   use Future::IO::Impl::IOAsync;

   my $f = Future::IO->sleep(5);
   ...

=cut

use IO::Async::Loop;
__PACKAGE__->APPLY;
my $loop;

sub sleep
{
   shift;
   my ( $secs ) = @_;

   return ( $loop //= IO::Async::Loop->new )->delay_future( after => $secs );
}

my %watching_read_by_fileno; # {fileno} => [@futures]
# Not (yet) part of Future::IO API but it seems a useful way to build this
sub ready_for_read
{
   shift;
   my ( $fh ) = @_;

   my $watching = $watching_read_by_fileno{ $fh->fileno } //= [];

   $loop //= IO::Async::Loop->new;

   my $f = $loop->new_future;

   my $was = scalar @$watching;
   push @$watching, $f;

   # An earlier caller has already installed the watcher for this handle
   return $f if $was;

   $loop->watch_io(
      handle => $fh,
      on_read_ready => sub {
         # Complete the oldest pending future; one per readiness event
         $watching->[0]->done;
         shift @$watching;

         return if scalar @$watching;

         # Nothing is left waiting, so stop watching this handle
         $loop->unwatch_io(
            handle => $fh,
            on_read_ready => 1,
         );
         delete $watching_read_by_fileno{ $fh->fileno };
      },
   );

   return $f;
}

my %watching_write_by_fileno; # {fileno} => [@futures]
sub ready_for_write
{
   shift;
   my ( $fh ) = @_;

   my $watching = $watching_write_by_fileno{ $fh->fileno } //= [];

   $loop //= IO::Async::Loop->new;

   my $f = $loop->new_future;

   my $was = scalar @$watching;
   push @$watching, $f;

   # An earlier caller has already installed the watcher for this handle
   return $f if $was;

   $loop->watch_io(
      handle => $fh,
      on_write_ready => sub {
         # Complete the oldest pending future; one per readiness event
         $watching->[0]->done;
         shift @$watching;

         return if scalar @$watching;

         # Nothing is left waiting, so stop watching this handle
         $loop->unwatch_io(
            handle => $fh,
            on_write_ready => 1,
         );
         delete $watching_write_by_fileno{ $fh->fileno };
      },
   );

   return $f;
}

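sub waitpid
{
   shift;
   my ( $pid ) = @_;

   my $f = ( $loop //= IO::Async::Loop->new )->new_future;

   # The callback receives ( $pid, $wstatus ); only the wait status is returned
   $loop->watch_process( $pid, sub {
      my ( undef, $wstatus ) = @_;
      $f->done( $wstatus );
   } );

   # Stop watching the process if the caller cancels the future
   $f->on_cancel( sub { $loop->unwatch_process( $pid ) } );

   return $f;
}
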
=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;