1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
|
package POE::Component::DBIAgent::Helper;
use DBI;
#use Daemon; # qw//;
use Data::Dumper;
use POE::Filter::Reference;
BEGIN {
    # Time::HiRes is optional.  When it loads, usleep() is imported into this
    # package at compile time so later code can call it; when it does not,
    # delays are simply reported as unavailable.
    my $have_hires = eval { require Time::HiRes; 1 } ? 1 : 0;
    Time::HiRes->import(qw/usleep/) if $have_hires;

    # Returns 1 when usleep() is available, 0 otherwise.
    sub CAN_DELAY { $have_hires }
}
# Strictures apply from this point on; the BEGIN block above is compiled
# before they are enabled.
use strict;
# $VERSION is a package global built from the RCS "Revision" keyword below:
# the major and minor numbers are captured and formatted as "%d.%02d".
use vars qw/$VERSION/;
$VERSION = sprintf("%d.%02d", q$Revision: 0.03 $ =~ /(\d+)\.(\d+)/);
# Compile-time switch: when true, the "QA:" trace messages are sent to warn().
use constant DEBUG => 0;
# Compile-time switch: when true, _init_dbi() keeps insert/update/delete
# queries as plain SQL strings instead of preparing them, and run() answers
# requests for those queries with 'EOF' without executing anything.
use constant DEBUG_NOUPDATE => 0;
# One filter instance shared by the whole module; run() uses it to decode
# requests read from STDIN and to encode the replies printed to STDOUT.
my $filter = POE::Filter::Reference->new();
# Serialize one result record with the shared filter and print it to STDOUT.
sub _send_result {
    my ($result) = @_;
    my $chunks = $filter->put( [ $result ] );
    print @$chunks;
    return;
}

# run($type, $dsn, $queries)
#
# Main loop of the helper process.  Connects to the database through
# _init_dbi(), then reads filter-encoded task records from STDIN until
# end-of-input or until a task whose {query} is 'EXIT' has been seen.
#
#   $type    - class name to bless the helper object into.
#   $dsn     - arrayref of connection arguments, handed to _init_dbi().
#   $queries - hashref of Query_Name => SQL string, handed to _init_dbi().
#
# Task fields read here: query, hash, params, delay, package, state, id,
# cookie, group.  For every executed task the rows are written to STDOUT one
# record at a time (with {data} set to the row), followed by a final record
# whose {data} is the string 'EOF'.  A failed execute() is reported on STDERR
# and answered with the 'EOF' record only.
#
# Returns the result of disconnecting the database handle.
sub run {
    DEBUG && warn " QA: start\n";
    DEBUG_NOUPDATE && warn " QA: NO UPDATE\n";

    my ($type, $dsn, $queries) = @_;
    my $self = bless {}, $type;
    $self->_init_dbi($dsn, $queries);

    $| = 1;    # unbuffered STDOUT: each reply is written as soon as it is printed
    # Errors are handled explicitly below, so DBI must neither die nor print.
    $self->{dbh}->{RaiseError} = 0;
    $self->{dbh}->{PrintError} = 0;
    DEBUG && warn " QA: initialized\n";

    while ( sysread( STDIN, my $buffer = '', 1024 ) ) {
        my $tasks = $filter->get( [ $buffer ] );

        #++ look for the exit sign in the current set of commands
        my $exit = grep /^EXIT$/, map $_->{query}, @$tasks;

        TASK:
        foreach my $task (@$tasks) {
            my $query_id = $task->{query};
            my $rowtype  = $task->{hash} ? 'fetchrow_hashref' : 'fetchrow_arrayref';

            #++ make sure the EXIT event isn't actually sent to the db
            next TASK if $query_id eq 'CREATE' or $query_id eq 'EXIT';

            # NOTE(review): prepared queries live in the same hash as the
            # 'dbh' key, so a request for a query named 'dbh' would find the
            # database handle here.
            unless (exists $self->{$query_id}) {
                DEBUG && warn " QA: No such query: $query_id";
                next TASK;
            }
            DEBUG && warn " QA: query $query_id exists\n";

            # A task without bind values is treated as having none instead of
            # dereferencing undef.
            my @params = @{ $task->{params} || [] };

            my $result = {
                package => $task->{package},
                state   => $task->{state},
                data    => undef,
                query   => $query_id,
                id      => $task->{id},
                cookie  => $task->{cookie} || undef, # XXX remove?
                group   => $task->{group},
            };

            if (ref $self->{$query_id}) { # Is it a DBI statement handle?
                # Normal query path.  This is where we usually go.
                my $sth      = $self->{$query_id};
                my $rowcount = 0;

                if ( $sth->execute(@params) ) {
                    DEBUG && warn " QA: query running\n";
                    # Only statements that produced a result set are fetched.
                    if ($sth->{Active}) {
                        while (defined (my $row = $sth->$rowtype())) {
                            $rowcount++;
                            $result->{data} = $row;
                            # This prevents monopolizing the parent with
                            # db responses.
                            CAN_DELAY and $task->{delay} and usleep(1);
                            _send_result($result);
                        }
                    }
                    $result->{data} = 'EOF';
                    _send_result($result);
                    DEBUG && warn " QA: ROWS|$rowcount\n";
                }
                else {
                    DEBUG && warn " QA: error executing query: ", $sth->errstr,"\n";
                    # This goes to stderr.  If an ErrorState was supplied,
                    # the user will see this message.
                    warn "QA: error executing query: ", $sth->errstr,"\n";
                    $result->{data} = 'EOF';
                    _send_result($result);
                }
            }
            else { # *NOT* a DBI statement handle
                # The stored query is a plain SQL string.  This is a debug
                # feature: print a debug message and send back EOF, but
                # don't actually touch the database.
                my $query = $queries->{$query_id};

                # Replace each ? placeholder with the next bind value.  With
                # /e a bare array evaluates to its element count, so the
                # values have to be shifted off one at a time; placeholders
                # without a matching value are left as '?'.
                my @bind = map { defined $_ ? $_ : 'NULL' } @params;
                $query =~ s/\?/@bind ? shift(@bind) : '?'/eg;
                DEBUG && warn " QA: $query\n";

                $result->{data} = 'EOF';
                _send_result($result);
            }
        }

        #++ checked here so that all the queries in the current buffer are
        #++ dealt with before disconnecting
        last if $exit;
    }

    DEBUG && warn " QA: Disconnect and Exit\n";
    return $self->{dbh}->disconnect;
}
# {{{ _init_dbi
# _init_dbi($heap, $dsn, $queries)
#
# Connects to the database and prepares the named queries.
#
#   $heap    - hash-based object that receives the results: {dbh} holds the
#              database handle and each query name maps to its prepared
#              statement handle (or to the raw SQL string, see below).
#   $dsn     - arrayref of (data source, username, password).
#   $queries - optional hashref of Query_Name => SQL string.
#
# The connection is opened with AutoCommit on and RaiseError/PrintError off.
# When DEBUG_NOUPDATE is true, queries containing the word insert, update or
# delete are stored as plain strings and never prepared.
#
# Dies with the DBI error text if the connection or any prepare fails.
# Returns nothing.
sub _init_dbi {
    my ($heap, $dsn, $queries) = @_;

    # Name the connect arguments explicitly.  Splicing @$dsn straight into
    # the call moved the attribute hash into the username or password slot
    # whenever fewer than three elements were supplied.
    my ($data_source, $user, $password) = @{ $dsn || [] };
    my $dbh = DBI->connect(
        $data_source, $user, $password,
        { AutoCommit => 1, RaiseError => 0, PrintError => 0 },
    ) or die $DBI::errstr;
    $heap->{dbh} = $dbh;
    #$dbh->{RowCacheSize} = 500;

    return unless defined $queries;

    # NOTE(review): query names share $heap with the 'dbh' key, so a query
    # called 'dbh' would replace the database handle stored above.
    foreach my $name (keys %$queries) {
        my $sql = $queries->{$name};

        # Word boundaries keep identifiers such as "last_update" from being
        # mistaken for a modifying statement.
        if (DEBUG_NOUPDATE and $sql =~ /\b(?:insert|update|delete)\b/i) {
            $heap->{$name} = $sql;
        }
        else {
            $heap->{$name} = $dbh->prepare($sql) or die $dbh->errstr;
        }
    }
    return;
}
# }}} _init_dbi
1;
__END__
=head1 NAME
POE::Component::DBIAgent::Helper - DBI Query Helper for DBIAgent
=head1 SYNOPSIS
use Socket qw/:crlf/;
use POE qw/Filter::Line Wheel::Run Component::DBIAgent::Helper/;
sub _start {
my $helper = POE::Wheel::Run ->new(
Program => sub {
POE::Component::DBIAgent::Helper->run($self->{dsn},
$self->{queries}
);
},
StdoutEvent => 'db_reply',
StderrEvent => 'remote_stderr',
ErrorEvent => 'error',
StdinFilter => POE::Filter::Line->new(),
StdoutFilter => POE::Filter::Line->new( Literal => CRLF),
StderrFilter => POE::Filter::Line->new(),
)
or carp "Can't create new DBIAgent::Helper: $!\n";
}
sub query {
my ($self, $query, $package, $state, @rest) = @_;
$self->{helper}->put(join '|', $query, $package, $state, @rest);
}
sub db_reply {
my ($kernel, $self, $heap, $input) = @_[KERNEL, OBJECT, HEAP, ARG0];
# $input is either the string 'EOF' or a Storable object.
}
=head1 DESCRIPTION
This is our helper routine for DBIAgent. It accepts queries on STDIN,
and returns the results on STDOUT. Results are returned on a
row-by-row basis, followed by a row consisting of the string 'EOF'.
Each row is the return value of $sth->fetch, which is an arrayref.
This row is then passed to Storable for transport, and printed to
STDOUT. HOWEVER, Storable uses newlines ("\n") in its serialized
strings, so the Helper is designed to use the "network newline" pair
CR LF as the line terminator for STDOUT.
When fetch() returns undef, one final row is returned to the calling
state: the string 'EOF'. Sessions should test for this value FIRST
when being invoked with input from a query.
=head2 Initialization
The Helper has one public subroutine, called C<run()>, and is invoked
with two parameters:
=over
=item The DSN
An arrayref of parameters to pass to DBI->connect (usually a dsn,
username, and password).
=item The Queries.
A hashref of the form Query_Name => "$SQL". See
L<POE::Component::DBIAgent> for details.
=back
=head1 BUGS
I have NO idea what to do about handling signals intelligently.
Specifically, under some circumstances, Oracle will refuse to
acknowledge SIGTERM (presumably since its libraries are non-reentrant)
so sometimes SIGKILL is required to terminate a Helper process.
=head1 AUTHOR
This module has been fine-tuned and packaged by Rob Bloodgood
E<lt>robb@empire2.comE<gt>. However, most of the code came directly
from Fletch E<lt>fletch@phydeaux.orgE<gt>, either directly
(Po:Co:DBIAgent:Queue) or via his ideas. Thank you, Fletch!
However, I own all of the bugs.
This module is free software; you may redistribute it and/or modify it
under the same terms as Perl itself.
=cut
|