# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

package Avro::DataFileReader;
use strict;
use warnings;

use Object::Tiny qw{
    fh
    reader_schema
    sync_marker
    block_max_size
};

use constant MARKER_SIZE => 16;

# TODO: refuse to read a block more than block_max_size, instead
# do partial reads

use Avro::DataFile;
use Avro::BinaryDecoder;
use Avro::Schema;
use Carp;
use IO::String;
use IO::Uncompress::RawInflate ;
use Fcntl();

sub new {
    my $class = shift;
    my $datafile = $class->SUPER::new(@_);

    my $schema = $datafile->{reader_schema};
    croak "schema is invalid"
        if $schema && ! eval { $schema->isa("Avro::Schema") };

    return $datafile;
}

sub codec {
    my $datafile = shift;
    return $datafile->metadata->{'avro.codec'};
}

sub writer_schema {
    my $datafile = shift;
    unless (exists $datafile->{_writer_schema}) {
        my $json_schema = $datafile->metadata->{'avro.schema'};
        $datafile->{_writer_schema} = Avro::Schema->parse($json_schema);
    }
    return $datafile->{_writer_schema};
}

sub metadata {
    my $datafile = shift;
    unless (exists $datafile->{_metadata}) {
        my $header = $datafile->header;
        $datafile->{_metadata} = $header->{meta} || {};
    }
    return $datafile->{_metadata};
}

sub header {
    my $datafile = shift;
    unless (exists $datafile->{_header}) {
        $datafile->{_header} = $datafile->read_file_header;
    }

    return $datafile->{_header};
}

sub read_file_header {
    my $datafile = shift;

    my $data = Avro::BinaryDecoder->decode(
        reader_schema => $Avro::DataFile::HEADER_SCHEMA,
        writer_schema => $Avro::DataFile::HEADER_SCHEMA,
        reader        => $datafile->{fh},
    );
    croak "Magic '$data->{magic}' doesn't match"
        unless $data->{magic} eq Avro::DataFile->AVRO_MAGIC;

    $datafile->{sync_marker} = $data->{sync}
        or croak "sync marker appears invalid";

    my $codec = $data->{meta}{'avro.codec'} || "";

    throw Avro::DataFile::Error::UnsupportedCodec($codec)
        unless Avro::DataFile->is_codec_valid($codec);

    return $data;
}

sub all {
    my $datafile = shift;

    my @objs;
    my @block_objs;
    do {
        if ($datafile->eof) {
            @block_objs = ();
        }
        else {
            $datafile->read_block_header if $datafile->eob;
            @block_objs = $datafile->read_to_block_end;
            push @objs, @block_objs;
        }

    } until !@block_objs;

    return @objs
}

sub next {
    my $datafile = shift;
    my $count    = shift;

    my @objs;

    $datafile->read_block_header if $datafile->eob;
    return ()                    if $datafile->eof;

    my $block_count = $datafile->{object_count};

    if ($block_count <= $count) {
        push @objs, $datafile->read_to_block_end;
        croak "Didn't read as many objects than expected"
            unless scalar @objs == $block_count;

        push @objs, $datafile->next($count - $block_count);
    }
    else {
        push @objs, $datafile->read_within_block($count);
    }
    return @objs;
}

sub read_within_block {
    my $datafile = shift;
    my $count    = shift;

    my $reader        = $datafile->reader;
    my $writer_schema = $datafile->writer_schema;
    my $reader_schema = $datafile->reader_schema || $writer_schema;
    my @objs;
    while ($count-- > 0 && $datafile->{object_count} > 0) {
        push @objs, Avro::BinaryDecoder->decode(
            writer_schema => $writer_schema,
            reader_schema => $reader_schema,
            reader        => $reader,
        );
        $datafile->{object_count}--;
    }
    return @objs;
}

sub skip {
    my $datafile = shift;
    my $count    = shift;

    my $block_count = $datafile->{object_count};
    if ($block_count <= $count) {
        $datafile->skip_to_block_end
            or croak "Cannot skip to end of block!";
        $datafile->skip($count - $block_count);
    }
    else {
        my $writer_schema = $datafile->writer_schema;
        ## could probably be optimized
        while ($count--) {
            Avro::BinaryDecoder->skip($writer_schema, $datafile->reader);
            $datafile->{object_count}--;
        }
    }
}

sub read_block_header {
    my $datafile = shift;
    my $fh = $datafile->{fh};

    $datafile->header unless $datafile->{_header};

    $datafile->{object_count} = Avro::BinaryDecoder->decode_long(
        undef, undef, $fh,
    );
    $datafile->{block_size} = Avro::BinaryDecoder->decode_long(
        undef, undef, $fh,
    );
    $datafile->{block_start} = tell $fh;

    return unless $datafile->codec eq 'deflate';
    ## we need to read the entire block into memory, to inflate it
    my $nread = read $fh, my $block, $datafile->{block_size} + MARKER_SIZE
        or croak "Error reading from file: $!";

    ## remove the marker
    my $marker = substr $block, -(MARKER_SIZE), MARKER_SIZE, '';
    $datafile->{block_marker} = $marker;

    ## this is our new reader
    $datafile->{reader} = IO::Uncompress::RawInflate->new(\$block);

    return;
}

sub verify_marker {
    my $datafile = shift;

    my $marker = $datafile->{block_marker};
    unless (defined $marker) {
        ## we are in the fh case
        read $datafile->{fh}, $marker, MARKER_SIZE;
    }

    unless (($marker || "") eq $datafile->sync_marker) {
        croak "Oops synchronization issue (marker mismatch)";
    }
    return;
}

sub skip_to_block_end {
    my $datafile = shift;

    if (my $reader = $datafile->{reader}) {
        seek $reader, 0, Fcntl->SEEK_END;
        return;
    }

    my $remaining_size = $datafile->{block_size}
                       + $datafile->{block_start}
                       - tell $datafile->{fh};

    seek $datafile->{fh}, $remaining_size, 0;
    $datafile->verify_marker; ## will do a read
    return 1;
}

sub read_to_block_end {
    my $datafile = shift;

    my $reader = $datafile->reader;
    my @objs = $datafile->read_within_block( $datafile->{object_count} );
    $datafile->verify_marker;
    return @objs;
}

sub reader {
    my $datafile = shift;
    return $datafile->{reader} || $datafile->{fh};
}

## end of block
sub eob {
    my $datafile = shift;

    return 1 if $datafile->eof;

    if ($datafile->{reader}) {
        return 1 if $datafile->{reader}->eof;
    }
    else {
        my $pos = tell $datafile->{fh};
        return 1 unless $datafile->{block_start};
        return 1 if $pos >= $datafile->{block_start} + $datafile->{block_size};
    }
    return 0;
}

sub eof {
    my $datafile = shift;
    if ($datafile->{reader}) {
        return 0 unless $datafile->{reader}->eof;
    }
    return 1 if $datafile->{fh}->eof;
    return 0;
}

package Avro::DataFile::Error::UnsupportedCodec;
use parent 'Error::Simple';

1;
