# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

package Avro::BinaryEncoder;
use strict;
use warnings;

use Config;
use Encode();
use Error::Simple;
use Regexp::Common qw(number);

our $max64;
our $complement = ~0x7F;
if ($Config{use64bitint}) {
    $max64 = 9223372036854775807;
}
else {
    require Math::BigInt;
    $complement = Math::BigInt->new("0b" . ("1" x 57) . ("0" x 7));
    $max64      = Math::BigInt->new("0b0" . ("1" x 63));
}


=head2 encode(%param)

Encodes the given C<data> according to the given C<schema>, and pass it
to the C<emit_cb>

Params are:

=over 4

=item * data

The data to encode (can be any perl data structure, but it should match
schema)

=item * schema

The schema to use to encode C<data>

=item * emit_cb($byte_ref)

The callback that will be invoked with the a reference to the encoded data
in parameters.

=back

=cut

sub encode {
    my $class = shift;
    my %param = @_;
    my ($schema, $data, $cb) = @param{qw/schema data emit_cb/};

    ## a schema can also be just a string
    my $type = ref $schema ? $schema->type : $schema;

    ## might want to profile and optimize this
    my $meth = "encode_$type";
    $class->$meth($schema, $data, $cb);
    return;
}

sub encode_null {
    $_[3]->(\'');
}

sub encode_boolean {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    $cb->( $data ? \"\x1" : \"\x0" );
}

sub encode_int {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    if ($data !~ /^$RE{num}{int}$/) {
        throw Avro::BinaryEncoder::Error("cannot convert '$data' to integer");
    }
    if (abs($data) > 0x7fffffff) {
        throw Avro::BinaryEncoder::Error("int ($data) should be <= 32bits");
    }

    my $enc = unsigned_varint(zigzag($data));
    $cb->(\$enc);
}

sub encode_long {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    if ($data !~ /^$RE{num}{int}$/) {
        throw Avro::BinaryEncoder::Error("cannot convert '$data' to long integer");
    }
    if (abs($data) > $max64) {
        throw Avro::BinaryEncoder::Error("int ($data) should be <= 64bits");
    }
    my $enc = unsigned_varint(zigzag($data));
    $cb->(\$enc);
}

sub encode_float {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    my $enc = pack "f<", $data;
    $cb->(\$enc);
}

sub encode_double {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    my $enc = pack "d<", $data;
    $cb->(\$enc);
}

sub encode_bytes {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    encode_long($class, undef, bytes::length($data), $cb);
    $cb->(\$data);
}

sub encode_string {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    my $bytes = Encode::encode_utf8($data);
    encode_long($class, undef, bytes::length($bytes), $cb);
    $cb->(\$bytes);
}

## 1.3.2 A record is encoded by encoding the values of its fields in the order
## that they are declared. In other words, a record is encoded as just the
## concatenation of the encodings of its fields. Field values are encoded per
## their schema.
sub encode_record {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    for my $field (@{ $schema->fields }) {
        $class->encode(
            schema  => $field->{type},
            data    => $data->{ $field->{name} },
            emit_cb => $cb,
        );
    }
}

## 1.3.2 An enum is encoded by a int, representing the zero-based position of
## the symbol in the schema.
sub encode_enum {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    my $symbols = $schema->symbols_as_hash;
    my $pos = $symbols->{ $data };
    throw Avro::BinaryEncoder::Error("Cannot find enum $data")
        unless defined $pos;
    $class->encode_int(undef, $pos, $cb);
}

## 1.3.2 Arrays are encoded as a series of blocks. Each block consists of a
## long count value, followed by that many array items. A block with count zero
## indicates the end of the array. Each item is encoded per the array's item
## schema.
## If a block's count is negative, its absolute value is used, and the count is
## followed immediately by a long block size

## maybe here it would be worth configuring what a typical block size should be
sub encode_array {
    my $class = shift;
    my ($schema, $data, $cb) = @_;

    ## FIXME: multiple blocks
    if (@$data) {
        $class->encode_long(undef, scalar @$data, $cb);
        for (@$data) {
            $class->encode(
                schema => $schema->items,
                data => $_,
                emit_cb => $cb,
            );
        }
    }
    ## end of the only block
    $class->encode_long(undef, 0, $cb);
}


## 1.3.2 Maps are encoded as a series of blocks. Each block consists of a long
## count value, followed by that many key/value pairs. A block with count zero
## indicates the end of the map. Each item is encoded per the map's value
## schema.
##
## (TODO)
## If a block's count is negative, its absolute value is used, and the count is
## followed immediately by a long block size indicating the number of bytes in
## the block. This block size permits fast skipping through data, e.g., when
## projecting a record to a subset of its fields.
sub encode_map {
    my $class = shift;
    my ($schema, $data, $cb) = @_;

    my @keys = keys %$data;
    if (@keys) {
        $class->encode_long(undef, scalar @keys, $cb);
        for (@keys) {
            ## the key
            $class->encode_string(undef, $_, $cb);

            ## the value
            $class->encode(
                schema => $schema->values,
                data => $data->{$_},
                emit_cb => $cb,
            );
        }
    }
    ## end of the only block
    $class->encode_long(undef, 0, $cb);
}

## 1.3.2 A union is encoded by first writing a long value indicating the
## zero-based position within the union of the schema of its value. The value
## is then encoded per the indicated schema within the union.
sub encode_union {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    my $idx = 0;
    my $elected_schema;
    for my $inner_schema (@{$schema->schemas}) {
        if ($inner_schema->is_data_valid($data)) {
            $elected_schema = $inner_schema;
            last;
        }
        $idx++;
    }
    unless ($elected_schema) {
        throw Avro::BinaryEncoder::Error("union cannot validate the data");
    }
    $class->encode_long(undef, $idx, $cb);
    $class->encode(
        schema => $elected_schema,
        data => $data,
        emit_cb => $cb,
    );
}

## 1.3.2 Fixed instances are encoded using the number of bytes declared in the
## schema.
sub encode_fixed {
    my $class = shift;
    my ($schema, $data, $cb) = @_;
    if (bytes::length $data != $schema->size) {
        my $s1 = bytes::length $data;
        my $s2 = $schema->size;
        throw Avro::BinaryEncoder::Error("Fixed size doesn't match $s1!=$s2");
    }
    $cb->(\$data);
}

sub zigzag {
    use warnings FATAL => 'numeric';
    if ( $_[0] >= 0 ) {
        return $_[0] << 1;
    }
    return (($_[0] << 1) ^ -1) | 0x1;
}

sub unsigned_varint {
    my @bytes;
    while ($_[0] & $complement) {           # mask with continuation bit
        push @bytes, ($_[0] & 0x7F) | 0x80; # out and set continuation bit
        $_[0] >>= 7;                        # next please
    }
    push @bytes, $_[0]; # last byte
    return pack "C*", @bytes;
}

package Avro::BinaryEncoder::Error;
use parent 'Error::Simple';

1;
