#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifdef HAVE_LIBGRIB

#ifdef HAVE_LIBFDB5
#include "cdi_fdb.h"
#endif

#include "async_worker.h"
#include "dmemory.h"
#include "cdi.h"
#include "cdi_int.h"
#include "stream_cgribex.h"
#include "stream_grb.h"
#include "stream_gribapi.h"
#include "file.h"
#include "cgribex.h" /* gribZip gribGetZip gribGinfo */

static int
grb_decode(int filetype, int memType, int datatype, void *cgribexp, void *gribbuffer, size_t gribsize, void *data, size_t datasize,
           int unreduced, size_t *numMissVals, double missval)
{
  int status = 0;

#ifdef HAVE_LIBCGRIBEX
  if (filetype == CDI_FILETYPE_GRB && !CDI_gribapi_grib1)
  {
#ifdef HAVE_LIBGRIB_API
    extern int cdiNAdditionalGRIBKeys;
    if (cdiNAdditionalGRIBKeys > 0) Error("CGRIBEX decode does not support reading of additional GRIB keys!");
#endif
    status = cgribexDecode(memType, cgribexp, gribbuffer, gribsize, data, datasize, unreduced, numMissVals, missval);
  }
  else
#endif
#ifdef HAVE_LIBGRIB_API
  {
    bool useFloatInterface = (have_gribapi_float_interface() && datatype != CDI_DATATYPE_FLT32 && datatype != CDI_DATATYPE_FLT64);
    int memTypeX = useFloatInterface ? memType : MEMTYPE_DOUBLE;
    void *datap = (!useFloatInterface && memType == MEMTYPE_FLOAT) ? Malloc(datasize * sizeof(double)) : data;

    // if (useFloatInterface) printf("gribapi read: useFloatInterface\n");

    status = gribapiDecode(memTypeX, gribbuffer, gribsize, datap, datasize, unreduced, numMissVals, missval);

    if (!useFloatInterface && memType == MEMTYPE_FLOAT)
    {
      // printf("gribapi read: convert double to float\n");
      float *dataf = (float *) data;
      double *datad = (double *) datap;
      for (size_t i = 0; i < datasize; ++i) dataf[i] = (float) datad[i];
      Free(datap);
    }
  }
#else
  {
    (void) datatype;
    Error("ecCodes support not compiled in!");
  }
#endif

  return status;
}

// Decompresses the grib data in gribbuffer.
static int
grib1_unzip_record(void *gribbuffer, size_t *gribsize)
{
  int zip = 0;

  size_t igribsize = *gribsize;
  size_t ogribsize = *gribsize;

  int izip;
  size_t unzipsize;
  if ((izip = gribGetZip(igribsize, (unsigned char *) gribbuffer, &unzipsize)) > 0)
  {
    zip = izip;
    if (izip == 128)  // szip
    {
      if (unzipsize < igribsize)
      {
        fprintf(stderr, "Decompressed size smaller than compressed size (in %zu; out %zu)!\n", igribsize, unzipsize);
        return 0;
      }

      unzipsize += 100;  // need 0 to 1 bytes for rounding of bds

      void *buffer = Malloc(igribsize);
      memcpy(buffer, gribbuffer, igribsize);

      ogribsize = (size_t) gribUnzip((unsigned char *) gribbuffer, (long) unzipsize, (unsigned char *) buffer, (long) igribsize);

      Free(buffer);

      if (ogribsize == 0) Error("Decompression problem!");
    }
    else
    {
      Error("Decompression for %d not implemented!", izip);
    }
  }

  *gribsize = ogribsize;

  return zip;
}

typedef struct JobArgsGRB
{
  int *outZip;
  int recID, tsID, filetype, memType, datatype, unreduced;
  void *cgribexp, *gribbuffer, *data;
  size_t recsize, gridsize, numMissVals;
  double missval;
} JobArgsGRB;

static int
grb_decode_record(void *untypedArgs)
{
  JobArgsGRB *args = (JobArgsGRB *) untypedArgs;
  *args->outZip = grib1_unzip_record(args->gribbuffer, &args->recsize);
  grb_decode(args->filetype, args->memType, args->datatype, args->cgribexp, args->gribbuffer, args->recsize, args->data,
             args->gridsize, args->unreduced, &args->numMissVals, args->missval);
  return 0;
}

static JobArgsGRB
grb_read_raw_data(stream_t *streamptr, int tsID, int recID, int memType, void *gribbuffer, void *data, bool resetFilePos)
{
  int vlistID = streamptr->vlistID;
  int varID = streamptr->tsteps[tsID].recinfo[recID].varID;
  size_t recsize = streamptr->tsteps[tsID].records[recID].size;

  int gridID = vlistInqVarGrid(vlistID, varID);
  size_t gridsize = (size_t) gridInqSize(gridID);
  if (CDI_Debug) Message("gridID = %d gridsize = %zu", gridID, gridsize);

  void *cgribexp = (gribbuffer && streamptr->record->objectp) ? streamptr->record->objectp : NULL;
  if (!gribbuffer) gribbuffer = Malloc(streamptr->record->buffersize);
  if (!data) data = Malloc(gridsize * ((memType == MEMTYPE_FLOAT) ? sizeof(float) : sizeof(double)));

  if (streamptr->protocol == CDI_PROTOCOL_FDB)
  {
#ifdef HAVE_LIBFDB5
    int fdbItemIndex = streamptr->tsteps[tsID].records[recID].fdbItemIndex;
    if (fdbItemIndex == -1) Error("fdbItem not available!");

    recsize = cdi_fdb_read_record(streamptr->protocolData, &(streamptr->fdbKeyValueList[fdbItemIndex]),
                                  &(streamptr->record->buffersize), &gribbuffer);
#endif
  }
  else
  {
    if (recsize == 0) Error("Internal problem! Recordsize is zero for record %d at timestep %d", recID + 1, tsID + 1);

    int fileID = streamptr->fileID;
    off_t recpos = streamptr->tsteps[tsID].records[recID].position;
    off_t currentfilepos = (resetFilePos ? fileGetPos(fileID) : 0);

    fileSetPos(fileID, recpos, SEEK_SET);
    if (fileRead(fileID, gribbuffer, recsize) != recsize) Error("Failed to read GRIB record!");

    if (resetFilePos) fileSetPos(fileID, currentfilepos, SEEK_SET);
    if (!resetFilePos) streamptr->numvals += (SizeType) gridsize;
  }

  return (JobArgsGRB) {
    .outZip = &streamptr->tsteps[tsID].records[recID].zip,
    .recID = recID,
    .tsID = tsID,
    .filetype = streamptr->filetype,
    .memType = memType,
    .datatype = vlistInqVarDatatype(vlistID, varID),
    .unreduced = streamptr->unreduced,
    .cgribexp = cgribexp,
    .gribbuffer = gribbuffer,
    .data = data,
    .recsize = recsize,
    .gridsize = gridsize,
    .numMissVals = 0,
    .missval = vlistInqVarMissval(vlistID, varID),
  };
}

static size_t
grb_read_and_decode_record(stream_t *streamptr, int recID, int memType, void *data, bool resetFilePos)
{
  JobArgsGRB args = grb_read_raw_data(streamptr, streamptr->curTsID, recID, memType, streamptr->record->buffer, data, resetFilePos);
  grb_decode_record(&args);
  return args.numMissVals;
}

typedef struct JobDescriptorGRB
{
  JobArgsGRB args;
  AsyncJob *job;
} JobDescriptorGRB;

static void
JobDescriptor_startJob(AsyncManager *jobManager, JobDescriptorGRB *me, stream_t *streamptr, int tsID, int recID, int memType)
{
  me->args = grb_read_raw_data(streamptr, tsID, recID, memType, NULL, NULL, false);
  me->job = AsyncWorker_requestWork(jobManager, grb_decode_record, &me->args);
  if (!me->job) xabort("error while trying to send job to worker thread");
}

static void
grb_JobDescriptor_finishJob(AsyncManager *jobManager, JobDescriptorGRB *me, void *data, size_t *numMissVals)
{
  if (AsyncWorker_wait(jobManager, me->job)) xabort("error executing job in worker thread");
  memcpy(data, me->args.data, me->args.gridsize * ((me->args.memType == MEMTYPE_FLOAT) ? sizeof(float) : sizeof(double)));
  *numMissVals = me->args.numMissVals;

  Free(me->args.gribbuffer);
  Free(me->args.data);
  me->args.recID = -1;  // mark as inactive
  me->args.tsID = -1;   // mark as inactive
}
/*
static long
get_global_recId(stream_t *streamptr, int tsID, int recID)
{
  const tsteps_t *tsteps = streamptr->tsteps;
  long globalRecId = recID;
  if (tsID > 0) globalRecId += tsteps[0].nrecs;
  if (tsID > 1) globalRecId += tsteps[1].nrecs * (tsID - 1);
  return globalRecId;
}
*/

static void
grb_get_local_step_and_recId(stream_t *streamptr, long globalRecId, int *tsID, long *recID)
{
  int localTsId = 0;
  long numSteps = streamptr->ntsteps;
  const tsteps_t *tsteps = streamptr->tsteps;
  if (numSteps > 0 && globalRecId >= tsteps[0].nrecs)
  {
    localTsId++;
    globalRecId -= tsteps[0].nrecs;
  }
  if (globalRecId >= tsteps[1].nrecs)
  {
    ldiv_t nadjust = ldiv(globalRecId, tsteps[1].nrecs);
    localTsId += (int) nadjust.quot;
    globalRecId = nadjust.rem;
  }

  *tsID = localTsId;
  *recID = globalRecId;
}

static void
grb_read_next_record_async(AsyncManager *jobManager, JobDescriptorGRB *jd, stream_t *streamptr, int memType)
{
  int tsId = -1;
  long recId = -1;
  grb_get_local_step_and_recId(streamptr, streamptr->nextGlobalRecId, &tsId, &recId);
  int xRecId = streamptr->tsteps[tsId].recIDs[recId];
  JobDescriptor_startJob(jobManager, jd, streamptr, tsId, xRecId, memType);
  streamptr->nextGlobalRecId++;
}

static void
grb_read_next_record(stream_t *streamptr, int recID, int memType, void *data, size_t *numMissVals)
{
  bool jobFound = false;

  int workerCount = streamptr->numWorker;
  if (workerCount > 0)
  {
    int tsID = streamptr->curTsID;

    AsyncManager *jobManager = (AsyncManager *) streamptr->jobManager;
    JobDescriptorGRB *jobs = (JobDescriptorGRB *) streamptr->jobs;

    // if this is the first call, init and start worker threads
    if (!jobs)
    {
      jobs = (JobDescriptorGRB *) Malloc((size_t) workerCount * sizeof(*jobs));
      streamptr->jobs = jobs;
      for (int i = 0; i < workerCount; i++) jobs[i].args.recID = -1;
      for (int i = 0; i < workerCount; i++) jobs[i].args.tsID = -1;
      if (AsyncWorker_init(&jobManager, workerCount)) xabort("error while trying to start worker threads");
      streamptr->jobManager = jobManager;

      // Start as many new jobs as possible.
      for (int i = 0; streamptr->nextGlobalRecId < streamptr->maxGlobalRecs && i < workerCount; i++)
        if (jobs[i].args.recID < 0 && jobs[i].args.tsID < 0) grb_read_next_record_async(jobManager, jobs + i, streamptr, memType);
    }

    // search for a job descriptor with the given tsID and recID, and use its results if it exists
    for (int i = 0; !jobFound && i < workerCount; i++)
      if (jobs[i].args.recID == recID && jobs[i].args.tsID == tsID)
      {
        jobFound = true;
        grb_JobDescriptor_finishJob(jobManager, jobs + i, data, numMissVals);
        if (streamptr->nextGlobalRecId < streamptr->maxGlobalRecs)
          grb_read_next_record_async(jobManager, jobs + i, streamptr, memType);
      }
  }

  // perform the work synchronously if we didn't start a job for it yet
  if (!jobFound) *numMissVals = grb_read_and_decode_record(streamptr, recID, memType, data, false);
}

void
grb_read_field(stream_t *streamptr, int memType, void *data, size_t *numMissVals)
{
  int tsID = streamptr->curTsID;
  int vrecID = streamptr->tsteps[tsID].curRecID;
  int recID = streamptr->tsteps[tsID].recIDs[vrecID];

  grb_read_next_record(streamptr, recID, memType, data, numMissVals);
}

void
grb_read_var_slice(stream_t *streamptr, int varID, int levelID, int memType, void *data, size_t *numMissVals)
{
  int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
  int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];

  *numMissVals = grb_read_and_decode_record(streamptr, recID, memType, data, true);
}

void
grb_read_var(stream_t *streamptr, int varID, int memType, void *data, size_t *numMissVals)
{
  int vlistID = streamptr->vlistID;
  int fileID = streamptr->fileID;

  int gridID = vlistInqVarGrid(vlistID, varID);
  size_t gridsize = (size_t) gridInqSize(gridID);

  off_t currentfilepos = fileGetPos(fileID);

  int isub = subtypeInqActiveIndex(streamptr->vars[varID].subtypeID);
  int nlevs = streamptr->vars[varID].recordTable[0].nlevs;

  if (CDI_Debug) Message("nlevs = %d gridID = %d gridsize = %zu", nlevs, gridID, gridsize);

  *numMissVals = 0;
  for (int levelID = 0; levelID < nlevs; levelID++)
  {
    int recID = streamptr->vars[varID].recordTable[isub].recordID[levelID];
    size_t offset = (size_t) levelID * gridsize;
    void *datap = (memType == MEMTYPE_FLOAT) ? (void *) ((float *) data + offset) : (void *) ((double *) data + offset);

    *numMissVals += grb_read_and_decode_record(streamptr, recID, memType, datap, false);
  }

  fileSetPos(fileID, currentfilepos, SEEK_SET);
}

#endif

/*
 * Local Variables:
 * c-file-style: "Java"
 * c-basic-offset: 2
 * indent-tabs-mode: nil
 * show-trailing-whitespace: t
 * require-trailing-newline: t
 * End:
 */
