File: sreview-dispatch

package info (click to toggle)
sreview 0.11.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,420 kB
  • sloc: perl: 11,901; javascript: 509; sh: 72; makefile: 8
file content (233 lines) | stat: -rw-r--r-- 7,590 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/perl -w

# SReview, a web-based video review and transcoding system
# Copyright (c) 2016-2017, Wouter Verhelst <w@uter.be>
#
# SReview is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program.  If not, see
# <http://www.gnu.org/licenses/>.

use strict;
use warnings;

use DBI;
use Mojo::Template;
use Mojo::UserAgent;
use SReview::Config::Common;
use SReview::Talk;

=head1 NAME

sreview-dispatch - Watch the database for talks that need work done, and schedule jobs for them

=head1 SYNOPSIS

sreview-dispatch

=head1 DESCRIPTION

B<sreview-dispatch> is the central script for SReview. It can be used in two ways:

=over

=item 1.

Either you run it with an external scheduler (e.g., gridengine, slurm,
PBS, torque). This is the recommended way of using SReview for large
installations. In this mode of operation, C<sreview-dispatch> should be
run once in the entire network, and the C<query_limit> configuration
value should be set to 0.

=item 2.

Or you run it with no external scheduler. This is the default. In this
mode of operation, it is recommended that the C<query_limit>
configuration parameter is set to a nonzero value, so that individual
C<sreview-dispatch> instances do not take all the work, keeping all the
other instances idle. In this mode of operation, one C<sreview-dispatch>
instance should be run per CPU core on every machine that is used for
performing work.

This mode works, but using an external scheduler allows for operating
more flexibly on pending jobs, and on the addition or removal of extra
nodes.

=back

=head1 OPTIONS

None. C<sreview-dispatch> uses the system-wide SReview configuration.
For more information, see L<sreview-config>.

=head1 CONFIGURATION

C<sreview-dispatch> considers the following configuration values:

=over

=cut

# Load the SReview configuration; every setting used by this script is
# read from this object.
my $config = SReview::Config::Common::setup();

=item dbistring

The database connection string

=cut

# Connect to the database named by the "dbistring" configuration value
# (empty user name and password). On failure, include DBI's own error
# text so the reason for the failure is visible.
my $dbh = DBI->connect($config->get('dbistring'), '', '')
	or die "Cannot connect to database: " . (defined($DBI::errstr) ? $DBI::errstr : 'unknown error');

=item state_actions

A hash table of shell commands to execute when a talk is found in a
particular state with the C<waiting> progress value. The hash key should
be the state name.

If a state is not specified in the C<state_actions> hash, then
C<sreview-dispatch> will I<ignore> that state. This allows for handling
particular states on particular hosts, if necessary.

The shell commands can use the following template variables:

=over

=item <%== $talkid %>

The database ID of the talk to work on

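=item <%== $output_dir %>

The value of the C<script_output> configuration item, i.e., where to
redirect output to.

=item <%== $room %>

The name of the talk's room

=item <%== $length %>

The difference between the talk's end time and its start time, in
seconds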

=back

=cut

# Template engine used to render the per-state shell commands. "vars" is
# enabled so that templates receive their values as named variables.
my $mt = Mojo::Template->new(vars => 1);

# HTTP client used to check whether a talk's event URL is published.
my $ua = Mojo::UserAgent->new;

# Hash reference mapping a state name to the shell command template to
# run for talks waiting in that state.
my $state_actions = $config->get('state_actions');

# Main polling loop. Every round:
#  1. moves talks whose progress is 'done' to their next state,
#  2. starts the configured command for every talk waiting in a state
#     listed in state_actions (at most query_limit talks if that is > 0),
#  3. marks talks in the 'publishing'/'removing' states as done, after
#     checking their event URL when published_headers and
#     eventurl_format are both configured,
#  4. commits and sleeps for 10 seconds.
while(1) {
	# Each round runs inside a transaction. DBI reports failures through
	# errstr, not through $!.
	$dbh->begin_work or die "Cannot start transaction: " . $dbh->errstr;
	print "==> Checking for new work...\n";
	my $start = $dbh->prepare("UPDATE talks SET progress = 'scheduled' WHERE id = ?");
	my $publishing = $dbh->prepare("SELECT talks.id, state FROM talks JOIN events ON events.id = talks.event WHERE state IN ('publishing','removing') AND events.name = ?");

	my $st;
	# Quoted, comma-separated list of the states handled by this
	# instance, for the IN (...) clauses below. The state names come
	# from the configuration and are interpolated as-is.
	my $statelist = "'" . join("','", keys(%{$state_actions})) . "'";

=item query_limit

The maximum number of requests that should be handled per loop. Should
be set to 0 (meaning infinity) when using an external scheduler;
should probably be set to 1 when not.

=item event

The event in use for this instance of SReview. In previous versions of
SReview, the dispatcher would run things for every event in the
database; since version 0.4 this is no longer the case. This allows
multiple instances of SReview to exist on the same host, sharing the
same database if wanted.

=cut

	# Advance every talk of this event whose progress is 'done' to
	# state_next(state), with progress 'waiting'.
	$dbh->prepare("UPDATE talks SET state=state_next(state), progress='waiting' WHERE progress='done' AND event = (SELECT id FROM events WHERE name = ?)")->execute($config->get('event'));
	if($config->get('query_limit') > 0) {
		$st = $dbh->prepare("SELECT talks.id, state, progress, title, rooms.name AS room, extract(epoch from (endtime - starttime)) AS length FROM talks JOIN rooms ON rooms.id = talks.room JOIN events ON events.id = talks.event WHERE (state IN ($statelist) AND progress = 'waiting' AND events.name = ?) LIMIT ?");
		$st->execute($config->get('event'), $config->get('query_limit'));
	} else {
		$st = $dbh->prepare("SELECT talks.id, state, progress, title, rooms.name AS room, extract(epoch from (endtime - starttime)) AS length FROM talks JOIN rooms ON rooms.id = talks.room JOIN events ON events.id = talks.event WHERE (state IN ($statelist) AND progress = 'waiting' AND events.name = ?)");
		$st->execute($config->get('event'));
	}
	TALK:
	while(my $row = $st->fetchrow_hashref) {
		next TALK unless exists(${$state_actions}{$row->{state}});
		my $statetrans = ${$state_actions}{$row->{state}};

=item script_output

The location where stderr and stdout output should be written to. This
is supposed to be a directory; it is reasonable to create a file for
stderr and a file for stdout for each script run.

=back

=cut

		# Render the command before touching the database. When
		# rendering fails, Mojo::Template returns an exception object
		# instead of a string; that object must never reach the shell.
		my $command = $mt->render($statetrans, {talkid => $row->{id}, room => $row->{room}, length => $row->{length}, output_dir => $config->get('script_output')});
		if(ref $command) {
			warn "Cannot render command for talk " . $row->{id} . " in state " . $row->{state} . ": " . $command->message . "\n";
			next TALK;
		}

		print "Starting job for event " . $row->{title} . " in state " . $row->{state} . "...\n";
		$start->execute($row->{id});
		if($config->get('query_limit') >= 1) {
			# if running without an external scheduler, release the lock so the child process can proceed
			$dbh->commit;
		}
		if(system($command) != 0) {
			warn "Command for talk " . $row->{id} . " in state " . $row->{state} . " exited with status $?\n";
		}
		# The commit above ended the transaction started by begin_work.
		# Open a new one so that the rest of this round, and the commit
		# that ends it, still operate inside a transaction.
		if($dbh->{AutoCommit}) {
			$dbh->begin_work or die "Cannot start transaction: " . $dbh->errstr;
		}
	}
	if(!defined($config->get('published_headers')) || !defined($config->get('eventurl_format'))) {
		# Publication cannot be verified without both settings, so
		# every publishing/removing talk is marked as done right away.
		$dbh->prepare("UPDATE talks SET progress='done' WHERE state IN ('publishing', 'removing') AND event = (SELECT id FROM events WHERE name = ?)")->execute($config->get('event'));
	} else {
		my $headers = $config->get('published_headers');
		my $done = $dbh->prepare("UPDATE talks SET progress='done' WHERE id = ?");
		$publishing->execute($config->get('event'));
		PUBLISHED:
		while(my $row = $publishing->fetchrow_hashref) {
			my $url = SReview::Talk->new(talkid => $row->{id})->eventurl;
			# result() dies on connection errors. One unreachable URL
			# must not take the dispatcher down, so skip this talk and
			# check it again in the next round.
			my $res = eval { $ua->head($url)->result };
			if(!defined($res)) {
				warn "Cannot check publication state of talk " . $row->{id} . ": $@";
				next PUBLISHED;
			}
			my $is_published = 1;
			HEADER:
			foreach my $header(keys %{$headers}) {
				# Accept either a single expected value or a list of them.
				my $vals = $headers->{$header};
				if(ref $vals ne "ARRAY") {
					$vals = [ $vals ];
				}
				if($header eq '_code') {
					# '_code' is compared against the HTTP response code.
					my @found = grep { $res->code eq $_ } @$vals;
					if(scalar(@found) == 0) {
						$is_published = 0;
						last HEADER;
					}
				} else {
					# NOTE(review): the configured key is evaluated as
					# Perl code on the response object. This is only
					# safe while the configuration is fully trusted.
					my $val;
					eval "\$val = \$res->$header;";
					if($@) {
						warn $@;
						next HEADER;
					}
					my @found = grep { $val eq $_ } @$vals;
					if(scalar(@found) == 0) {
						$is_published = 0;
						last HEADER;
					}
				}
			}
			# A publishing talk is done once every check matches; a
			# removing talk is done once at least one check no longer
			# matches.
			if(($is_published && $row->{state} eq 'publishing') || (!$is_published && $row->{state} eq 'removing')) {
				$done->execute($row->{id});
			}
		}
	}
	print "finished, waiting 10 seconds...\n";
	$dbh->commit;

	sleep 10;
}

=head1 BUGS

C<sreview-dispatch> currently polls the database every 10 seconds. This
should be changed to an event-based system based on PostgreSQL
asynchronous notifications.

=cut