File: job_queue.pl

package info (click to toggle)
libkiokudb-backend-dbi-perl 1.23-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 260 kB
  • sloc: perl: 1,688; makefile: 7
file content (151 lines) | stat: -rw-r--r-- 3,950 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/perl

use strict;
use warnings;

=pod

This script demonstrates using L<KiokuDB> to properly serialize a closure,
including maintaining the proper identity of all the referenced objects in the
captured variables.

This feature is used to implement a simple job queue, where the queue
management is handled by DBIC, but the job body is a closure.

Actual job queue features are missing (e.g. marking a job as in progress, etc),
but the point is to show off KiokuDB, not to write a job queue ;-)

=cut

use KiokuDB;

{
    # this is just a mock data
    package MyApp::DB::Result::DataPoint;
    use base qw(DBIx::Class);

    __PACKAGE__->load_components(qw(Core));

    __PACKAGE__->table('data_point');

    __PACKAGE__->add_columns(
        id => { data_type => "integer" },
        value => { data_type => "integer" },
    );

    __PACKAGE__->set_primary_key('id');

    # and a mock result data (the output of a job)
    package MyApp::DB::Result::Output;
    use base qw(DBIx::Class);

    __PACKAGE__->load_components(qw(Core));

    __PACKAGE__->table('output');

    __PACKAGE__->add_columns(
        id => { data_type => "integer" },
        value => { data_type => "integer" },
    );

    __PACKAGE__->set_primary_key('id');

    # this represents a queued or finished job
    package MyApp::DB::Result::Job;
    use base qw(DBIx::Class);

    __PACKAGE__->load_components(qw(KiokuDB Core));
    __PACKAGE__->table('foo');
    __PACKAGE__->add_columns(
        id => { data_type => "integer" },
        description => { data_type => "varchar"  },
        action => { data_type => "varchar" },
        finished => { data_type => "bool", default_value => 0 },
        result => { data_type => "integer", is_nullable => 1, },
    );
    __PACKAGE__->set_primary_key('id');

    __PACKAGE__->kiokudb_column('action');

    __PACKAGE__->belongs_to( result => "MyApp::DB::Result::Output" );

    sub run {
        my $self = shift;

        # run the actual action
        $self->action->($self);

        # mark the job as finished
        $self->finished(1);
        $self->update;
    }

    package MyApp::DB;
    use base qw(DBIx::Class::Schema);

    __PACKAGE__->load_components(qw(Schema::KiokuDB));

    __PACKAGE__->register_class( Job => qw(MyApp::DB::Result::Job));
    __PACKAGE__->register_class( Output => qw(MyApp::DB::Result::Output));
    __PACKAGE__->register_class( DataPoint => qw(MyApp::DB::Result::DataPoint));
}

my $dir = KiokuDB->connect(
    'dbi:SQLite:dbname=:memory:',
    schema => "MyApp::DB",
    create => 1,
);

my $schema = $dir->backend->schema;

# create some data
$schema->txn_do(sub {
    my $rs = $schema->resultset("DataPoint");

    $rs->create({ value => 4 });
    $rs->create({ value => 3 });
    $rs->create({ value => 2 });
    $rs->create({ value => 50 });
});

# queue a job
$dir->txn_do( scope => 1, body => sub {
    my $small_numbers = $schema->resultset("DataPoint")->search({ value => { "<=", 10 } });

    # create a closure for the job:
    my $action = sub {
        my $self = shift;

        my $sum = 0;

        # small_numbers is a closure variable, which will be saved implicitly
        # as a KiokuDB object
        while ( my $data_point = $small_numbers->next ) {
            $sum += $data_point->value;
        }

        # $schema is also restored properly
        $self->result( $schema->resultset("Output")->create({ value => $sum }) );
    };

    # we can simply store the closure in the DB
    $schema->resultset("Job")->create({
        description => "sum some small numbers",
        action      => $action,
    });
});

# run a job
# this can be done in worker process, obviously (just change :memory: to a real
# file)
$dir->txn_do( scope => 1, body => sub {
    my $jobs = $schema->resultset("Job")->search({ finished => 0 });

    my $job = $jobs->search(undef, { limit => 1 })->single;

    $job->run();

    my $result = $job->result;

    # ...
});