File: pg_worker.t

package info (click to toggle)
libminion-perl 10.25%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,052 kB
  • sloc: javascript: 3,572; perl: 1,029; sql: 79; makefile: 10
file content (101 lines) | stat: -rw-r--r-- 3,049 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use Mojo::Base -strict;

# Select the reactor backend through the environment. This is done in a BEGIN
# block so the variable is already set when the modules below get loaded
BEGIN { $ENV{MOJO_REACTOR} = 'Mojo::Reactor::Poll' }

use Test::More;

# TEST_ONLINE is both the opt-in switch and the connection string handed to
# Mojo::Pg and Minion further down; without it the whole file is skipped
plan skip_all => 'set TEST_ONLINE to enable this test' unless $ENV{TEST_ONLINE};

use Minion;
use Mojo::IOLoop;

# Isolate tests: start from a freshly created schema that only this file uses
require Mojo::Pg;
my $pg = Mojo::Pg->new($ENV{TEST_ONLINE});
for my $sql ('DROP SCHEMA IF EXISTS minion_worker_test CASCADE', 'CREATE SCHEMA minion_worker_test') {
  $pg->db->query($sql);
}

# Minion is built from the same connection string and pointed at that schema
my $minion = Minion->new(Pg => $ENV{TEST_ONLINE});
$minion->backend->pg->search_path(['minion_worker_test']);

subtest 'Basics' => sub {

  # Trivial task that finishes right away with a fixed result
  $minion->add_task(
    test => sub {
      my ($job) = @_;
      return $job->finish({just => 'works!'});
    }
  );

  # Fresh worker with the dequeue timeout turned down to 0
  my $worker = $minion->worker;
  my $config = $worker->status;
  $config->{dequeue_timeout} = 0;

  # As soon as a dequeued job has been reaped, send INT to this very process;
  # presumably that is what makes "run" below return (see Minion::Worker)
  my $interrupt_self = sub { kill 'INT', $$ };
  $worker->on(
    dequeue => sub {
      my (undef, $job) = @_;
      $job->on(reap => $interrupt_self);
    }
  );

  my $id = $minion->enqueue('test');

  # Remember the "jobs" setting the worker reports on its first "wait" event
  my $max;
  $worker->once(
    wait => sub {
      my ($waiting) = @_;
      $max = $waiting->status->{jobs};
    }
  );

  $worker->run;

  is $max, 4, 'right value';
  my $result = $minion->job($id)->info->{result};
  is_deeply $result, {just => 'works!'}, 'right result';
};

subtest 'Clean up event loop' => sub {

  # Counter bumped by a recurring timer that is registered through the
  # Mojo::IOLoop class before the worker starts; it has to still be 0 once the
  # worker is done
  my $timer = 0;
  Mojo::IOLoop->recurring(0 => sub { $timer += 1 });

  # Fresh worker with the dequeue timeout turned down to 0
  my $worker = $minion->worker;
  my $config = $worker->status;
  $config->{dequeue_timeout} = 0;

  # Send INT to this process after the dequeued job has been reaped
  my $interrupt_self = sub { kill 'INT', $$ };
  $worker->on(
    dequeue => sub {
      my (undef, $job) = @_;
      $job->on(reap => $interrupt_self);
    }
  );

  # Reuses the "test" task, which must already be registered on $minion
  my $id = $minion->enqueue('test');
  $worker->run;

  my $result = $minion->job($id)->info->{result};
  is_deeply $result, {just => 'works!'}, 'right result';
  is $timer, 0, 'timer has been cleaned up';
};

subtest 'Signals' => sub {

  # Task that requests USR1, USR2 and INT for itself and idles until INT
  # arrives, then reports which of the USR signals it has seen
  $minion->add_task(
    int => sub {
      my ($job) = @_;

      my $waiting = 1;
      my %seen;
      local $SIG{USR1} = sub { $seen{usr1}++ };
      local $SIG{USR2} = sub { $seen{usr2}++ };
      local $SIG{INT}  = sub { $waiting = 0 };

      # One "kill" broadcast per signal, each one aimed at this job's id
      for my $name (qw(USR1 USR2 INT)) {
        $job->minion->broadcast('kill', [$name, $job->id]);
      }

      sleep 1 while $waiting;

      my $names = join ' ', sort keys %seen;
      return $job->finish({msg => 'signals: ' . $names});
    }
  );

  # Command interval of 1, presumably so the broadcasts get picked up quickly
  my $worker = $minion->worker;
  $worker->status->{command_interval} = 1;

  # Send INT to this process after the dequeued job has been reaped
  my $interrupt_self = sub { kill 'INT', $$ };
  $worker->on(
    dequeue => sub {
      my (undef, $job) = @_;
      $job->on(reap => $interrupt_self);
    }
  );

  my $id = $minion->enqueue('int');
  $worker->run;

  # The INT handler only ends the loop, so just the USR signals are recorded
  my $result = $minion->job($id)->info->{result};
  is_deeply $result, {msg => 'signals: usr1 usr2'}, 'right result';

  # Only command_interval was set in this subtest; the remaining values are
  # whatever the worker filled in itself. repair_interval is merely checked
  # for being true, presumably because it is not a fixed number
  my $state = $worker->status;
  is $state->{command_interval},   1,   'right value';
  is $state->{dequeue_timeout},    5,   'right value';
  is $state->{heartbeat_interval}, 300, 'right value';
  is $state->{jobs},               4,   'right value';
  is_deeply $state->{queues}, ['default'], 'right structure';
  is $state->{performed}, 1, 'right value';
  ok $state->{repair_interval}, 'has a value';
  is $state->{spare},              1, 'right value';
  is $state->{spare_min_priority}, 1, 'right value';
};

# Clean up once we are done: drop the test schema and everything inside it
$pg->db->query('DROP SCHEMA minion_worker_test CASCADE');

done_testing;