# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

package Avro::BinaryDecoder;
use strict;
use warnings;

use Config;
use Encode();
use Error::Simple;
use Avro::Schema;

## Bit mask with every bit set except the seven lowest ones. A perl built
## with 64-bit integers uses the native complement of 0x7F; otherwise the
## same pattern (57 one bits followed by 7 zero bits) is held in a
## Math::BigInt, which is only loaded when it is actually needed.
our $complement = $Config{use64bitint}
    ? ~0x7F
    : do {
        require Math::BigInt;
        Math::BigInt->new("0b" . ("1" x 57) . ("0" x 7));
    };

=head2 decode(%param)

Resolve the given writer and reader_schema to decode the data provided by the
reader.

=over 4

=item * writer_schema

The schema that was used to encode the data provided by the C<reader>

=item * reader_schema

The schema we want to use to decode the data.

=item * reader

An object implementing a straightforward interface. C<read($buf, $nbytes)> and
C<seek($nbytes, $whence)> are expected. Typically an IO::String object or an
IO::File object. These calls are expected to block the decoder if not
enough data is available for reading.

=back

=cut
## Entry point: match the writer schema against the reader schema, then
## dispatch to the decode_<type> method named after the matched type.
## Throws Avro::Schema::Error::Mismatch when the match yields a false value.
sub decode {
    my ($class, %param) = @_;

    my $writer_schema = $param{writer_schema};
    my $reader_schema = $param{reader_schema};
    my $reader        = $param{reader};

    my $type = Avro::Schema->match(
        writer => $writer_schema,
        reader => $reader_schema,
    );
    Avro::Schema::Error::Mismatch->throw unless $type;

    my $decoder = "decode_$type";
    return $class->$decoder($writer_schema, $reader_schema, $reader);
}

## Skip over one encoded value without building it. $schema is either a
## schema object (its ->type names the handler) or a plain type name; the
## work is delegated to the matching skip_<type> method.
sub skip {
    my ($class, $schema, $reader) = @_;

    my $type = $schema;
    $type = $schema->type if ref $schema;

    my $skipper = "skip_$type";
    return $class->$skipper($schema, $reader);
}

## A null value occupies no bytes on the wire: nothing is read and undef is
## returned. The explicit undef (rather than an empty return) matters
## because callers evaluate decoders in list context, e.g. when pushing
## array items.
sub decode_null { undef }

## Skipping a null consumes nothing either. skip() dispatches to
## "skip_<type>", so without this method skipping a null-typed field or
## union branch would die with a missing-method error.
sub skip_null { return }

## A boolean is a single byte, so skipping one is just decoding it and
## letting the caller discard the result.
sub skip_boolean { return decode_boolean(@_) }

## Read one byte from the reader (always the last argument) and return it
## as an unsigned integer.
sub decode_boolean {
    my ($class, @args) = @_;
    my $reader = $args[-1];
    $reader->read(my $octet, 1);
    return unpack('C', $octet);
}

## An int has no length prefix, so the only way to skip it is to decode it.
sub skip_int { return decode_int(@_) }

## Read a variable-length unsigned integer from the reader (the last
## argument) and undo the zig-zag mapping to recover the signed value.
sub decode_int {
    my ($class, @args) = @_;
    my $raw = unsigned_varint($args[-1]);
    return zigzag($raw);
}

## Skipping a long means decoding it and ignoring the value.
sub skip_long { return decode_long(@_) }

## Longs are decoded by the same routine as ints; the remaining arguments
## are forwarded untouched.
sub decode_long {
    my ($class, @args) = @_;
    return decode_int($class, @args);
}

## A float is always four bytes; skipping reads and discards them.
sub skip_float { return decode_float(@_) }

## Read four bytes from the reader (the last argument) and unpack them as
## a little-endian single-precision float.
sub decode_float {
    my ($class, @args) = @_;
    my $reader = $args[-1];
    $reader->read(my $raw, 4);
    return unpack('f<', $raw);
}

## A double is always eight bytes; skipping reads and discards them.
sub skip_double { return decode_double(@_) }

## Read eight bytes from the reader (the last argument) and unpack them as
## a little-endian double-precision float.
sub decode_double {
    my ($class, @args) = @_;
    my $reader = $args[-1];
    $reader->read(my $raw, 8);
    return unpack('d<', $raw);
}

## Skip a bytes value: read its long length prefix, then move the reader
## forward by that many bytes without reading the payload.
sub skip_bytes {
    my $class = shift;
    my $reader = pop;
    my $size = decode_long($class, undef, undef, $reader);
    ## Whence 1 is SEEK_CUR: advance relative to the current position.
    ## Whence 0 (SEEK_SET) would jump to absolute offset $size instead of
    ## stepping over the payload.
    $reader->seek($size, 1);
    return;
}

## Decode a bytes value: a long length prefix followed by that many raw
## bytes, returned as they were read from the reader (the last argument).
sub decode_bytes {
    my ($class, @args) = @_;
    my $reader = $args[-1];

    my $length = decode_long($class, undef, undef, $reader);
    my $payload;
    $reader->read($payload, $length);
    return $payload;
}

## Strings share the wire layout of bytes, so they are skipped the same way.
sub skip_string { return skip_bytes(@_) }

## Decode a string: fetch the length-prefixed bytes from the reader (the
## last argument) and decode them as UTF-8.
sub decode_string {
    my ($class, @args) = @_;
    my $octets = decode_bytes($class, undef, undef, $args[-1]);
    return Encode::decode_utf8($octets);
}

## Skip a whole record by skipping each field's value in the order the
## schema declares the fields.
sub skip_record {
    my ($class, $schema, $reader) = @_;
    my @field_types = map { $_->{type} } @{ $schema->fields };
    for my $field_type (@field_types) {
        skip($class, $field_type, $reader);
    }
}

## 1.3.2 A record is encoded by encoding the values of its fields in the order
## that they are declared. In other words, a record is encoded as just the
## concatenation of the encodings of its fields. Field values are encoded per
## their schema.
##
## Returns a hash reference keyed by field name. Writer fields unknown to the
## reader are skipped; reader fields missing from the writer take their
## default, and Avro::Schema::Error::Mismatch is thrown when no default exists.
sub decode_record {
    my $class = shift;
    my ($writer_schema, $reader_schema, $reader) = @_;

    ## Start from an empty hash so that a record without any field decodes
    ## to {} instead of undef.
    my $record = {};

    ## Reader fields not yet matched by a writer field; whatever is left
    ## after the loop has to be filled in from defaults.
    my %extra_fields = %{ $reader_schema->fields_as_hash };
    for my $field (@{ $writer_schema->fields }) {
        my $name = $field->{name};
        my $w_field_schema = $field->{type};
        my $r_field_schema = delete $extra_fields{$name};

        ## 1.3.2 if the writer's record contains a field with a name not
        ## present in the reader's record, the writer's value for that field
        ## is ignored.
        if (! $r_field_schema) {
            $class->skip($w_field_schema, $reader);
            next;
        }
        my $data = $class->decode(
            writer_schema => $w_field_schema,
            reader_schema => $r_field_schema->{type},
            reader        => $reader,
        );
        $record->{ $name } = $data;
    }

    for my $name (keys %extra_fields) {
        my $r_field = $extra_fields{$name};
        ## 1.3.2. if the reader's record schema has a field with no default
        ## value, and writer's schema does not have a field with the same
        ## name, an error is signalled.
        unless (exists $r_field->{default}) {
            Avro::Schema::Error::Mismatch->throw(
                "cannot resolve without default"
            );
        }
        ## 1.3.2 ... else the default value is used
        $record->{ $name } = $r_field->{default};
    }
    return $record;
}

## An enum is stored as an int, so it is skipped like one.
sub skip_enum { return skip_int(@_) }

## 1.3.2 An enum is encoded by a int, representing the zero-based position of
## the symbol in the schema.
##
## Looks the position up in the writer's symbol list and returns the symbol,
## provided the reader's schema accepts it.
sub decode_enum {
    my ($class, @args) = @_;
    my ($writer_schema, $reader_schema) = @args;

    my $position = decode_int($class, @args);
    my $symbol   = $writer_schema->symbols->[$position];

    ## 1.3.2 if the writer's symbol is not present in the reader's enum,
    ## then an error is signalled.
    unless ($reader_schema->is_data_valid($symbol)) {
        Avro::Schema::Error::Mismatch->throw("enum unknown");
    }
    return $symbol;
}

## Skip a block-encoded sequence (the layout shared by arrays and maps).
##
## Arguments: an optional leading class name, the reader, and a code
## reference that skips exactly one item. They are taken from the end of the
## argument list so the helper works whether or not the class is passed.
##
## Each block starts with a long item count; a count of zero ends the
## sequence. A negative count is followed by a long giving the block's size
## in bytes, which lets the whole block be stepped over in one seek.
sub skip_block {
    my $block_content = pop;
    my $reader        = pop;
    my $class         = @_ ? shift : __PACKAGE__;

    my $block_count = decode_long($class, undef, undef, $reader);
    while ($block_count) {
        if ($block_count < 0) {
            ## The byte size follows the negative count; whence 1 is
            ## SEEK_CUR, i.e. move forward from the current position.
            my $block_size = decode_long($class, undef, undef, $reader);
            $reader->seek($block_size, 1);
        }
        else {
            $block_content->() for 1 .. $block_count;
        }
        ## Always fetch the next count, otherwise the loop never advances.
        $block_count = decode_long($class, undef, undef, $reader);
    }
    return;
}

## Skip an entire array by skipping every item of every block.
sub skip_array {
    my $class = shift;
    my ($schema, $reader) = @_;
    ## Pass the class explicitly: skip_block takes it ahead of the reader
    ## and the per-item callback.
    skip_block($class, $reader, sub {
        $class->skip($schema->items, $reader);
    });
}

## 1.3.2 Arrays are encoded as a series of blocks. Each block consists of a
## long count value, followed by that many array items. A block with count zero
## indicates the end of the array. Each item is encoded per the array's item
## schema.
## If a block's count is negative, its absolute value is used, and the count is
## followed immediately by a long block size
##
## Returns an array reference holding the decoded items in wire order.
sub decode_array {
    my ($class, @args) = @_;
    my ($writer_schema, $reader_schema, $reader) = @args;

    my $remaining    = decode_long($class, @args);
    my @items;
    my $writer_items = $writer_schema->items;
    my $reader_items = $reader_schema->items;

    while ($remaining) {
        if ($remaining < 0) {
            $remaining = -$remaining;
            ## The block's byte size has to be consumed even though the
            ## items are decoded one by one and the size is not used.
            decode_long($class, @args);
        }
        for my $n (1 .. $remaining) {
            push @items, $class->decode(
                writer_schema => $writer_items,
                reader_schema => $reader_items,
                reader        => $reader,
            );
        }
        $remaining = decode_long($class, @args);
    }
    return \@items;
}

## Skip an entire map: every entry is a string key followed by a value of
## the map's value schema.
sub skip_map {
    my $class = shift;
    my ($schema, $reader) = @_;
    ## Pass the class explicitly: skip_block takes it ahead of the reader
    ## and the per-entry callback.
    skip_block($class, $reader, sub {
        skip_string($class, $reader);
        $class->skip($schema->values, $reader);
    });
}

## 1.3.2 Maps are encoded as a series of blocks. Each block consists of a long
## count value, followed by that many key/value pairs. A block with count zero
## indicates the end of the map. Each item is encoded per the map's value
## schema.
##
## If a block's count is negative, its absolute value is used, and the count is
## followed immediately by a long block size indicating the number of bytes in
## the block. This block size permits fast skipping through data, e.g., when
## projecting a record to a subset of its fields.
##
## Returns a hash reference. Throws Avro::Schema::Error::Parse when a key is
## undefined or empty.
sub decode_map {
    my ($class, @args) = @_;
    my ($writer_schema, $reader_schema, $reader) = @args;

    my %entries;
    my $pairs_left    = decode_long($class, @args);
    my $writer_values = $writer_schema->values;
    my $reader_values = $reader_schema->values;

    while ($pairs_left) {
        if ($pairs_left < 0) {
            $pairs_left = -$pairs_left;
            ## Consume the block's byte size; entries are still decoded
            ## individually, so the size itself is not used.
            decode_long($class, @args);
        }
        for my $n (1 .. $pairs_left) {
            my $key = decode_string($class, @args);
            if (!defined $key || !length $key) {
                Avro::Schema::Error::Parse->throw("key of map is invalid");
            }
            $entries{$key} = $class->decode(
                writer_schema => $writer_values,
                reader_schema => $reader_values,
                reader        => $reader,
            );
        }
        $pairs_left = decode_long($class, @args);
    }
    return \%entries;
}

## Skip a union value: read the long branch index, then skip a value of
## the schema found at that position in the union.
## Throws Avro::Schema::Error::Parse when the index selects no branch.
sub skip_union {
    my $class = shift;
    my ($schema, $reader) = @_;
    my $idx = decode_long($class, undef, undef, $reader);
    ## A negative index would silently count from the end of the branch
    ## list, so it is treated as unknown.
    my $union_schema = $idx >= 0 ? $schema->schemas->[$idx] : undef;
    Avro::Schema::Error::Parse->throw("unknown union member")
        unless $union_schema;
    return $class->skip($union_schema, $reader);
}

## 1.3.2 A union is encoded by first writing a long value indicating the
## zero-based position within the union of the schema of its value. The value
## is then encoded per the indicated schema within the union.
##
## Throws Avro::Schema::Error::Parse when the index selects no branch of the
## writer's union.
sub decode_union {
    my $class = shift;
    my ($writer_schema, $reader_schema, $reader) = @_;
    my $idx = decode_long($class, @_);
    ## A negative index would silently count from the end of the branch
    ## list, so it is rejected together with out-of-range positions.
    my $union_schema = $idx >= 0 ? $writer_schema->schemas->[$idx] : undef;
    Avro::Schema::Error::Parse->throw("unknown union member")
        unless $union_schema;
    ## XXX TODO: schema resolution
    # The first schema in the reader's union that matches the selected writer's
    # union schema is recursively resolved against it. if none match, an error
    # is signalled.
    # Until that is implemented the writer's branch is used on both sides.
    return $class->decode(
        reader_schema => $union_schema,
        writer_schema => $union_schema,
        reader => $reader,
    );
}

## Skip a fixed value by moving the reader forward by the number of bytes
## the schema declares. Returns whatever the reader's seek returns.
sub skip_fixed {
    my $class = shift;
    my ($schema, $reader) = @_;
    ## Whence 1 is SEEK_CUR: advance relative to the current position.
    ## Whence 0 (SEEK_SET) would jump to an absolute offset instead.
    return $reader->seek($schema->size, 1);
}

## 1.3.2 Fixed instances are encoded using the number of bytes declared in the
## schema.
##
## Reads as many bytes as the writer's schema declares and returns them raw.
sub decode_fixed {
    my ($class, $writer_schema, undef, $reader) = @_;
    my $nbytes = $writer_schema->size;
    $reader->read(my $chunk, $nbytes);
    return $chunk;
}

## Undo the zig-zag mapping of an unsigned integer: even inputs are the
## non-negative values shifted left by one bit, odd inputs stand for the
## negative values.
sub zigzag {
    my ($encoded) = @_;
    my $half = $encoded >> 1;
    return ($encoded & 1) ? -($half + 1) : $half;
}

## Read a variable-length unsigned integer from $reader. Each byte carries
## seven payload bits, least significant group first; a set high bit means
## another byte follows.
sub unsigned_varint {
    my ($reader) = @_;
    my $result = 0;
    my $offset = 0;
    while (1) {
        $reader->read(my $octet, 1);
        my $byte = ord $octet;
        $result |= ($byte & 0x7F) << $offset;
        $offset += 7;
        ## The high bit is the continuation flag.
        last unless $byte & 0x80;
    }
    return $result;
}

1;
