1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
use utf8;
use strict;
use warnings;
package DR::Tarantool::CoroClient;
use base 'DR::Tarantool::AsyncClient';
use Coro;
use Carp;
use AnyEvent;
=head1 NAME
DR::Tarantool::CoroClient - an asynchronous coro driver for
L<Tarantool|http://tarantool.org>
=head1 SYNOPSIS
use DR::Tarantool::CoroClient;
use Coro;
my $client = DR::Tarantool::CoroClient->connect(
port => $port,
spaces => $spaces;
);
my @res;
for (1 .. 100) {
async {
push @res => $client->select(space_name => $_);
}
}
cede while @res < 100;
=head1 METHODS
=head2 connect
Connects to Tarantool.
=head3 Arguments
The same as L<DR::Tarantool::AsyncClient/connect>, excluding the callback.
Returns a connection handle or croaks an error.
=head3 Additional arguments
=over
=item raise_error
If B<true> (default behaviour) the driver throws an exception for each
server error.
=back
=cut
sub connect {
my ($class, %opts) = @_;
my $raise_error = 1;
$raise_error = delete $opts{raise_error} if exists $opts{raise_error};
my $cb = Coro::rouse_cb;
$class->SUPER::connect(%opts, $cb);
my ($self) = Coro::rouse_wait;
unless (ref $self) {
croak $self if $raise_error;
$! = $self;
return undef;
}
$self->{raise_error} = $raise_error ? 1 : 0;
$self;
}
=head2 ping
The same as L<DR::Tarantool::AsyncClient/ping>, excluding the callback.
Returns B<true> on success, B<false> on error.
=head2 insert
The same as L<DR::Tarantool::AsyncClient/insert>, excluding the callback.
Returns the inserted tuple or undef.
Croaks an error if insert failed (B<raise_error> must be set).
=head2 select
The same as L<DR::Tarantool::AsyncClient/select>, excluding the callback.
Returns tuple or tuples that match selection criteria, or undef
if no matching tuples were found.
Croaks an error if an error occurred (provided B<raise_error> is set).
=head2 update
The same as L<DR::Tarantool::AsyncClient/update>, excluding the callback.
Returns the new value of the tuple.
Croaks an error if update failed (provided B<raise_error> is set).
=head2 delete
The same as L<DR::Tarantool::AsyncClient/delete>, excluding the callback.
Returns the deleted tuple, or undef.
Croaks error if an error occurred (provided B<raise_error> is set).
=head2 call_lua
The same as L<DR::Tarantool::AsyncClient/call_lua>, excluding the callback.
Returns a tuple or tuples returned by the called procedure.
Croaks an error if an error occurred (provided B<raise_error> is set).
=cut
for my $method (qw(ping insert select update delete call_lua)) {
no strict 'refs';
*{ __PACKAGE__ . "::$method" } = sub {
my ($self, @args) = @_;
my $cb = Coro::rouse_cb;
my $m = "SUPER::$method";
$self->$m(@args, $cb);
my @res = Coro::rouse_wait;
if ($res[0] eq 'ok') {
return 1 if $method eq 'ping';
return $res[1];
}
return 0 if $method eq 'ping';
return undef unless $self->{raise_error};
croak sprintf "%s: %s",
defined($res[1])? $res[1] : 'unknown',
$res[2]
;
};
}
1;
|