# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import subprocess
import sys
import time
import unittest
import os

import set_avro_test_path

class TestTetherWordCount(unittest.TestCase):
  """ unittest for a python tethered map-reduce job.
  """

  def _write_lines(self,lines,fname):
    """
    Write the lines to an avro file named fname

    Parameters
    --------------------------------------------------------
    lines - list of strings to write
    fname - the name of the file to write to.
    """
    import avro.io as avio
    from avro.datafile import DataFileReader,DataFileWriter
    from avro import schema

    #recursively make all directories
    dparts=fname.split(os.sep)[:-1]
    for i in range(len(dparts)):
      pdir=os.sep+os.sep.join(dparts[:i+1])
      if not(os.path.exists(pdir)):
        os.mkdir(pdir)


    with file(fname,'w') as hf:
      inschema="""{"type":"string"}"""
      writer=DataFileWriter(hf,avio.DatumWriter(inschema),writers_schema=schema.parse(inschema))

      #encoder = avio.BinaryEncoder(writer)
      #datum_writer = avio.DatumWriter()
      for datum in lines:
        writer.append(datum)

      writer.close()




  def _count_words(self,lines):
    """Return a dictionary counting the words in lines
    """
    counts={}

    for line in lines:
      words=line.split()

      for w in words:
        if not(counts.has_key(w.strip())):
          counts[w.strip()]=0

        counts[w.strip()]=counts[w.strip()]+1

    return counts

  def test1(self):
    """
    Run a tethered map-reduce job.

    Assumptions: 1) bash is available in /bin/bash
    """
    from word_count_task import WordCountTask
    from avro.tether import tether_task_runner
    from avro.datafile import DataFileReader
    from avro.io import DatumReader
    import avro

    import subprocess
    import StringIO
    import shutil
    import tempfile
    import inspect

    proc=None

    try:


      # TODO we use the tempfile module to generate random names
      # for the files
      base_dir = "/tmp/test_tether_word_count"
      if os.path.exists(base_dir):
        shutil.rmtree(base_dir)

      inpath = os.path.join(base_dir, "in")
      infile=os.path.join(inpath, "lines.avro")
      lines=["the quick brown fox jumps over the lazy dog",
             "the cow jumps over the moon",
             "the rain in spain falls mainly on the plains"]

      self._write_lines(lines,infile)

      true_counts=self._count_words(lines)

      if not(os.path.exists(infile)):
        self.fail("Missing the input file {0}".format(infile))


      # The schema for the output of the mapper and reducer
      oschema="""
{"type":"record",
 "name":"Pair","namespace":"org.apache.avro.mapred","fields":[
     {"name":"key","type":"string"},
     {"name":"value","type":"long","order":"ignore"}
 ]
}
"""

      # write the schema to a temporary file
      osfile=tempfile.NamedTemporaryFile(mode='w',suffix=".avsc",prefix="wordcount",delete=False)
      outschema=osfile.name
      osfile.write(oschema)
      osfile.close()

      if not(os.path.exists(outschema)):
        self.fail("Missing the schema file")

      outpath = os.path.join(base_dir, "out")

      args=[]

      args.append("java")
      args.append("-jar")
      args.append(os.path.abspath("@TOPDIR@/../java/tools/target/avro-tools-@AVRO_VERSION@.jar"))


      args.append("tether")
      args.extend(["--in",inpath])
      args.extend(["--out",outpath])
      args.extend(["--outschema",outschema])
      args.extend(["--protocol","http"])

      # form the arguments for the subprocess
      subargs=[]

      srcfile=inspect.getsourcefile(tether_task_runner)

      # Create a shell script to act as the program we want to execute
      # We do this so we can set the python path appropriately
      script="""#!/bin/bash
export PYTHONPATH={0}
python -m avro.tether.tether_task_runner word_count_task.WordCountTask
"""
      # We need to make sure avro is on the path
      # getsourcefile(avro) returns .../avro/__init__.py
      asrc=inspect.getsourcefile(avro)
      apath=asrc.rsplit(os.sep,2)[0]

      # path to where the tests lie
      tpath=os.path.split(__file__)[0]

      exhf=tempfile.NamedTemporaryFile(mode='w',prefix="exec_word_count_",delete=False)
      exfile=exhf.name
      exhf.write(script.format((os.pathsep).join([apath,tpath]),srcfile))
      exhf.close()

      # make it world executable
      os.chmod(exfile,0755)

      args.extend(["--program",exfile])

      print "Command:\n\t{0}".format(" ".join(args))
      proc=subprocess.Popen(args)


      proc.wait()
      time.sleep(1) # wait a bit longer to clean up

      # read the output
      with file(os.path.join(outpath,"part-00000.avro")) as hf:
        reader=DataFileReader(hf, DatumReader())
        for record in reader:
          self.assertEqual(record["value"],true_counts[record["key"]])

        reader.close()

    except Exception as e:
      raise
    finally:
      # close the process
      if proc is not None and proc.returncode is None:
        proc.kill()
      if os.path.exists(base_dir):
        shutil.rmtree(base_dir)
      if os.path.exists(exfile):
        os.remove(exfile)

if __name__== "__main__":
  unittest.main()
