# This program is copyright 2008-2011 Baron Schwartz, 2011 Percona Ireland Ltd.
# Feedback and improvements are welcome.
#
# THIS PROGRAM IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
# MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, version 2; OR the Perl Artistic License.  On UNIX and similar
# systems, you can issue `man perlgpl' or `man perlartistic' to read these
# licenses.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 59 Temple
# Place, Suite 330, Boston, MA  02111-1307  USA.
# ###########################################################################
# Processlist package
# ###########################################################################
{
# Package: Processlist
# Processlist makes events when used to poll SHOW FULL PROCESSLIST.
package Processlist;

use strict;
use warnings FATAL => 'all';
use English qw(-no_match_vars);
use Time::HiRes qw(time usleep);
use List::Util qw(max);
use Data::Dumper;
$Data::Dumper::Indent    = 1;
$Data::Dumper::Sortkeys  = 1;
$Data::Dumper::Quotekeys = 0;

use constant PTDEBUG => $ENV{PTDEBUG} || 0;
use constant {
   # 0-7 are the standard processlist columns.
   ID      => 0,  
   USER    => 1,  
   HOST    => 2,
   DB      => 3,
   COMMAND => 4,
   TIME    => 5,
   STATE   => 6,
   INFO    => 7,
   # 8, 9 and 10 are extra info we calculate.
   START   => 8,  # Calculated start time of statement ($start - TIME)
   ETIME   => 9,  # Exec time of SHOW PROCESSLIST (margin of error in START)
   FSEEN   => 10, # First time ever seen
   PROFILE => 11, # Profile of individual STATE times
};


# Sub: new
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   MasterSlave - MasterSlave obj for finding replicationt threads
#
# Optional Arguments:
#   interval - Hi-res sleep time before polling processlist in <parse_event()>.
#
# Returns:
#   Processlist object
sub new {
   my ( $class, %args ) = @_;
   foreach my $arg ( qw(MasterSlave) ) {
      die "I need a $arg argument" unless $args{$arg};
   }
   # Convert the list of kill commands (Query, Execute, etc) to a hashref for
   # faster check later
   my $kill_busy_commands = {};
   if ($args{kill_busy_commands}) {
       for my $command (split /,/,$args{kill_busy_commands}) {
           $command =~ s/^\s+|\s+$//g;
           $kill_busy_commands->{$command} = 1;
       }
   } else {
       $kill_busy_commands->{Query} = 1;
   }
   $args{kill_busy_commands} = $kill_busy_commands;

   my $self = {
      %args,
      polls       => 0,
      last_poll   => 0,
      active_cxn  => {},  # keyed off ID
      event_cache => [],
      _reasons_for_matching => {},
   };
   return bless $self, $class;
}

# Sub: parse_event
#   Parse rows from PROCESSLIST to make events when queries finish.
#
# Parameters:
#   %args - Arguments
#
# Required Arguments:
#   code - Callback that returns an arrayref of rows from SHOW PROCESSLIST.
#          Replication threads and $dbh->{mysql_thread_id} should be removed
#          from the return value.
#
# Returns:
#   Hashref of a completed event.
#
# Technical Details:
#   Connections (cxn) are tracked in a hashref ($self->{active_cxn}) by their
#   Id from the proclist.  Each poll of the proclist (i.e. each call to the
#   code callback) causes the current cxn/queries to be compared to the saved
#   (active) cxn. One of three things can happen: a new cxn appears, a query
#   ends/changes/restarts, or a cxn ends (and thus ends its query).
#
#   When a new connect appears, we only begin tracking it when the Info column
#   from the proclist is not null, indicating that the cxn is executing a
#   query.  The full proclist for this cxn is saved for comparison with later
#   polls.  This is $prev in the code which really references
#   $self->{active_cxn}.
#
#   For existing cxn, if the Info is the same (i.e. same query), and the Time
#   hasn't decreased, and the query hasn't restarted (look below in the code
#   for how we detect this), then the cxn is still executing the same query.
#   So we do nothing.  But if any one of those 3 conditions is false, that
#   signals a new query.  So we make an event based on saved info from the
#   last poll, then updated the cxn for the new query already in progress.
#
#   When a previously active cxn no longer appears in a poll, then that cxn
#   has ended and so did it's query, so we make an event for the query and
#   then delete the cxn from $self->{active_cxn}.  This is checked in the
#   PREVIOUSLY_ACTIVE_CXN loop.
#
#   The default MySQL server has one-second granularity in the Time column.
#   This means that a statement that starts at X.9 seconds shows 0 seconds
#   for only 0.1 second.  A statement that starts at X.0 seconds shows 0 secs
#   for a second, and 1 second up until it has actually been running 2 seconds.
#   This makes it tricky to determine when a statement has been restarted.
#   Further, this program and MySQL may have some clock skew.  Even if they
#   are running on the same machine, it's possible that at X.999999 seconds
#   we get the time, and at X+1.000001 seconds we get the snapshot from MySQL.
#   (Fortunately MySQL doesn't re-evaluate now() for every process, or that
#   would cause even more problems.)  And a query that's issued to MySQL may
#   stall for any amount of time before it's executed, making even more skew
#   between the times.
#
#   One worst case is,
#     * The processlist measures time at 100.01 and it's 100.
#     * We measure the time.  It says 100.02.
#     * A query was started at 90.  Processlist says Time=10.
#     * We calculate that the query was started at 90.02.
#     * Processlist measures it at 100.998 and it's 100.
#     * We measure time again, it says 100.999.
#     * Time has passed, but the Time column still says 10.
#
#   Another is,
#     * We get the processlist, then the time.
#     * A second later we get the processlist, but it takes 2 sec to fetch.
#     * We measure the time and it looks like 3 sec have passed, but proclist
#       says only one has passed.  This is why etime is necessary.
#   What should we do?  Well, the key thing to notice here is that a new
#   statement has started if a) the Time column actually decreases since we
#   last saw the process, or b) the Time column does not increase for 2
#   seconds, plus the etime of the first and second measurements combined!
sub parse_event {
   my ( $self, %args ) = @_;
   my @required_args = qw(code);
   foreach my $arg ( @required_args ) {
     die "I need a $arg argument" unless $args{$arg};
   }
   my ($code) = @args{@required_args};

   # Our first priority is to return cached events.  The caller expects
   # one event per return so we have to cache our events.  And the caller
   # should accept events as fast as we can return them; i.e. the caller
   # should not sleep between polls--that's our job in here (below).
   # XXX: This should only cause a problem if the caller is really slow
   # between calls to us, in which case polling may be delayed by the
   # caller's slowness plus our interval sleep.
   if ( @{$self->{event_cache}} ) {
      PTDEBUG && _d("Returning cached event");
      return shift @{$self->{event_cache}};
   }

   # It's time to sleep if we want to sleep and this is not the first poll.
   # Again, this assumes that the caller is not sleeping before calling us
   # and is not really slow between calls.  By "really slow" I mean slower
   # than the interval time.
   if ( $self->{interval} && $self->{polls} ) {
      PTDEBUG && _d("Sleeping between polls");
      usleep($self->{interval});
   }

   # Poll the processlist and time how long this takes.  Also get
   # the current time and calculate the poll time (etime) unless
   # these values are given via %args (for testing).
   # $time is the time after the poll so that $time-TIME should equal
   # the query's real start time, but see $query_start below...
   PTDEBUG && _d("Polling PROCESSLIST");
   my ($time, $etime) = @args{qw(time etime)};
   my $start          = $etime ? 0 : time;  # don't need start if etime given
   my $rows           = $code->();
   if ( !$rows ) {
      warn "Processlist callback did not return an arrayref";
      return;
   }
   $time  = time           unless $time;
   $etime = $time - $start unless $etime;
   $self->{polls}++;
   PTDEBUG && _d('Rows:', ($rows ? scalar @$rows : 0), 'in', $etime, 'seconds');

   my $active_cxn = $self->{active_cxn};
   my $curr_cxn   = {};
   my @new_cxn    = ();

   # First look at each currently active process/cxn in the processlist.
   # From these we can determine:
   #   1. If any of our previously active cxn are still active.
   #   2. If there's any new cxn.
   CURRENTLY_ACTIVE_CXN:
   foreach my $curr ( @$rows ) {

      # Each currently active cxn is saved so we can later determine
      # (3) if any previously active cxn ended.
      $curr_cxn->{$curr->[ID]} = $curr;

      # $time - TIME should equal the query's real start time, but since
      # the poll may be delayed, the more-precise start time is
      # $time - $etime - TIME; that is, we compensate $time for the amount
      # of $etime we were delay before MySQL returned us the proclist rows,
      # *But*, we only compensate with $etime for the restart check below
      # because otherwise the start time just becomes the event's ts and
      # that doesn't need to be so precise.
      my $query_start = $time - ($curr->[TIME] || 0);

      if ( $active_cxn->{$curr->[ID]} ) {
         PTDEBUG && _d('Checking existing cxn', $curr->[ID]);
         my $prev      = $active_cxn->{$curr->[ID]}; # previous state of cxn
         my $new_query = 0;
         my $fudge     = ($curr->[TIME] || 0) =~ m/\D/ ? 0.001 : 1; # micro-t?

         # If this is true, then the cxn was executing a query last time
         # we saw it.  Determine if the cxn is executing a new query.
         if ( $prev->[INFO] ) {
            if ( !$curr->[INFO] || $prev->[INFO] ne $curr->[INFO] ) {
               # This is a new/different query because what's currently
               # executing is different from what the cxn was previously
               # executing.
               PTDEBUG && _d('Info is different; new query');
               $new_query = 1;
            }
            elsif ( defined $curr->[TIME] && $curr->[TIME] < $prev->[TIME] ) {
               # This is a new/different query because the current exec
               # time is less than the previous exec time, so the previous
               # query ended and a new one began between polls.
               PTDEBUG && _d('Time is less than previous; new query');
               $new_query = 1;
            }
            elsif ( $curr->[INFO] && defined $curr->[TIME]
                    && $query_start - $etime - $prev->[START] > $fudge)
            {
               # If the query's recalculated start time minus its previously
               # calculated start time is greater than the fudge factor, then
               # the query has restarted.  I.e. the new start time is after
               # the previous start time.
               my $ms = $self->{MasterSlave};
               
               my $is_repl_thread = $ms->is_replication_thread({
                                        Command => $curr->[COMMAND],
                                        User    => $curr->[USER],
                                        State   => $curr->[STATE],
                                        Id      => $curr->[ID]});
               if ( $is_repl_thread ) {
                  PTDEBUG &&
                  _d(q{Query has restarted but it's a replication thread, ignoring});
               }
               else {
                  PTDEBUG && _d('Query restarted; new query',
                     $query_start, $etime, $prev->[START], $fudge);
                  $new_query = 1;
               }
            }

            if ( $new_query ) {
               # The cxn is executing a new query, so the previous query
               # ended.  Make an event for the previous query.
               $self->_update_profile($prev, $curr, $time);
               push @{$self->{event_cache}},
                  $self->make_event($prev, $time);
            }
         }

         # If this is true, the cxn is currently executing a query.
         # Determine if that query is old (i.e. same one running previously),
         # or new.  In either case, we save it to recheck it next poll.
         if ( $curr->[INFO] ) {
            if ( $prev->[INFO] && !$new_query ) {
               PTDEBUG && _d("Query on cxn", $curr->[ID], "hasn't changed");
               $self->_update_profile($prev, $curr, $time);
            }
            else {
               PTDEBUG && _d('Saving new query, state', $curr->[STATE]);
               push @new_cxn, [
                  @{$curr}[0..7],           # proc info
                  int($query_start),        # START
                  $etime,                   # ETIME
                  $time,                    # FSEEN
                  { ($curr->[STATE] || "") => 0 }, # PROFILE
               ];
            }
         }
      } 
      else {
         PTDEBUG && _d('New cxn', $curr->[ID]);
         if ( $curr->[INFO] && defined $curr->[TIME] ) {
            # But only save the new cxn if it's executing.
            PTDEBUG && _d('Saving query of new cxn, state', $curr->[STATE]);
            push @new_cxn, [
               @{$curr}[0..7],           # proc info
               int($query_start),        # START
               $etime,                   # ETIME
               $time,                    # FSEEN
               { ($curr->[STATE] || "") => 0 }, # PROFILE
            ];
         }
      }
   }  # CURRENTLY_ACTIVE_CXN

   # Look at the cxn that we think are active.  From these we can
   # determine:
   #   3. If any of them ended.
   # For the moment, "ended" means "not executing a query".  Later
   # we may track a cxn in its entirety for quasi-profiling.
   PREVIOUSLY_ACTIVE_CXN:
   foreach my $prev ( values %$active_cxn ) {
      if ( !$curr_cxn->{$prev->[ID]} ) {
         PTDEBUG && _d('cxn', $prev->[ID], 'ended');
         push @{$self->{event_cache}},
            $self->make_event($prev, $time);
         delete $active_cxn->{$prev->[ID]};
      }
      elsif (   ($curr_cxn->{$prev->[ID]}->[COMMAND] || "") eq 'Sleep' 
             || !$curr_cxn->{$prev->[ID]}->[STATE]
             || !$curr_cxn->{$prev->[ID]}->[INFO] ) {
         PTDEBUG && _d('cxn', $prev->[ID], 'became idle');
         # We do not make an event in this case because it will have
         # already been made above because of the INFO change.  But
         # until we begin tracking cxn in their entirety, we do not
         # to save idle cxn to save memory.
         delete $active_cxn->{$prev->[ID]};
      }
   }

   # Finally, merge new cxn into our hashref of active cxn.
   # This is done here and not when the new cnx are discovered
   # so that the PREVIOUSLY_ACTIVE_CXN doesn't look at them.
   map { $active_cxn->{$_->[ID]} = $_; } @new_cxn;

   $self->{last_poll} = $time;

   # Return the first event in our cache, if any.  It may be an event
   # we just made, or an event from a previous call.
   my $event = shift @{$self->{event_cache}};
   PTDEBUG && _d(scalar @{$self->{event_cache}}, "events in cache");
   return $event;
}

# The exec time of the query is the max of the time from the processlist, or the
# time during which we've actually observed the query running.  In case two
# back-to-back queries executed as the same one and we weren't able to tell them
# apart, their time will add up, which is kind of what we want.
sub make_event {
   my ( $self, $row, $time ) = @_;

   my $observed_time = $time - $row->[FSEEN];
   my $Query_time    = max($row->[TIME], $observed_time);

   # An alternative to the above.
   # my $observed_time = $self->{last_poll} - $row->[FSEEN];
   # my $Query_time    = max($row->[TIME], $observed_time);

   # Another alternative for this issue:
   # http://code.google.com/p/maatkit/issues/detail?id=1246
   # my $interval      = $time - $self->{last_poll};
   # my $observed_time = ($self->{last_poll} + ($interval/2)) - $row->[FSEEN];
   # my $Query_time    = max($row->[TIME], $observed_time);

   # Slowlog Query_time includes Lock_time and we emulate this, too, but
   # *not* by adding $row->[PROFILE]->{Locked} to $Query_time because
   # our query time is already inclusive since we track query time based on
   # INFO not STATE.  So our query time might be too inclusive since it
   # includes any and all states of the query during its execution.

   my $event = {
      id         => $row->[ID],
      db         => $row->[DB],
      user       => $row->[USER],
      host       => $row->[HOST],
      arg        => $row->[INFO],
      bytes      => length($row->[INFO]),
      ts         => Transformers::ts($row->[START] + $row->[TIME]), # Query END time
      Query_time => $Query_time,
      Lock_time  => $row->[PROFILE]->{Locked} || 0,
   };
   PTDEBUG && _d('Properties of event:', Dumper($event));
   return $event;
}

sub _get_active_cxn {
   my ( $self ) = @_;
   PTDEBUG && _d("Active cxn:", Dumper($self->{active_cxn}));
   return $self->{active_cxn};
}

# Sub: _update_profile
#   Update a query's PROFILE of STATE times.  The given cxn arrayrefs
#   ($prev and $curr) should be the same cxn and same query.  If the
#   query' state hasn't changed, the current state's time is incremented
#   by time elapsed between the last poll and now now ($time).  Else,
#   half the elapsed time is added to the previous state and half to the
#   current state (re issue 1246).
#
#   We cannot calculate a START for any state because the query's TIME
#   covers all states, so there's no way a posteriori to know how much
#   of TIME was spent in any given state.  The best we can do is count
#   how long we see the query in each state where ETIME (poll time)
#   defines our resolution.
#
# Parameters:
#   $prev - Arrayref of cxn's previous info
#   $curr - Arrayref of cxn's current info
#   $time - Current time (taken after poll)
sub _update_profile {
   my ( $self, $prev, $curr, $time ) = @_;
   return unless $prev && $curr;

   my $time_elapsed = $time - $self->{last_poll};

   # Update only $prev because the caller should only be saving that arrayref.

   if ( ($prev->[STATE] || "") eq ($curr->[STATE] || "") ) {
      PTDEBUG && _d("Query is still in", $curr->[STATE], "state");
      $prev->[PROFILE]->{$prev->[STATE] || ""} += $time_elapsed;
   }
   else {
      # XXX The State of this cxn changed between polls.  How long
      # was it in its previous state, and how long has it been in
      # its current state?  We can't tell, so this is a compromise
      # re http://code.google.com/p/maatkit/issues/detail?id=1246
      PTDEBUG && _d("Query changed from state", $prev->[STATE],
         "to", $curr->[STATE]);
      my $half_time = ($time_elapsed || 0) / 2;

      # Previous state ends.
      $prev->[PROFILE]->{$prev->[STATE] || ""} += $half_time;

      # Query assumes new state and we presume that the query has been
      # in that state for half the poll time.
      $prev->[STATE] = $curr->[STATE];
      $prev->[PROFILE]->{$curr->[STATE] || ""}  = $half_time;
   }

   return;
}

# Accepts a PROCESSLIST and a specification of filters to use against it.
# Returns queries that match the filters.  The standard process properties
# are: Id, User, Host, db, Command, Time, State, Info.  These are used for
# ignore and match.
#
# Possible find_spec are:
#   * all            Match all not-ignored queries
#   * busy_time      Match queries that have been Command=Query for longer than
#                    this time
#   * idle_time      Match queries that have been Command=Sleep for longer than
#                    this time
#   * ignore         A hashref of properties => regex patterns to ignore
#   * match          A hashref of properties => regex patterns to match
#
sub find {
   my ( $self, $proclist, %find_spec ) = @_;
   PTDEBUG && _d('find specs:', Dumper(\%find_spec));
   my $ms  = $self->{MasterSlave};

   my @matches;
   $self->{_reasons_for_matching} = undef;
   QUERY:
   foreach my $query ( @$proclist ) {
      PTDEBUG && _d('Checking query', Dumper($query));
      my $matched = 0;

      # Don't allow matching replication threads.
      if (    !$find_spec{replication_threads}
           && $ms->is_replication_thread($query) ) {
         PTDEBUG && _d('Skipping replication thread');
         next QUERY;
      }

      # Match special busy_time.
      #if ( $find_spec{busy_time} && ($query->{Command} || '') eq 'Query' ) {
      if ( $find_spec{busy_time} && exists($self->{kill_busy_commands}->{$query->{Command} || ''}) ) {
         next QUERY unless defined($query->{Time});
         if ( $query->{Time} < $find_spec{busy_time} ) {
            PTDEBUG && _d("Query isn't running long enough");
            next QUERY;
         }
         my $reason = 'Exceeds busy time';
         PTDEBUG && _d($reason);
         # Saving the reasons for each query in the objct is a bit nasty,
         # but the alternatives are worse:
         # - Saving internal data in the query
         # - Instead of using the stringified hashref as a key, using
         #   a checksum of the hashes' contents. Which could occasionally
         #   fail miserably due to timing-related issues.
         push @{$self->{_reasons_for_matching}->{$query} ||= []}, $reason;
         $matched++;
      }

      # Match special idle_time.
      if ( $find_spec{idle_time} && ($query->{Command} || '') eq 'Sleep' ) {
         next QUERY unless defined($query->{Time});
         if ( $query->{Time} < $find_spec{idle_time} ) {
            PTDEBUG && _d("Query isn't idle long enough");
            next QUERY;
         }
         my $reason = 'Exceeds idle time';
         PTDEBUG && _d($reason);
         push @{$self->{_reasons_for_matching}->{$query} ||= []}, $reason;
         $matched++;
      }
 
      PROPERTY:
      foreach my $property ( qw(Id User Host db State Command Info) ) {
         my $filter = "_find_match_$property";
         # Check ignored properties first.  If the proc has at least one
         # property that matches an ignore value, then it is totally ignored.
         # and we can skip to the next proc (query).
         if ( defined $find_spec{ignore}->{$property}
              && $self->$filter($query, $find_spec{ignore}->{$property}) ) {
            PTDEBUG && _d('Query matches ignore', $property, 'spec');
            next QUERY;
         }
         # If the proc's property value isn't ignored, then check if it matches.
         if ( defined $find_spec{match}->{$property} ) {
            if ( !$self->$filter($query, $find_spec{match}->{$property}) ) {
               PTDEBUG && _d('Query does not match', $property, 'spec');
               next QUERY;
            }
            my $reason = 'Query matches ' . $property . ' spec';
            PTDEBUG && _d($reason);
            push @{$self->{_reasons_for_matching}->{$query} ||= []}, $reason;
            $matched++;
         }
      }
      if ( $matched || $find_spec{all} ) {
         PTDEBUG && _d("Query matched one or more specs, adding");
         push @matches, $query;
         next QUERY;
      }
      PTDEBUG && _d('Query does not match any specs, ignoring');
   } # QUERY

   return @matches;
}

sub _find_match_Id {
   my ( $self, $query, $property ) = @_;
   return defined $property && defined $query->{Id} && $query->{Id} == $property;
}

sub _find_match_User {
   my ( $self, $query, $property ) = @_;
   return defined $property && defined $query->{User}
      && $query->{User} =~ m/$property/;
}

sub _find_match_Host {
   my ( $self, $query, $property ) = @_;
   return defined $property && defined $query->{Host}
      && $query->{Host} =~ m/$property/;
}

sub _find_match_db {
   my ( $self, $query, $property ) = @_;
   return defined $property && defined $query->{db}
      && $query->{db} =~ m/$property/;
}

sub _find_match_State {
   my ( $self, $query, $property ) = @_;
   return defined $property && defined $query->{State}
      && $query->{State} =~ m/$property/;
}

sub _find_match_Command {
   my ( $self, $query, $property ) = @_;
   return defined $property && defined $query->{Command}
      && $query->{Command} =~ m/$property/;
}

sub _find_match_Info {
   my ( $self, $query, $property ) = @_;
   return defined $property && defined $query->{Info}
      && $query->{Info} =~ m/$property/;
}

sub _d {
   my ($package, undef, $line) = caller 0;
   @_ = map { (my $temp = $_) =~ s/\n/\n# /g; $temp; }
        map { defined $_ ? $_ : 'undef' }
        @_;
   print STDERR "# $package:$line $PID ", join(' ', @_), "\n";
}

1;
}
# ###########################################################################
# End Processlist package
# ###########################################################################
