File: globus-gram-audit.in

package info (click to toggle)
globus-gram-audit 3.1-3
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 708 kB
  • sloc: sh: 8,303; perl: 471; makefile: 178; xml: 138; sql: 103
file content (732 lines) | stat: -rw-r--r-- 19,875 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
#! /usr/bin/perl 

# Copyright 1999-2011 University of Chicago
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
# http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require Pod::Usage;

=head1 NAME

globus-gram-audit - Upload audit records to a database.

=head1 SYNOPSIS

B<globus-gram-audit> [--conf I<CONFIG-FILE>] [--create | --update=OLD-VERSION] [--check] [--delete]
                     [-audit-directory I<DIR>]

=head1 DESCRIPTION

Upload audit records to a database. Reads
F<$GLOBUS_LOCATION/etc/globus-job-manager.conf> by default
to determine the audit directory and then uploads all files in that directory
that contain valid audit records to the database configured by the 
globus_gram_job_manager_auditing_setup_scripts package. If the upload completes
successfully, the audit files will be removed.

=head1 OPTIONS

=over

=item B<--conf> I<CONFIG-FILE>

Use CONFIG-FILE instead of the default for audit database information

=item B<--create> | B<--update=OLD-VERSION>

Create or update audit database tables.

=item B<--check>

Check whether the insertion of a record was successful. This is used in tests.

=item B<--delete>

Delete the audit record right after inserting it. This is used in tests.

=item B<--audit-directory> I<DIR>

Look for audit records in DIR, instead of looking in the directory specified in
the job manager configuration. This is used in tests.

=item B<--query> I<SQL>

Perform the give SQL query on the audit database.

=back

=cut

@GLOBUS_PERL_INITIALIZER@

use Getopt::Long;
use strict;
use DBI qw(:sql_types);
use POSIX;
use Time::Local;
use Globus::Core::Paths;

my $rc = 0;
my $check = 0;
my $delete = 0;
my $audit_directory = "";
my $help = 0;
my $conf_path = '';
my %conf = ();
my $query = undef;
my $create = undef;
my $update = undef;
my $dbh;
my $sth;
my $check_sth;
my $delete_sth;
my $quiet = 0;

GetOptions(
  'help' => \$help,
  'conf=s' => \$conf_path,
  'check' => \$check,
  'delete' => \$delete,
  'query=s' => \$query,
  'create' => \$create,
  'update=s' => \$update,
  'audit-directory=s' => \$audit_directory,
  'quiet' => \$quiet)
|| Pod::Usage::pod2usage(1);

Pod::Usage::pod2usage(-verbose => 1, -exitval => 0) if $help;

# If the audit record directory wasn't passed as command-line argument,
# read it from job manager configuration file
if ($audit_directory eq '') {
    $audit_directory = get_audit_directory();
    if ($audit_directory eq '' && $query eq '' && (!$create) && (!$update)) {
        print STDERR "No -audit-directory specified in job-manager.conf. " .
            "Nothing to do\n" unless ($quiet);
        exit(1);
    }
}

# Set default audit configuration path if not specified on command-line
if ($conf_path eq '') {
    $conf_path = "$Globus::Core::Paths::sysconfdir/globus/gram-audit.conf";
}

# Parse configuration
(%conf = parse_conf($conf_path)) || exit(1);

# Deal with DB driver
$dbh = dbconnect(\%conf);
if (!defined($dbh))
{
    exit(1);
}

# Implement query or else insert records from the audit directory
if (defined($query))
{
    &execute_query($dbh, $query);
}
elsif ($create)
{
    # Create DB table in the audit database. For SQLite, additionally create the
    # directory for the table file if not present.
    my $auditschemadir = "$Globus::Core::Paths::datadir/globus/gram-audit";
    my $path;
    my $fh;
    my $table_sql;

    if ($conf{DRIVER} eq 'SQLite')
    {
        if ($conf{DATABASE} =~ m/dbname=([^;]*)/) {
            my $dbdir = $1;
            my $dbfile = $1;
            $dbdir =~ s|/[^/]*$||;
            mkdir $dbdir, 0700 if (! -d $dbdir);
        }
    }

    $path = "$auditschemadir/audit-$conf{DRIVER}-$conf{AUDITVERSION}.sql";

    open($fh, "<$path")
        || die "Unable to locate schema for driver $conf{DRIVER} and version $conf{AUDITVERSION} ($path)\n";

    $table_sql = join('', <$fh>);

    foreach (split(/;/, $table_sql)) {
        chomp;
        if ($_ ne '') {
            &execute_query($dbh, $_) || die "Error creating table\n";
        }
    }
    exit(1);
}
elsif ($update)
{
    # Update DB table in the audit database from $update version to currently
    # configured version. For SQLite, additionally create the
    # directory for the table file if not present.
    my $auditschemadir = "$Globus::Core::Paths::datadir/globus/gram-audit";
    my $path;
    my $fh;
    my $table_sql;

    if ($conf{DRIVER} eq 'SQLite')
    {
        if ($conf{DATABASE} =~ m/dbname=([^;]*)/) {
            my $dbdir = $1;
            $dbdir =~ s|/[^/]*$||;
            mkdir $dbdir, 0700 if (! -d $dbdir);
        }
    }

    $path = "$auditschemadir/audit-$conf{DRIVER}-$update-$conf{AUDITVERSION}.sql";

    open($fh, "<$path")
        || die "Unable to locate upgrade schema for driver $conf{DRIVER} from version $update to $conf{AUDITVERSION} ($path)\n";

    $table_sql = join('', <$fh>);

    foreach (split(/;/, $table_sql)) {
        chomp;
        if ($_ ne '') {
            &execute_query($dbh, $_) || die "Error updating tables\n";
        }
    }
    exit(1);
}
else
{
    $sth = insertstatement($dbh, \%conf) || exit(1);
    if ($delete) {
        $delete_sth = $dbh->prepare(
            "DELETE FROM gram_audit_table WHERE job_grid_id = ?");
    } else {
        $delete_sth = undef;
    }

    if ($check) {
        $check_sth = checkstatement($dbh, \%conf);
    } else {
        $check_sth = undef;
    }

    foreach my $f (glob("$audit_directory/*.gramaudit")) {
        my $r = upload_record($f, $dbh, $sth, $check_sth, $delete_sth); 
        $rc += $r;

        if ($r == 0) {
            remove($f);
        }
    }
}
exit($rc);

# Upload an audit record into the configured database. If $check_sth is
# defined, this function also retrieves the record and compares the input
# from the gramaudit file with the result from querying the database. If 
# $delete_sth is defined, this function will remove the record after inserting
# it. This is used in the test scripts, and is not expected to be generally
# used.
#
# \param $record_file
#     Path to the gramaudit file to parse and upload
# \param $dbh
#     Database handle
# \param $sth
#     Prepared INSERT statement handle
# \param $check_sth
#     Prepared SELECT statement handle to retrieve the row from the
#     database (for debugging).
# \param $delete_sth
#     Prepared DELETE statement handle to remove the row from the
#     database after inserting it (for debugging).
#
# \return
#     Returns true if the record was inserted (and optionally checked and/or
#     deleted) successfully, false otherwise.
sub upload_record {
    my $record_file = shift;
    my $dbh = shift;
    my $sth = shift;
    my $check_sth = shift;
    my $delete_sth = shift;
    my $owner;
    my $record;
    my @record_entries;
    my $record_fd;
    my $rv;
    local(*FH);

    # Use POSIX::open instead of perl open so that we can fstat the file
    # to safely get the ownership
    $record_fd = POSIX::open($record_file);
    if (!defined($record_fd))
    {
        print STDERR "Error opening $record_file\n";
        return 1;
    }

    $owner = (POSIX::fstat($record_fd))[4];

    # convert to a perl file handle for easier I/O with the <> operator
    open(FH, "<&=$record_fd");

    # read the record file and confirm that the owner of the file
    # is the local user id in the audit record 
    chomp($record = <FH>);
    $record =~ s/^"//;
    $record =~ s/"$//;
    @record_entries = split(/"[^"]"/, $record);
    close(FH);

    if ($conf{AUDITVERSION} eq '1') {
        if (scalar(@record_entries) == 16) {
            # Job manager is aware of TG audit extension, but the database is
            # not configured to use it, so we'll drop the last field
            # (gateway_user)
            pop(@record_entries);
        } elsif (scalar(@record_entries) != 15) {
            # Invalid audit record
            print STDERR "Skipping invalid audit record $record_file\n";
            return 1;
        }
        verify_audit_fields(\@record_entries, \%conf) || return 1;
    } elsif ($conf{AUDITVERSION} eq '1TG') {
        if (scalar(@record_entries) == 15) {
            push(@record_entries, 'NULL');
        } elsif (scalar(@record_entries) != 16) {
            print STDERR "Skipping invalid audit record $record_file\n";
            return 1;
        }
        verify_audit_fields(\@record_entries, \%conf) || return 1;
    }

    # don't process entries where the owner != the job user
    if ($record_entries[3] ne (getpwuid($owner))[0]) {
        print STDERR "Skipping record-file $record_file: "
                   . "record owner is different from local user id "
                   . "in audit record\n";
        return 1;
    }

    $rv = $sth->execute(map {
            s/&quot;/"/g;
            if ($_ eq 'NULL') {
                $_ = undef;
            }
            $_ } @record_entries); # "
    $sth->finish;
    if (!$rv)
    {
        print STDERR "Insert of $record_file failed: " . $dbh->errstr . "\n";
        return 1;
    }

    if (defined($check_sth))
    {
        my $rv = $check_sth->execute($record_entries[0]);
        my @row;

        if ($conf{DRIVER} ne 'SQLite') {
            $check_sth->bind_col(5, undef, {TYPE => SQL_DATETIME});
            $check_sth->bind_col(6, undef, {TYPE => SQL_DATETIME});
        }

        @row = $check_sth->fetchrow_array();

        if (scalar(@row) != scalar(@record_entries)) {
            print STDERR "Check failed: row contained " . scalar(@row) 
                       . " fields instead of expected "
                       . scalar(@record_entries) . "\n";
            return 1;
        }

        for (my $i = 0; $i < scalar(@record_entries); $i++) {
            if ((!defined($row[$i]) &&  defined($record_entries[$i])) ||
                ( defined($row[$i]) && !defined($record_entries[$i])) ||
                ( defined($row[$i]) && $row[$i] ne $record_entries[$i])) {
                print STDERR "Check failed: "
                           . $row[$i] . " != " . $record_entries[$i] . "\n";
                return 1;
            }
        }
    }

    if (defined($delete_sth))
    {
        $delete_sth->execute($record_entries[0]) || return 1;
    }

    close(FH);
    return 0;
}

# get_audit_directory
#
# Parse the default job manager configuration file. Return the argument
# to -audit-directory if present in the file, or an empty string.
#
# \return
#     Path of the audit directory, or an empty string if not present in the
#     configuration
sub get_audit_directory {
    my $jmconf = "$Globus::Core::Paths::sysconfdir/globus/globus-gram-job-manager.conf";
    my $audit_directory = '';
    local(*F);
    open(F, "<$jmconf");
    while(<F>) {
        if (m/-audit-directory\s+(\S+)\s+/) {
            $audit_directory = Globus::Core::Paths::eval_path($1);
            last;
        }
    }
    close(F);
    return $audit_directory;
}

# parse_conf
#
# Parse configuration file named by $path. The configuration file
# is expected to contain key-value pairs separated by :
#
# \arg $path
#     Path to GRAM audit configuration file
# \return
#     Hash containing ($key, $value) pairs from $path
sub parse_conf
{
    my $path = shift;
    my %result = ();

    local(*CONF);

    if (!open(CONF, "<$path"))
    {
        print STDERR "Error opening configuration file \"$path\"\n"
            unless ($quiet);
        return ();
    }

    while (<CONF>)
    {
        my ($key, $value);
        chomp;
        ($key, $value) = split(/:/, $_, 2);

        $result{$key} = Globus::Core::Paths::eval_path($value);
    }
    return %result;
}

# Connect to database described in configuration hash %conf.
#
# \arg %conf
#     Hash containing DRIVER, DATABASE, USERNAME, and PASSWORD keys.
# \return
#     DBI handle connected to the selected database or undef.
sub dbconnect
{
    my $conf = $_[0];
    my $dbh;
    my $sth;
    my @available_drivers = DBI->available_drivers();
    my $driver_ok = 0;

    for (my $i = 0; $i < scalar(@available_drivers); $i++)
    {
        if ($available_drivers[$i] eq $conf->{'DRIVER'})
        {
            $driver_ok = 1;
        }
    }
    if (!$driver_ok)
    {
        print STDERR "Error connecting to database: driver "
                   . "\"$conf->{DRIVER}\" not installed\n" unless ($quiet);
        return undef;
    }

    $dbh = DBI->connect(
            "dbi:$conf->{DRIVER}:$conf->{DATABASE}",
            $conf->{USERNAME},
            $conf->{PASSWORD});

    if (!defined($dbh))
    {
        print STDERR "Error connecting to database\n" unless ($quiet);
        return undef;
    }

    return $dbh;
}

# Generate an audit-version specific SQL statement to insert a row into
# the database, with placeholders in the same order as the gramaudit file.
# 
# \param $dbh
#     Database handle.
# \param $conf
#     Reference to the configuration hash from the globus-gram-auditing.conf
#     file
#
# \return
#     A prepared SQL statement or undef if an error occurs.
sub insertstatement
{
    my $dbh = shift;
    my $conf = shift;
    my $sth;

    if ($conf->{AUDITVERSION} eq '1')
    {
        $sth = $dbh->prepare("INSERT INTO gram_audit_table("
             . "job_grid_id,"
             . "local_job_id,"
             . "subject_name,"
             . "username,"
             . "idempotence_id,"
             . "creation_time,"
             . "queued_time,"
             . "stage_in_grid_id,"
             . "stage_out_grid_id,"
             . "clean_up_grid_id,"
             . "globus_toolkit_version,"
             . "resource_manager_type,"
             . "job_description,"
             . "success_flag,"
             . "finished_flag)"
             . " VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
    }
    elsif ($conf->{AUDITVERSION} eq '1TG')
    {
        $sth = $dbh->prepare("INSERT INTO gram_audit_table("
             . "job_grid_id,"
             . "local_job_id,"
             . "subject_name,"
             . "username,"
             . "idempotence_id,"
             . "creation_time,"
             . "queued_time,"
             . "stage_in_grid_id,"
             . "stage_out_grid_id,"
             . "clean_up_grid_id,"
             . "globus_toolkit_version,"
             . "resource_manager_type,"
             . "job_description,"
             . "success_flag,"
             . "finished_flag,"
             . "gateway_user) "
             . "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
    }
    else
    {
        print STDERR "Unsupported audit version $conf->{AUDITVERSION}\n";
        $sth = undef;
    }

    return $sth;
}

# Generate an audit-version specific SQL statement to retrieve the columns
# from the database in the same order that they are in the original gramaudit
# file.
# 
# \param $dbh
#     Database handle.
# \param $conf
#     Reference to the configuration hash from the globus-gram-auditing.conf
#     file
#
# \return
#     A prepared SQL statement or undef if an error occurs.
sub checkstatement
{
    my $dbh = shift;
    my $conf = shift;
    my $sth;

    if ($conf->{AUDITVERSION} eq '1')
    {
        $sth = $dbh->prepare("SELECT  "
             . "job_grid_id,"
             . "local_job_id,"
             . "subject_name,"
             . "username,"
             . "idempotence_id,"
             . "creation_time,"
             . "queued_time,"
             . "stage_in_grid_id,"
             . "stage_out_grid_id,"
             . "clean_up_grid_id,"
             . "globus_toolkit_version,"
             . "resource_manager_type,"
             . "job_description,"
             . "success_flag,"
             . "finished_flag "
             . "FROM gram_audit_table "
             . "WHERE job_grid_id = ?");
    }
    elsif ($conf->{AUDITVERSION} eq '1TG')
    {
        $sth = $dbh->prepare("SELECT "
             . "job_grid_id,"
             . "local_job_id,"
             . "subject_name,"
             . "username,"
             . "idempotence_id,"
             . "creation_time,"
             . "queued_time,"
             . "stage_in_grid_id,"
             . "stage_out_grid_id,"
             . "clean_up_grid_id,"
             . "globus_toolkit_version,"
             . "resource_manager_type,"
             . "job_description,"
             . "success_flag,"
             . "finished_flag, "
             . "gateway_user "
             . "FROM gram_audit_table "
             . "WHERE job_grid_id = ?");
    }
    else
    {
        print STDERR "Unsupported audit version $conf->{AUDITVERSION}\n";
        $sth = undef;
    }

    return $sth;
}

# Check that the fields of the audit record array are valid for the typed
# columns and normalize those to SQL92 syntax.
# 
# \param $record_entries
#     Reference to the list of audit record fields from the gramaudit file.
#     Modified as data is normalized
# \param %conf
#     Configuration values from the GRAM audit configuration.
#
# \return
#     A true value if the audit records match the expected types, false
#     otherwise.
sub verify_audit_fields
{
    my $record_entries = $_[0];
    my %conf = %{$_[1]};
    my $ts;

    if (! parse_timestamp(\$record_entries->[5]))
    {
        return 0;
    }

    if ((! parse_timestamp(\$record_entries->[6])) &&
        $record_entries->[6] ne 'NULL')
    {
        return 0;
    }

    if (!defined(parse_boolean(\$record_entries->[13])))
    {
        return 0;
    }

    if (!defined(parse_boolean(\$record_entries->[14])))
    {
        return 0;
    }

    return 1;
}

# Translate boolean from text in the auditfile into an integer
#
# \param $boolean_ref
#     Reference to the boolean string to parse. The referred-to string is
#     modified by this function to be formatted as an integer (1 for true,
#     0 for false).
#
# \return
#     Integer representation of the boolean value.
sub parse_boolean
{
    my $boolean_ref = $_[0];
    my %values = ( 'true' => 1, 'false' => 0 );

    if (exists($values{$$boolean_ref}))
    {
        $$boolean_ref = $values{$$boolean_ref};
    }
    else
    {
        return undef;
    }

    return $$boolean_ref;
}

# Translate timestamp from ctime() in the auditfile into an SQL92 TIMESTAMP
#
# \param $timestamp_string_ref
#     Reference to the timestamp string to parse. The referred-to string is
#     modified by this function to be formatted in SQL92 TIMESTAMP format
#
# \return
#     Timestamp string value in SQL92 TIMESTAMP format or undef if
#     the string could not be parsed.
#
sub parse_timestamp
{
    my $timestamp_string_ref = $_[0];
    my %months = ( 'Jan' => 1, 'Feb' => 2, 'Mar' => 3, 'Apr' => 4, 'May' => 5,
                   'Jun' => 6, 'Jul' => 7, 'Aug' => 8, 'Sep' => 9, 'Oct' => 10,
                   'Nov' => 11, 'Dec' => 12 );

    # Convert to SQL standard date-time format 
    if ($$timestamp_string_ref !~
            m/(\S{3})\s+(\S{3})\s+(\d+)\s+(\d+):(\d+):(\d+) UTC (\d+)/)
    {
        return undef;
    }

    $$timestamp_string_ref =
            sprintf("%04d-%02d-%02d %02d:%02d:%02d",
                   $7, $months{$2}, $3, $4, $5, $6);
    return $$timestamp_string_ref;
}

sub execute_query
{
    my ($dbh, $query) = @_;
    my $st;
    my $rc;

    $st = $dbh->prepare($query) || exit(1);
    $rc = $st->execute;

    if (defined($rc) && $rc ne '0E0') {
        while (my @row = $st->fetchrow_array) {
            print DBI::neat_list(\@row, 2560, ",") . "\n";
        }
    }

    $st->finish;
}

END {
    if ($dbh)
    {
        $dbh->disconnect;
    }
}