package Data::ObjectDriver::Driver::GearmanDBI;
use strict;
use warnings;
use base qw( Data::ObjectDriver );
use Data::ObjectDriver::Iterator;
use Storable();
use Digest::MD5;
use Data::Dumper;

__PACKAGE__->mk_accessors(qw(
    dbi client func driver_arg enabled_cb uniqify_cb
    on_exception_cb retry_count timeout
));

## Each constructor parameter is assigned through the accessor of the
## same name.
sub init {
    my $driver = shift;
    my %param = @_;
    for my $key (keys %param) {
        $driver->$key($param{$key});
    }
}

sub search {
    my $driver = shift;
    my($class, $terms, $args) = @_;

    if ($Data::ObjectDriver::RESTRICT_IO) {
        die "Attempted GearmanDBI I/O while in restricted mode: search() " . Dumper($terms, $args);
    }

    my $dbi = $driver->dbi;

    ## If Gearman shouldn't be used, fall back to the configured dbi driver.
    return $dbi->search(@_)
        unless $driver->enabled_cb->(@_);

    my ($sql, $bind, $stmt) = $dbi->prepare_fetch($class, $terms, $args);
    my $results = $driver->_gearman_search($sql, $bind);

    ## Transform each array returned by Gearman into the hash we expect
    ## to load into the object.
    my $map = $stmt->select_map;
    my @select = @{ $stmt->select };
    my $to_hash = sub {
        my $array = shift;
        my $hash = {};
        my $i = 0;
        for my $col (@select) {
            $hash->{ $map->{$col} } = $array->[$i++];
        }
        return $hash;
    };

    my $nt = $args->{no_triggers};
    my @objs = map { $dbi->load_object_from_rec($class, $_, $nt) }
               map { $to_hash->($_) }
               @$results;

    ## List context gets the objects; scalar context gets an iterator.
    return wantarray
        ? @objs
        : Data::ObjectDriver::Iterator->new( sub { shift @objs } );
}

sub _gearman_search {
    my $driver = shift;
    my ($sql, $bind) = @_;

    my $uniqify = $driver->uniqify_cb || \&_md5sum;
    my $func = $driver->func;
    my $uniq = $uniqify->($sql, $bind);
    my $client = $driver->client;

    ## Only pass along the task options that have been configured.
    my %options = ();
    $options{on_exception} = $driver->on_exception_cb
        if $driver->on_exception_cb;
    $options{retry_count} = $driver->retry_count
        if $driver->retry_count;
    $options{timeout} = $driver->timeout
        if $driver->timeout;

    my $res = $client->do_task( $func =>
        \Storable::nfreeze( {
            driver_arg => $driver->driver_arg,
            sql        => $sql,
            bind       => $bind,
            key        => $uniq,
        } ),
        {
            uniq => $uniq, # coalesce all requests for this data
            %options,
        }
    );

    ## A false result (failed or empty task) is treated as "no rows".
    return $res ? Storable::thaw($$res) : [];
}

## Default uniqify callback: MD5 of the SQL concatenated with its bind values.
sub _md5sum {
    my ($sql, $bind) = @_;
    return Digest::MD5::md5_hex(join "", $sql, @$bind);
}

## Every data access method except search is delegated to dbi.
sub lookup { shift->dbi->lookup (@_) }
sub lookup_multi { shift->dbi->lookup_multi (@_) }
sub exists { shift->dbi->exists (@_) }
sub insert { shift->dbi->insert (@_) }
sub replace { shift->dbi->replace (@_) }
sub update { shift->dbi->update (@_) }
sub remove { shift->dbi->remove (@_) }
sub fetch_data { shift->dbi->fetch_data (@_) }
## transactions are passed to dbi
sub add_working_driver { shift->dbi->add_working_driver (@_) }
sub commit { shift->dbi->commit (@_) }
sub rollback { shift->dbi->rollback (@_) }
sub rw_handle { shift->dbi->rw_handle (@_) }
sub r_handle { shift->dbi->r_handle (@_) }

## Safety AUTOLOAD for the remaining non-core methods.
## The empty DESTROY keeps object destruction from reaching AUTOLOAD.
sub DESTROY { }

sub AUTOLOAD {
    my $driver = shift;
    (my $meth = our $AUTOLOAD) =~ s/^.*:://;
    return $driver->dbi->$meth(@_);
}

1;
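
__END__

=head1 EXAMPLE

The code above sends C<search> queries to a Gearman function and expects
back a frozen array of row arrays. The following is only a sketch of how
the two sides might be wired together, not a reference setup. The function
name C<dod_search>, the job server address, the SQLite DSN and the shape
of C<driver_arg> (a hash with a C<dsn> key) are all assumptions made for
illustration; none of them is prescribed by this module.

Client side, assuming a job server is reachable:

    use Gearman::Client;
    use Data::ObjectDriver::Driver::DBI;
    use Data::ObjectDriver::Driver::GearmanDBI;

    my $dsn = 'dbi:SQLite:dbname=app.db';          # hypothetical DSN

    my $client = Gearman::Client->new;
    $client->job_servers('127.0.0.1:4730');        # hypothetical address

    my $driver = Data::ObjectDriver::Driver::GearmanDBI->new(
        dbi        => Data::ObjectDriver::Driver::DBI->new( dsn => $dsn ),
        client     => $client,
        func       => 'dod_search',                # hypothetical name
        driver_arg => { dsn => $dsn },             # assumed shape
        enabled_cb => sub { 1 },                   # always use Gearman
        timeout    => 5,
    );

Worker side, a matching sketch under the same assumptions. It thaws the
payload built by C<_gearman_search>, runs the query, and returns the rows
frozen with C<Storable::nfreeze> so that the client can C<thaw> them:

    use strict;
    use warnings;
    use DBI;
    use Gearman::Worker;
    use Storable ();

    my $worker = Gearman::Worker->new;
    $worker->job_servers('127.0.0.1:4730');        # hypothetical address

    $worker->register_function( dod_search => sub {
        my $job  = shift;
        my $args = Storable::thaw( $job->arg );    # driver_arg, sql, bind, key

        # Assumption: driver_arg carries the DSN to query against.
        my $dbh = DBI->connect( $args->{driver_arg}{dsn}, undef, undef,
                                { RaiseError => 1 } );

        # One array per row, in SELECT column order, as search() expects.
        my $rows = $dbh->selectall_arrayref( $args->{sql}, undef,
                                             @{ $args->{bind} } );
        return Storable::nfreeze($rows);
    } );

    $worker->work while 1;

Because the task is submitted with C<uniq> set to the digest of the SQL
and its bind values, identical concurrent searches can be coalesced by the
job server into a single worker run.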