File: GearmanDBI.pm

package info (click to toggle)
libdata-objectdriver-perl 0.21-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 760 kB
  • sloc: perl: 3,743; sql: 59; makefile: 7
file content (130 lines) | stat: -rw-r--r-- 3,694 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package Data::ObjectDriver::Driver::GearmanDBI;
use strict;
use warnings;

use base qw( Data::ObjectDriver );
use Data::ObjectDriver::Iterator;
use Storable();
use Digest::MD5;
use Data::Dumper;

__PACKAGE__->mk_accessors(qw(
    dbi client func driver_arg enabled_cb uniqify_cb
    on_exception_cb retry_count timeout
));

sub init {
    my $driver = shift;
    my %param  = @_;

    for my $key (keys %param) {
        $driver->$key($param{$key});
    }
}

sub search {
    my $driver = shift;
    my($class, $terms, $args) = @_;

    if ($Data::ObjectDriver::RESTRICT_IO) {
        die "Attempted GearmanDBI I/O while in restricted mode: search() " . Dumper($terms, $args);
    }

    my $dbi = $driver->dbi;

    ## if Gearman shouldn't be used, fallback to the configured dbi driver
    return $dbi->search(@_)
        unless $driver->enabled_cb->(@_);

    my ($sql, $bind, $stmt) = $dbi->prepare_fetch($class, $terms, $args);
    my $results = $driver->_gearman_search($sql, $bind);

    ## Transform the array returned by gearman to the hash we expect to load
    ## in the object
    my $map    = $stmt->select_map;
    my @select = @{ $stmt->select };

    my $to_hash = sub {
        my $array = shift;
        my $hash;
        my $i = 0;
        for my $col (@select) {
            $hash->{ $map->{$col} } = $array->[$i++];
        }
        return $hash;
    };

    my $nt = $args->{no_triggers};
    my @objs = map { $dbi->load_object_from_rec($class, $_, $nt); }
               map { $to_hash->($_) }
               @$results;

    return wantarray
            ? @objs
            : Data::ObjectDriver::Iterator->new( sub { shift @objs } );
}

sub _gearman_search {
    my $driver = shift;
    my ($sql, $bind) = @_;

    my $uniqify = $driver->uniqify_cb || \&_md5sum;
    my $func    = $driver->func;
    my $uniq    = $uniqify->($sql, $bind);
    my $client  = $driver->client;

    my %options = ();
    $options{on_exception} = $driver->on_exception_cb
        if $driver->on_exception_cb;
    $options{retry_count}  = $driver->retry_count
        if $driver->retry_count;
    $options{timeout}      = $driver->timeout
        if $driver->timeout;

    my $res = $client->do_task( $func =>
        \Storable::nfreeze( {
            driver_arg => $driver->driver_arg,
            sql        => $sql,
            bind       => $bind,
            key        => $uniq,
        } ),
        {
            uniq => $uniq, # coalesce all requests for this data
            %options,
        }
    );
    return $res ? Storable::thaw($$res) : [];
}

sub _md5sum {
    my ($sql, $bind) = @_;
    return Digest::MD5::md5_hex(join "", $sql, @$bind);
}

## every single data access methods are delegated to dbi
## except for search
sub lookup       { shift->dbi->lookup       (@_) }
sub lookup_multi { shift->dbi->lookup_multi (@_) }
sub exists       { shift->dbi->exists       (@_) }
sub insert       { shift->dbi->insert       (@_) }
sub replace      { shift->dbi->replace      (@_) }
sub update       { shift->dbi->update       (@_) }
sub remove       { shift->dbi->remove       (@_) }
sub fetch_data   { shift->dbi->fetch_data   (@_) }

## transactions are passed to dbi
sub add_working_driver { shift->dbi->add_working_driver (@_) }
sub commit             { shift->dbi->commit             (@_) }
sub rollback           { shift->dbi->rollback           (@_) }
sub rw_handle          { shift->dbi->rw_handle          (@_) }
sub r_handle           { shift->dbi->r_handle           (@_) }

## safety AUTOLOAD for the rest of non-core methods
sub DESTROY { }
sub AUTOLOAD {
    my $driver = shift;
    (my $meth = our $AUTOLOAD) =~ s/^.*:://;
    return $driver->dbi->$meth(@_);
}

1;