# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'avro'
require 'webrick'
require 'uri'
require 'logger'

class GenericResponder < Avro::IPC::Responder
  def initialize(proto, msg, datum)
    proto_json = open(proto).read
    super(Avro::Protocol.parse(proto_json))
    @msg = msg
    @datum = datum
  end

  def call(message, request)
    if message.name == @msg
      STDERR.puts "Message: #{message.name} Datum: #{@datum.inspect}"
      @datum
    end
  end
end

class GenericHandler < WEBrick::HTTPServlet::AbstractServlet
  def do_POST(req, resp)
    call_request = Avro::IPC::FramedReader.new(StringIO.new(req.body)).read_framed_message
    unframed_resp = $responder.respond(call_request)
    writer = Avro::IPC::FramedWriter.new(StringIO.new)
    writer.write_framed_message(unframed_resp)
    resp.body = writer.to_s
    @server.stop
  end
end

def run_server(uri, proto, msg, datum)
  uri = URI.parse(uri)
  $responder = GenericResponder.new(proto, msg, datum)
  server = WEBrick::HTTPServer.new(:BindAddress => uri.host,
                                   :Port => uri.port,
                                   :Logger => Logger.new(StringIO.new))
  server.mount '/', GenericHandler
  puts "Port: #{server.config[:Port]}"
  STDOUT.flush
  trap("INT") { server.stop }
  trap("TERM") { server.stop }
  server.start
end

def send_message(uri, proto, msg, datum)
  uri = URI.parse(uri)
  trans = Avro::IPC::HTTPTransceiver.new(uri.host, uri.port)
  proto_json = open(proto).read
  requestor = Avro::IPC::Requestor.new(Avro::Protocol.parse(proto_json),
                                       trans)
  p requestor.request(msg, datum)
end

def file_or_stdin(f)
  f == "-" ? STDIN : open(f)
end

def main
  if ARGV.size == 0
    puts "Usage: #{$0} [dump|rpcreceive|rpcsend]"
    return 1
  end

  case ARGV[0]
  when "dump"
    if ARGV.size != 3
      puts "Usage: #{$0} dump input_file"
      return 1
    end
    d = Avro::DataFile.new(file_or_stdin(ARGV[1]), Avro::IO::DatumReader.new)
    d.each{|o| puts o.inspect }
    d.close
  when "rpcreceive"
    usage_str = "Usage: #{$0} rpcreceive uri protocol_file "
    usage_str += "message_name (-data d | -file f)"

    unless [4, 6].include?(ARGV.size)
      puts usage_str
      return 1
    end
    uri, proto, msg = ARGV[1,3]
    datum = nil
    if ARGV.size > 4
      case ARGV[4]
      when "-file"
        Avro::DataFile.open(ARGV[5]) {|f|
          f.each{|d| datum = d; break }
        }
      when "-data"
        puts "JSON Decoder not yet implemented."
        return 1
      else
        puts usage_str
        return 1
      end
    end
    run_server(uri, proto, msg, datum)
  when "rpcsend"
    usage_str = "Usage: #{$0} rpcsend uri protocol_file "
    usage_str += "message_name (-data d | -file f)"
    unless [4,6].include?(ARGV.size)
      puts usage_str
      return 1
    end
    uri, proto, msg = ARGV[1,3]
    datum = nil
    if ARGV.size > 4
      case ARGV[4]
      when "-file"
        Avro::DataFile.open(ARGV[5]){|f| f.each{|d| datum = d; break } }
      when "-data"
        puts "JSON Decoder not yet implemented"
        return 1
      else
        puts usage_str
        return 1
      end
    end
    send_message(uri, proto, msg, datum)
  end
  return 0
end

if __FILE__ == $0
  exit(main)
end
