#!/usr/bin/perl
use strict;
use warnings;

=pod

This script demonstrates using L<KiokuDB> to properly serialize a closure,
including maintaining the proper identity of all the referenced objects in the
captured variables.

This feature is used to implement a simple job queue, where the queue
management is handled by DBIC, but the job body is a closure.

Actual job queue features are missing (e.g. marking a job as in progress),
but the point is to show off KiokuDB, not to write a job queue ;-)

=cut

use KiokuDB;

{
    # this is just mock data
    package MyApp::DB::Result::DataPoint;
    use base qw(DBIx::Class);

    __PACKAGE__->load_components(qw(Core));

    __PACKAGE__->table('data_point');

    __PACKAGE__->add_columns(
        id    => { data_type => "integer" },
        value => { data_type => "integer" },
    );

    __PACKAGE__->set_primary_key('id');

    # and mock result data (the output of a job)
    package MyApp::DB::Result::Output;
    use base qw(DBIx::Class);

    __PACKAGE__->load_components(qw(Core));

    __PACKAGE__->table('output');

    __PACKAGE__->add_columns(
        id    => { data_type => "integer" },
        value => { data_type => "integer" },
    );

    __PACKAGE__->set_primary_key('id');

    # this represents a queued or finished job
    package MyApp::DB::Result::Job;
    use base qw(DBIx::Class);

    __PACKAGE__->load_components(qw(KiokuDB Core));

    __PACKAGE__->table('foo');

    __PACKAGE__->add_columns(
        id          => { data_type => "integer" },
        description => { data_type => "varchar" },
        action      => { data_type => "varchar" },
        finished    => { data_type => "bool", default_value => 0 },
        result      => { data_type => "integer", is_nullable => 1, },
    );

    __PACKAGE__->set_primary_key('id');

    __PACKAGE__->kiokudb_column('action');

    __PACKAGE__->belongs_to( result => "MyApp::DB::Result::Output" );

    sub run {
        my $self = shift;

        # run the actual action
        $self->action->($self);

        # mark the job as finished
        $self->finished(1);
        $self->update;
    }

    package MyApp::DB;
    use base qw(DBIx::Class::Schema);

    __PACKAGE__->load_components(qw(Schema::KiokuDB));

    __PACKAGE__->register_class( Job       => qw(MyApp::DB::Result::Job));
    __PACKAGE__->register_class( Output    => qw(MyApp::DB::Result::Output));
    __PACKAGE__->register_class( DataPoint => qw(MyApp::DB::Result::DataPoint));
}

my $dir = KiokuDB->connect(
    'dbi:SQLite:dbname=:memory:',
    schema => "MyApp::DB",
    create => 1,
);

my $schema = $dir->backend->schema;

# create some data
$schema->txn_do(sub {
    my $rs = $schema->resultset("DataPoint");

    $rs->create({ value => 4 });
    $rs->create({ value => 3 });
    $rs->create({ value => 2 });
    $rs->create({ value => 50 });
});

# queue a job
$dir->txn_do( scope => 1, body => sub {
    my $small_numbers = $schema->resultset("DataPoint")->search({ value => { "<=", 10 } });

    # create a closure for the job:
    my $action = sub {
        my $self = shift;

        my $sum = 0;

        # $small_numbers is a closure variable, which will be saved
        # implicitly as a KiokuDB object
        while ( my $data_point = $small_numbers->next ) {
            $sum += $data_point->value;
        }

        # $schema is also a closure variable, and is restored properly
        $self->result( $schema->resultset("Output")->create({ value => $sum }) );
    };

    # we can simply store the closure in the DB
    $schema->resultset("Job")->create({
        description => "sum some small numbers",
        action      => $action,
    });
});

# run a job
# this can be done in a worker process, obviously (just change :memory: to a
# real file)
$dir->txn_do( scope => 1, body => sub {
    my $jobs = $schema->resultset("Job")->search({ finished => 0 });

    # "rows" is the DBIC resultset attribute that caps the number of rows
    my $job = $jobs->search(undef, { rows => 1 })->single;

    $job->run();

    my $result = $job->result;

    # ...
});
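
# Not part of the original example: a minimal sketch of inspecting finished
# jobs, assuming the "run a job" step above has completed. It relies only on
# the $dir and $schema handles and the Job/Output classes defined in this
# script; the output format is an arbitrary choice for illustration.
$dir->txn_do( scope => 1, body => sub {
    my @finished = $schema->resultset("Job")->search({ finished => 1 })->all;

    for my $job (@finished) {
        # result is the belongs_to accessor, so it returns the Output row
        # (or undef if the job never stored one)
        my $output = $job->result;

        printf "%s => %s\n", $job->description,
            defined $output ? $output->value : "(no result)";
    }
});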