# $Id$

package Data::ObjectDriver::Driver::DBI;
use strict;
use warnings;

use base qw( Data::ObjectDriver Class::Accessor::Fast );

use DBI;
use Carp ();
use Data::ObjectDriver::Errors;
use Data::ObjectDriver::SQL;
use Data::ObjectDriver::Driver::DBD;
use Data::ObjectDriver::Iterator;

my $ForkSafe = _is_fork_safe();
my %Handles;

sub _is_fork_safe {
    return if exists $ENV{DOD_FORK_SAFE} and !$ENV{DOD_FORK_SAFE};
    eval { require POSIX::AtFork; 1 } or return;
    eval { require Scalar::Util; Scalar::Util->import('weaken'); 1 } or return;
    return 1;
}

__PACKAGE__->mk_accessors(qw( dsn username password connect_options dbh get_dbh dbd prefix reuse_dbh force_no_prepared_cache));


sub init {
    my $driver = shift;
    my %param = @_;
    for my $key (keys %param) {
        $driver->$key($param{$key});
    }
    if(!exists $param{dbd}) {
        ## Create a DSN-specific driver (e.g. "mysql").
        my $type;
        if (my $dsn = $driver->dsn) {
            ($type) = $dsn =~ /^dbi:(\w*)/i;
        } elsif (my $dbh = $driver->dbh) {
            $type = $dbh->{Driver}{Name};
        } elsif (my $getter = $driver->get_dbh) {
## Ugly. Shouldn't have to connect just to get the driver name.
            my $dbh = $getter->();
            $type = $dbh->{Driver}{Name};
        }
        $driver->dbd(Data::ObjectDriver::Driver::DBD->new($type));
    }

    if ($ForkSafe) {
        # Purge cached handles
        weaken(my $driver_weaken = $driver);
        POSIX::AtFork->add_to_child(sub {
            return unless $driver_weaken;
            $driver_weaken->dbh(undef);
            %Handles = ();
        });
    }

    $driver;
}

sub generate_pk {
    my $driver = shift;
    if (my $generator = $driver->pk_generator) {
        return $generator->(@_);
    }
}

# Some versions of SQLite require the undefing to finalise properly
sub _close_sth {
    my $sth = shift;
    $sth->finish;
    undef $sth;
}

# Some versions of SQLite have problems with prepared caching due to finalisation order
sub _prepare_cached {
    my $driver = shift;
    my $dbh    = shift;
    my $sql    = shift;
    return ($driver->dbd->can_prepare_cached_statements)? $dbh->prepare_cached($sql) : $dbh->prepare($sql);
}

sub init_db {
    my $driver = shift;
    my $dbh;
    if ($driver->reuse_dbh) {
        $dbh = $Handles{$driver->dsn};
    }
    unless ($dbh) {
        eval {
            $dbh = DBI->connect($driver->dsn, $driver->username, $driver->password,
                { RaiseError => 1, PrintError => 0, AutoCommit => 1,
                ( $ForkSafe ? ( AutoInactiveDestroy => 1 ) : () ),
                %{$driver->connect_options || {}} })
                or Carp::croak("Connection error: " . $DBI::errstr);
        };
        if ($@) {
            Carp::croak($@);
        }
    }
    if ($driver->reuse_dbh) {
        $Handles{$driver->dsn} = $dbh;
    }
    $driver->dbd->init_dbh($dbh);
    $driver->{__dbh_init_by_driver} = 1;
    return $dbh;
}

sub rw_handle {
    my $driver = shift;
    my $db = shift || 'main';
    my $dbh;
    if ($dbh = $driver->dbh) {
        return $dbh if $dbh->ping;

        ## ping fails, kill cache.
        delete $Handles{$driver->dsn};
    }
    if (my $getter = $driver->get_dbh) {
        $dbh = $getter->();
    } else {
        $dbh = $driver->init_db($db) or die $driver->last_error;
        $driver->dbh($dbh);
    }
    return $dbh;
}
*r_handle = \&rw_handle;

sub fetch_data {
    my $driver = shift;
    my($obj) = @_;
    return unless $obj->has_primary_key;
    my $terms = $obj->primary_key_to_terms;
    my $args  = { limit => 1 };
    my $rec = {};
    my $sth = $driver->fetch($rec, $obj, $terms, $args);
    $sth->fetch;
    _close_sth($sth);
    $driver->end_query($sth);
    return $rec;
}

sub prepare_fetch {
    my $driver = shift;
    my($class, $orig_terms, $orig_args) = @_;

    ## Use (shallow) duplicates so the pre_search trigger can modify them.
    my $terms = defined $orig_terms ? ( ref $orig_terms eq 'ARRAY' ? [ @$orig_terms ] : { %$orig_terms } ) : {};
    my $args  = defined $orig_args  ? { %$orig_args  } : {};
    $class->call_trigger('pre_search', $terms, $args);

    my $stmt = $driver->prepare_statement($class, $terms, $args);

    my $sql = $stmt->as_sql;
    $sql .= "\nFOR UPDATE" if $orig_args->{for_update};
    return ($sql, $stmt->{bind}, $stmt)
}

sub fetch {
    my $driver = shift;
    my($rec, $class, $orig_terms, $orig_args) = @_;

    if ($Data::ObjectDriver::RESTRICT_IO) {
        use Data::Dumper;
        die "Attempted DBI I/O while in restricted mode: fetch() " . Dumper($orig_terms, $orig_args);
    }

    my ($sql, $bind, $stmt) = $driver->prepare_fetch($class, $orig_terms, $orig_args);

    my @bind;
    my $map = $stmt->select_map;
    for my $col (@{ $stmt->select }) {
        push @bind, \$rec->{ $map->{$col} };
    }

    my $dbh = $driver->r_handle($class->properties->{db});
    $driver->start_query($sql, $stmt->{bind});

    my $sth = $orig_args->{no_cached_prepare} ? $dbh->prepare($sql) : $driver->_prepare_cached($dbh, $sql);
    $sth->execute(@{ $stmt->{bind} });
    $sth->bind_columns(undef, @bind);

    # need to slurp 'offset' rows for DBs that cannot do it themselves
    if (!$driver->dbd->offset_implemented && $orig_args->{offset}) {
        for (1..$orig_args->{offset}) {
            $sth->fetch;
        }
    }

    return $sth;
}

sub load_object_from_rec {
    my $driver = shift;
    my ($class, $rec, $no_triggers) = @_;

    my $obj = $class->new;
    $obj->set_values_internal($rec);
    ## Don't need a duplicate as there's no previous version in memory
    ## to preserve.
    $obj->{__is_stored} = 1;
    $obj->call_trigger('post_load') unless $no_triggers;
    return $obj;
}

sub search {
    my($driver) = shift;
    my($class, $terms, $args) = @_;

    my $rec = {};
    my $sth = $driver->fetch($rec, $class, $terms, $args);

    my $iter = sub {
        ## This is kind of a hack--we need $driver to stay in scope,
        ## so that the DESTROY method isn't called. So we include it
        ## in the scope of the closure.
        my $d = $driver;

        unless ($sth->fetch) {
            _close_sth($sth);
            $driver->end_query($sth);
            return;
        }
        return $driver->load_object_from_rec($class, $rec, $args->{no_triggers});
    };

    if (wantarray) {
        my @objs = ();

        while (my $obj = $iter->()) {
            push @objs, $obj;
        }
        return @objs;
    } else {
        my $iterator = Data::ObjectDriver::Iterator->new(
            $iter, sub { _close_sth($sth); $driver->end_query($sth) },
        );
        return $iterator;
    }
    return;
}

sub lookup {
    my $driver = shift;
    my($class, $id) = @_;
    return unless defined $id;
    my @obj = $driver->search($class,
        $class->primary_key_to_terms($id), { limit => 1 , is_pk => 1 });
    $obj[0];
}

sub lookup_multi {
    my $driver = shift;
    my($class, $ids) = @_;
    return [] unless @$ids;
    my @got;
    ## If it's a single-column PK, assume it's in one partition, and
    ## use an OR search. FIXME: can we instead check for partitioning?
    unless (ref($ids->[0])) {
        my $terms = $class->primary_key_to_terms([ $ids ]);
        my @sqlgot = $driver->search($class, $terms, { is_pk => 1 });
        my %hgot = map { $_->primary_key() => $_ } @sqlgot;
        @got = map { defined $_ ? $hgot{$_} : undef } @$ids;
    } else {
        for my $id (@$ids) {
            push @got, eval{ $class->driver->lookup($class, $id) };
        }
    }
    \@got;
}

sub select_one {
    my $driver = shift;
    my($sql, $bind) = @_;
    my $dbh = $driver->r_handle;

    $driver->start_query($sql, $bind);
    my $sth = $driver->_prepare_cached($dbh, $sql);
    $sth->execute(@$bind);
    $sth->bind_columns(undef, \my($val));
    unless ($sth->fetch) {
        _close_sth($sth);
        $driver->end_query($sth);
        return;
    }

    _close_sth($sth);
    $driver->end_query($sth);

    return $val;
}

sub table_for {
    my $driver = shift;
    my($this) = @_;
    my $src = $this->datasource or return;
    return $driver->prefix ? join('', $driver->prefix, $src) : $src;
}

sub exists {
    my $driver = shift;
    my($obj) = @_;
    return unless $obj->has_primary_key;

    if ($Data::ObjectDriver::RESTRICT_IO) {
        die "Attempted DBI I/O while in restricted mode: exists()";
    }

    ## should call pre_search trigger so we can use enum in the part of PKs
    my $terms = $obj->primary_key_to_terms;

    my $class = ref $obj;
    $terms ||= {};
    $class->call_trigger('pre_search', $terms);

    my $tbl = $driver->table_for($obj);
    my $stmt = $driver->prepare_statement($class, $terms, { limit => 1 });
    my $sql = "SELECT 1 FROM $tbl\n";
    $sql .= $stmt->as_sql_where;
    my $dbh = $driver->r_handle($obj->properties->{db});
    $driver->start_query($sql, $stmt->{bind});
    my $sth = $driver->_prepare_cached($dbh, $sql);
    $sth->execute(@{ $stmt->{bind} });
    my $exists = $sth->fetch;
    _close_sth($sth);
    $driver->end_query($sth);

    return $exists;
}

sub replace {
    my $driver = shift;
    if ($driver->dbd->can_replace) {
        return $driver->_insert_or_replace(@_, { replace => 1 });
    }
    if (! $driver->txn_active) {
        $driver->begin_work;
        my $res;
        eval {
            $driver->remove(@_);
            $res = $driver->insert(@_);
        };
        if ($@) {
            $driver->rollback;
            Carp::croak("REPLACE transaction error $driver: $@");
        }
        $driver->commit;
        return $res;
    }
    $driver->remove(@_);
    $driver->insert(@_);
}

sub insert {
    my $driver = shift;
    my($orig_obj) = @_;
    $driver->_insert_or_replace($orig_obj, { replace => 0 });
}

sub _insert_or_replace {
    my $driver = shift;
    my($orig_obj, $options) = @_;

    if ($Data::ObjectDriver::RESTRICT_IO) {
        die "Attempted DBI I/O while in restricted mode: _insert_or_replace()";
    }

    ## Syntax switch between INSERT or REPLACE statement based on options
    $options ||= {};
    my $INSERT_OR_REPLACE = $options->{replace} ? 'REPLACE' : 'INSERT';

    ## Use a duplicate so the pre_save trigger can modify it.
    my $obj = $orig_obj->clone_all;
    $obj->call_trigger('pre_save', $orig_obj);
    $obj->call_trigger('pre_insert', $orig_obj);

    my $cols = $obj->column_names;
    if (!$obj->is_pkless && ! $obj->has_primary_key) {
        ## If we don't already have a primary key assigned for this object, we
        ## may need to generate one (depending on the underlying DB
        ## driver). If the driver gives us a new ID, we insert that into
        ## the new record; otherwise, we assume that the DB is using an
        ## auto-increment column of some sort, so we don't specify an ID
        ## at all.
        my $pk = $obj->primary_key_tuple;
        if(my $generated = $driver->generate_pk($obj)) {
            ## The ID is the only thing we *are* allowed to change on
            ## the original object, so copy it back.
            $orig_obj->$_($obj->$_) for @$pk;
        } else {
            ## Filter the undefined key fields out of the columns to include
            ## in the query, so that we don't specify them in the query.
            my %pk = map { $_ => 1 } @$pk;
            $cols = [ grep !$pk{$_} || defined $obj->$_(), @$cols ];
        }
    }
    my $tbl = $driver->table_for($obj);
    my $sql = "$INSERT_OR_REPLACE INTO $tbl\n";
    my $dbd = $driver->dbd;
    $sql .= '(' . join(', ',
                  map { $dbd->db_column_name($tbl, $_) }
                  @$cols) .
            ')' . "\n" .
            'VALUES (' . join(', ', ('?') x @$cols) . ')' . "\n";
    my $dbh = $driver->rw_handle($obj->properties->{db});
    $driver->start_query($sql, $obj->{column_values});
    my $sth = $driver->_prepare_cached($dbh, $sql);
    my $i = 1;
    my $col_defs = $obj->properties->{column_defs};
    for my $col (@$cols) {
        my $val = $obj->column($col);
        my $type = $col_defs->{$col} || 'char';
        my $attr = $dbd->bind_param_attributes($type, $obj, $col);
        $sth->bind_param($i++, $val, $attr);
    }
    eval { $sth->execute };
	die "Failed to execute $sql with ".join(", ",@$cols).": $@" if $@;

    ## Now, if we didn't have an object ID, we need to grab the
    ## newly-assigned ID.
    if (!$obj->is_pkless && ! $obj->has_primary_key) {
        my $pk = $obj->primary_key_tuple; ## but do that only for relation that aren't PK-less
        my $id_col = $pk->[0]; # XXX are we sure we will always use '0' ?
        my $id = $dbd->fetch_id(ref($obj), $dbh, $sth, $driver);
        $obj->$id_col($id);
        ## The ID is the only thing we *are* allowed to change on
        ## the original object.
        $orig_obj->$id_col($id);
    }

    _close_sth($sth);
    $driver->end_query($sth);

    $obj->call_trigger('post_save', $orig_obj);
    $obj->call_trigger('post_insert', $orig_obj);

    $orig_obj->{__is_stored} = 1;
    $orig_obj->{changed_cols} = {};
    1;
}

sub update {
    my $driver = shift;

    my($orig_obj, $terms) = @_;

    if ($Data::ObjectDriver::RESTRICT_IO) {
        use Data::Dumper;
        die "Attempted DBI I/O while in restricted mode: _update() " . Dumper($terms);
    }

    ## Use a duplicate so the pre_save trigger can modify it.
    my $obj = $orig_obj->clone_all;
    $obj->call_trigger('pre_save', $orig_obj);
    $obj->call_trigger('pre_update', $orig_obj);

    my $cols = $obj->column_names;
    my @changed_cols = $obj->changed_cols;

    ## If there's no updated columns, update() is no-op
    ## but we should call post_* triggers
    unless (@changed_cols) {
        $obj->call_trigger('post_save', $orig_obj);
        $obj->call_trigger('post_update', $orig_obj);
        return 1;
    }

    my $tbl = $driver->table_for($obj);
    my $sql = "UPDATE $tbl SET\n";
    my $dbd = $driver->dbd;
    $sql .= join(', ',
            map { $dbd->db_column_name($tbl, $_) . " = ?" }
            @changed_cols) . "\n";
    my $stmt = $driver->prepare_statement(ref($obj), {
            %{ $obj->primary_key_to_terms },
            %{ $terms || {} }
        });
    $sql .= $stmt->as_sql_where;

    my $dbh = $driver->rw_handle($obj->properties->{db});
    $driver->start_query($sql, $obj->{column_values});
    my $sth = $driver->_prepare_cached($dbh, $sql);
    my $i = 1;
    my $col_defs = $obj->properties->{column_defs};
    for my $col (@changed_cols) {
        my $val = $obj->column($col);
        my $type = $col_defs->{$col} || 'char';
        my $attr = $dbd->bind_param_attributes($type, $obj, $col);
        $sth->bind_param($i++, $val, $attr);
    }

    ## Bind the primary key value(s).
    for my $val (@{ $stmt->{bind} }) {
        $sth->bind_param($i++, $val);
    }

    my $rows = $sth->execute;
    _close_sth($sth);
    $driver->end_query($sth);

    $obj->call_trigger('post_save', $orig_obj);
    $obj->call_trigger('post_update', $orig_obj);

    $orig_obj->{changed_cols} = {};
    return $rows;
}

sub remove {
    my $driver = shift;
    my $orig_obj = shift;

    ## If remove() is called on class method and we have 'nofetch'
    ## option, we remove the record using $term and won't create
    ## $object. This is for efficiency and PK-less tables
    ## Note: In this case, triggers won't be fired
    ## Otherwise, Class->remove is a shortcut for search+remove
    unless (ref($orig_obj)) {
        if ($_[1] && $_[1]->{nofetch}) {
            return $driver->direct_remove($orig_obj, @_);
        } else {
            my $result = 0;
            my @obj = $driver->search($orig_obj, @_);
            for my $obj (@obj) {
                my $res = $obj->remove(@_) || 0;
                $result += $res;
            }
            return $result || 0E0;
        }
    }

    return unless $orig_obj->has_primary_key;

    if ($Data::ObjectDriver::RESTRICT_IO) {
        die "Attempted DBI I/O while in restricted mode: remove()";
    }

    ## Use a duplicate so the pre_save trigger can modify it.
    my $obj = $orig_obj->clone_all;
    $obj->call_trigger('pre_remove', $orig_obj);

    my $tbl = $driver->table_for($obj);
    my $sql = "DELETE FROM $tbl\n";
    my $stmt = $driver->prepare_statement(ref($obj), $obj->primary_key_to_terms);
    $sql .= $stmt->as_sql_where;
    my $dbh = $driver->rw_handle($obj->properties->{db});
    $driver->start_query($sql, $stmt->{bind});
    my $sth = $driver->_prepare_cached($dbh, $sql);
    my $result = $sth->execute(@{ $stmt->{bind} });
    _close_sth($sth);
    $driver->end_query($sth);

    $obj->call_trigger('post_remove', $orig_obj);

    $orig_obj->{__is_stored} = 1;
    return $result;
}

sub direct_remove {
    my $driver = shift;
    my($class, $orig_terms, $orig_args) = @_;

    if ($Data::ObjectDriver::RESTRICT_IO) {
        die "Attempted DBI I/O while in restricted mode: direct_remove() " . Dumper($orig_terms, $orig_args);
    }

    ## Use (shallow) duplicates so the pre_search trigger can modify them.
    my $terms = defined $orig_terms ? { %$orig_terms } : {};
    my $args  = defined $orig_args  ? { %$orig_args  } : {};
    $class->call_trigger('pre_search', $terms, $args);

    my $stmt = $driver->prepare_statement($class, $terms, $args);
    my $tbl  = $driver->table_for($class);
    my $sql  = "DELETE from $tbl\n";
       $sql .= $stmt->as_sql_where;

    # not all DBD drivers can do this.  check.  better to die than do
    # unbounded DELETE when they requested a limit.
    if ($stmt->limit) {
        Carp::croak("Driver doesn't support DELETE with LIMIT")
            unless $driver->dbd->can_delete_with_limit;
        $sql .= $stmt->as_limit;
    }

    my $dbh = $driver->rw_handle($class->properties->{db});
    $driver->start_query($sql, $stmt->{bind});
    my $sth = $driver->_prepare_cached($dbh, $sql);
    my $result = $sth->execute(@{ $stmt->{bind} });
    _close_sth($sth);
    $driver->end_query($sth);
    return $result;
}

sub bulk_insert {
    my $driver = shift;
    my $class = shift;
    my $dbd = $driver->dbd;

    my $cols = shift;
    my $data = shift;


    Carp::croak("Driver doesn't support bulk_insert")
	    unless ($dbd->can('bulk_insert'));

    if ($Data::ObjectDriver::RESTRICT_IO) {
        die "Attempted DBI I/O while in restricted mode: bulk_insert()";
    }

    # check that cols are valid..
    my %valid_cols = map {$_ => 1} @{$class->column_names};

    my $invalid_cols;
    foreach my $c (@{$cols}) {
        $invalid_cols .= "$c " if (!$valid_cols{$c});
    }
    if (defined($invalid_cols)) {
        Carp::croak("Invalid columns $invalid_cols passed to bulk_insert");
    }


    # pass this directly to the backend DBD
    my $dbh = $driver->rw_handle($class->properties->{db});
    my $tbl  = $driver->table_for($class);
    my @db_cols =  map {$dbd->db_column_name($tbl, $_) } @{$cols};

    return $dbd->bulk_insert($dbh, $tbl, \@db_cols, $data);
}

sub begin_work {
    my $driver = shift;

    return if $driver->txn_active;

    my $dbh = $driver->dbh;

    unless ($dbh) {
        $driver->{__delete_dbh_after_txn} = 1;
        $dbh = $driver->rw_handle;
        $driver->dbh($dbh);
    }

    if ($dbh->{AutoCommit}) {
        eval {
            $dbh->begin_work;
        };
        if (my $err = $@) {
            $driver->rollback;
            Carp::croak("Begin work failed for driver $driver: $err");
        }
    }
    ## if for some reason AutoCommit was 0 but txn_active was false,
    ## then we set it to true now
    $driver->txn_active(1);
}

sub commit { shift->_end_txn('commit') }
sub rollback { shift->_end_txn('rollback') }

sub _end_txn {
    my $driver = shift;
    my($action) = @_;

    ## if the driver has its own internal txn_active flag
    ## off, we don't bother ending. Maybe we already did
    if ($driver->txn_active) {
        $driver->txn_active(0);

        my $dbh = $driver->dbh
            or Carp::croak("$action called without a stored handle--begin_work?");

        unless ($dbh->{AutoCommit}) {
            eval { $dbh->$action() };
            if ($@) {
                Carp::croak("$action failed for driver $driver: $@");
            }
        }
    }
    if ($driver->{__delete_dbh_after_txn}) {
        $driver->dbh(undef);
        delete $driver->{__delete_dbh_after_txn};
    }
    return 1;
}

sub DESTROY {
    my $driver = shift;
    ## Don't take the responsibility of disconnecting this handler
    ## if we haven't created it ourself.
    return unless $driver->{__dbh_init_by_driver};
    if (my $dbh = $driver->dbh) {
        $dbh->disconnect;
    }
}

sub prepare_statement {
    my $driver = shift;
    my($class, $terms, $args) = @_;

    my $dbd = $driver->dbd;
    my $stmt = $args->{sql_statement} || $dbd->sql_class->new;

    if (my $tbl = $driver->table_for($class)) {
        my $cols = $class->column_names;
        my %fetch = $args->{fetchonly} ?
            (map { $_ => 1 } @{ $args->{fetchonly} }) : ();
        my $skip = $stmt->select_map_reverse;
        for my $col (@$cols) {
            next if $skip->{$col};
            if (keys %fetch) {
                next unless $fetch{$col};
            }
            my $dbcol = join '.', $tbl, $dbd->db_column_name($tbl, $col);
            $stmt->add_select($dbcol => $col);
        }

        $stmt->from([ $tbl ]);

        if (defined($terms)) {
            if (ref $terms eq 'ARRAY') {
                # Used for translating property names deep within the
                # $terms structure to column names
                $stmt->column_mutator(sub {
                    my ($col) = @_;
                    return $dbd->db_column_name($tbl, $col);
                });
                $stmt->add_complex_where($terms);
                $stmt->column_mutator(undef);
            }
            else {
                for my $col (keys %$terms) {
                    my $db_col = $dbd->db_column_name($tbl, $col);
                    $stmt->add_where(join('.', $tbl, $db_col), $terms->{$col});
                }
            }
        }

        ## Set statement's ORDER clause if any.
        if ($args->{sort} || $args->{direction}) {
            my @order;
            my $sort = $args->{sort} || 'id';
            unless (ref $sort) {
                $sort = [{column    => $sort,
                          direction => $args->{direction}||''}];
            }

            foreach my $pair (@$sort) {
                my $col = $dbd->db_column_name($tbl, $pair->{column} || 'id');
                my $dir = $pair->{direction} || '';
                push @order, {column => $col,
                              desc   => ($dir eq 'descend') ? 'DESC' : 'ASC',
                             }
            }

            $stmt->order(\@order);
        }
    }
    $stmt->limit( $args->{limit} )     if $args->{limit};
    $stmt->offset( $args->{offset} )   if $args->{offset};
    $stmt->comment( $args->{comment} ) if $args->{comment};

    if (my $terms = $args->{having}) {
        for my $col (keys %$terms) {
            $stmt->add_having($col => $terms->{$col});
        }
    }

    $stmt;
}

sub last_error {
    my $driver = shift;
    return $driver->dbd->map_error_code($DBI::err, $DBI::errstr);
}

1;
