import array
import io
import json
import os
import shutil
import subprocess
import tempfile
from collections import OrderedDict
from contextlib import closing

import numpy as np
import pandas as pd

try:
    import bbi
except ImportError:
    bbi = None

try:
    import pyBigWig
except ImportError:
    pyBigWig = None

from ..core.arrops import argnatsort
from ..core.stringops import parse_region
from .schemas import BAM_FIELDS, SCHEMAS

__all__ = [
    "read_table",
    "read_chromsizes",
    "read_tabix",
    "read_pairix",
    "read_alignments",
    "load_fasta",
    "read_bigwig",
    "to_bigwig",
    "read_bigbed",
    "to_bigbed",
]


def read_table(filepath_or, schema=None, schema_is_strict=False, **kwargs):
    """
    Read a tab-delimited file into a data frame.

    Equivalent to :func:`pandas.read_table` but supports an additional
    `schema` argument to populate column names for common genomic formats.

    Parameters
    ----------
    filepath_or : str, path object or file-like object
        Any valid string path is acceptable. The string could also be a URL.
    schema : str
        Schema to use for table column names.
    schema_is_strict : bool
        If True, raise an error when any column is entirely NA, which
        usually indicates a mismatch between the number of fields in the
        schema and the number of columns in the input file.

    Returns
    -------
    df : pandas.DataFrame of intervals
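
    Examples
    --------
    A minimal sketch, assuming a tab-delimited BED3 file exists at the
    hypothetical path ``intervals.bed``:

    >>> df = read_table("intervals.bed", schema="bed3")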

    """
    kwargs.setdefault("sep", "\t")
    kwargs.setdefault("header", None)
    kwargs.setdefault("index_col", False)
    if isinstance(filepath_or, str) and filepath_or.endswith(".gz"):
        kwargs.setdefault("compression", "gzip")
    if schema is not None:
        try:
            kwargs.setdefault("names", SCHEMAS[schema])
        except (KeyError, TypeError):
            if isinstance(schema, str):
                raise ValueError(f"TSV schema not found: '{schema}'") from None
            kwargs.setdefault("names", schema)
    df = pd.read_csv(filepath_or, **kwargs)
    if schema_is_strict:
        if (df.notna().sum(axis=0) == 0).any():
            raise ValueError(
                "one or more columns are all NA,"
                + " check agreement between number of fields in schema"
                + " and number of columns in input file"
            )
    return df


def read_chromsizes(
    filepath_or,
    filter_chroms=True,
    chrom_patterns=(r"^chr[0-9]+$", r"^chr[XY]$", r"^chrM$"),
    natsort=True,
    as_bed=False,
    **kwargs,
):
    """
    Read a ``<db>.chrom.sizes`` or ``<db>.chromInfo.txt`` file from the UCSC
    database, where ``db`` is a genome assembly name, as a ``pandas.Series``.

    Parameters
    ----------
    filepath_or : str or file-like
        Path or url to text file, or buffer.
    filter_chroms : bool, optional
        Filter for chromosome names given in ``chrom_patterns``.
    chrom_patterns : sequence, optional
        Sequence of regular expressions to capture desired sequence names.
    natsort : bool, optional
        Sort each captured group of names in natural order. Default is True.
    as_bed : bool, optional
        If True, return chromsizes as an interval dataframe (chrom, start, end).
    **kwargs :
        Passed to :func:`pandas.read_csv`

    Returns
    -------
    Series of integer bp lengths indexed by sequence name or an interval dataframe.

    Notes
    -----
    When ``filter_chroms`` is True, sequence names are matched against each
    regular expression in ``chrom_patterns``. The matches for each pattern
    form a group, and groups are concatenated in pattern order, with natural
    sorting within each group if ``natsort`` is True.

    See also
    --------
    * UCSC assembly terminology: <http://genome.ucsc.edu/FAQ/FAQdownloads.html#download9>
    * NCBI assembly terminology: <https://www.ncbi.nlm.nih.gov/grc/help/definitions>
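
    Examples
    --------
    A minimal sketch, assuming a hypothetical local file ``hg38.chrom.sizes``:

    >>> chromsizes = read_chromsizes("hg38.chrom.sizes")
    >>> chromsizes.head()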

    """
    if isinstance(filepath_or, str) and filepath_or.endswith(".gz"):
        kwargs.setdefault("compression", "gzip")

    chromtable = pd.read_csv(
        filepath_or,
        sep="\t",
        usecols=[0, 1],
        names=["name", "length"],
        dtype={"name": str},
        **kwargs,
    )

    if filter_chroms:
        parts = []
        for pattern in chrom_patterns:
            if not len(pattern):
                continue
            part = chromtable[chromtable["name"].str.contains(pattern)]
            if natsort:
                part = part.iloc[argnatsort(part["name"])]
            parts.append(part)
        chromtable = pd.concat(parts, axis=0)

    if as_bed:
        chromtable["start"] = 0
        chromtable = (
            chromtable[["name", "start", "length"]]
            .rename({"name": "chrom", "length": "end"}, axis="columns")
            .reset_index(drop=True)
        )
    else:
        chromtable.index = chromtable["name"].values
        chromtable = chromtable["length"]
    return chromtable


def read_tabix(fp, chrom=None, start=None, end=None):
    """
    Read a tabix-indexed file into a DataFrame.
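
    Examples
    --------
    A minimal sketch, assuming a hypothetical bgzip-compressed, tabix-indexed
    file ``intervals.bed.gz``:

    >>> df = read_tabix("intervals.bed.gz", chrom="chr1", start=0, end=100000)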
    """
    try:
        import pysam
    except ImportError:
        raise ImportError("pysam is required to use `read_tabix`") from None

    with closing(pysam.TabixFile(fp)) as f:
        names = list(f.header) or None
        df = pd.read_csv(
            io.StringIO("\n".join(f.fetch(chrom, start, end))),
            sep="\t",
            header=None,
            names=names,
        )
    return df


def read_pairix(
    fp,
    region1,
    region2=None,
    chromsizes=None,
    columns=None,
    usecols=None,
    dtypes=None,
    **kwargs,
):
    """
    Read a pairix-indexed file into a DataFrame.
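
    Examples
    --------
    A minimal sketch, assuming a hypothetical pairix-indexed pairs file
    ``contacts.pairs.gz``:

    >>> df = read_pairix("contacts.pairs.gz", "chr1:0-10000000")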
    """
    import cytoolz as toolz
    import pypairix

    if dtypes is None:
        dtypes = {}
    f = pypairix.open(fp, "r")

    header = f.get_header()
    if len(header):
        header_groups = toolz.groupby(lambda x: x.split(":")[0], header)
        if "#chromsize" in header_groups and chromsizes is None:
            items = [line.split()[1:] for line in header_groups["#chromsize"]]
            if len(items):
                names, lengths = zip(*((item[0], int(item[1])) for item in items))
                chromsizes = pd.Series(index=names, data=lengths)
        if "#columns" in header_groups and columns is None:
            columns = header_groups["#columns"][0].split()[1:]

    chrom1, start1, end1 = parse_region(region1, chromsizes)
    if region2 is not None:
        chrom2, start2, end2 = parse_region(region2, chromsizes)
    else:
        chrom2, start2, end2 = chrom1, start1, end1

    it = f.query2D(chrom1, start1, end1, chrom2, start2, end2)
    if usecols is not None:
        argusecols = [columns.index(col) for col in usecols]
        records = [tuple(record[i] for i in argusecols) for record in it]
        columns = usecols
    else:
        records = it

    df = pd.DataFrame.from_records(records, columns=columns)
    if columns is not None:
        for col in columns:
            if col in dtypes:
                df[col] = df[col].astype(dtypes[col])
            else:
                # Coerce to numeric where possible; leave non-numeric
                # columns unchanged (errors="ignore" is deprecated).
                try:
                    df[col] = pd.to_numeric(df[col])
                except (ValueError, TypeError):
                    pass
    return df


def read_alignments(fp, chrom=None, start=None, end=None):
    """
    Read alignment records into a DataFrame.
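
    Examples
    --------
    A minimal sketch, assuming a hypothetical coordinate-sorted, indexed BAM
    file ``reads.bam``:

    >>> df = read_alignments("reads.bam", chrom="chr1", start=0, end=100000)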
    """
    try:
        import pysam
    except ImportError:
        raise ImportError("pysam is required to use `read_alignments`") from None

    ext = os.path.splitext(fp)[1]
    if ext == ".sam":
        mode = "r"
    elif ext == ".bam":
        mode = "rb"
    elif ext == ".cram":
        mode = "rc"
    else:
        raise ValueError(f"{ext} is not a supported filetype")

    with closing(pysam.AlignmentFile(fp, mode)) as f:
        records = []
        for s in f.fetch(chrom, start, end):
            # Needed because array.array is not json serializable
            tags = [
                (k, v.tolist() if isinstance(v, array.array) else v) for k, v in s.tags
            ]
            records.append(
                (
                    s.qname,
                    s.flag,
                    s.reference_name,
                    s.pos,
                    s.mapq,
                    s.cigarstring if s.mapq != 0 else np.nan,
                    s.rnext,
                    s.pnext,
                    s.tlen,
                    s.seq,
                    s.qual,
                    json.dumps(dict(tags)),
                )
            )
        df = pd.DataFrame(records, columns=BAM_FIELDS)
    return df


def read_bam(fp, chrom=None, start=None, end=None):
    """
    Deprecated: use :func:`read_alignments` instead.

    Read a BAM file into a DataFrame.
    """
    return read_alignments(fp, chrom, start, end)


class PysamFastaRecord:
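    """
    Lazy, sliceable accessor for a single reference sequence in an open
    pysam ``FastaFile``.
    """
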
    def __init__(self, ff, ref):
        self.ff = ff
        if ref not in ff.references:
            raise KeyError(f"Reference name '{ref}' not found in '{ff}'")
        self.ref = ref

    def __getitem__(self, key):
        if isinstance(key, slice):
            start, stop = key.start, key.stop
        else:
            start = key
            stop = key + 1
        return self.ff.fetch(self.ref, start, stop)


def load_fasta(filepath_or, engine="pysam", **kwargs):
    """
    Load lazy fasta sequences from an indexed fasta file (optionally compressed)
    or from a collection of uncompressed fasta files.

    Parameters
    ----------
    filepath_or : str or iterable
        If a string, a filepath to a single `.fa` or `.fa.gz` file. Assumed to
        be accompanied by a `.fai` index file. Depending on the engine, the
        index may be created on the fly, and some compression formats may not
        be supported. If not a string, an iterable of fasta file paths each
        assumed to contain a single sequence.
    engine : {'pysam', 'pyfaidx'}, optional
        Module to use for loading sequences.
    kwargs : optional
        Options to pass to ``pysam.FastaFile`` or ``pyfaidx.Fasta``.

    Returns
    -------
    OrderedDict of (lazy) fasta records.

    Notes
    -----
    * pysam/samtools can read ``.fai``- and ``.gzi``-indexed (bgzf-compressed)
      files.
    * pyfaidx can handle uncompressed and bgzf-compressed files.
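
    Examples
    --------
    A minimal sketch, assuming a hypothetical indexed FASTA file ``hg38.fa``
    with an accompanying ``hg38.fa.fai``:

    >>> records = load_fasta("hg38.fa")
    >>> seq = records["chr1"][10000:10010]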

    """
    is_multifile = not isinstance(filepath_or, str)
    records = OrderedDict()

    engine = engine.lower()

    if engine == "pysam":
        try:
            import pysam
        except ImportError:
            raise ImportError("pysam is required to use engine='pysam'") from None

        if is_multifile:
            for onefile in filepath_or:
                ff = pysam.FastaFile(onefile, **kwargs)
                name = ff.references[0]
                records[name] = PysamFastaRecord(ff, name)
        else:
            ff = pysam.FastaFile(filepath_or, **kwargs)
            for name in ff.references:
                records[name] = PysamFastaRecord(ff, name)

    elif engine == "pyfaidx":
        try:
            import pyfaidx
        except ImportError:
            raise ImportError("pyfaidx is required to use engine='pyfaidx'") from None

        if is_multifile:
            for onefile in filepath_or:
                ff = pyfaidx.Fasta(onefile, **kwargs)
                name = next(iter(ff.keys()))
                records[name] = ff[name]
        else:
            ff = pyfaidx.Fasta(filepath_or, **kwargs)
            for name in ff.keys():
                records[name] = ff[name]

    else:
        raise ValueError("engine must be 'pysam' or 'pyfaidx'")

    return records


def read_bigwig(path, chrom, start=None, end=None, engine="auto"):
    """
    Read intervals from a bigWig file.

    Parameters
    ----------
    path : str
        Path or URL to a bigWig file
    chrom : str
    start, end : int, optional
        Start and end coordinates. Defaults to 0 and chromosome length.
    engine : {"auto", "pybbi", "pybigwig"}
        Library to use for querying the bigWig file.

    Returns
    -------
    DataFrame
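
    Examples
    --------
    A minimal sketch, assuming a hypothetical local bigWig file ``signal.bw``:

    >>> df = read_bigwig("signal.bw", "chr1", start=0, end=100000)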

    """
    engine = engine.lower()

    if engine == "auto":
        if bbi is None and pyBigWig is None:
            raise ImportError(
                "read_bigwig requires either the pybbi or pyBigWig package"
            )
        elif bbi is not None:
            engine = "pybbi"
        else:
            engine = "pybigwig"

    if engine in ("pybbi", "bbi"):
        if bbi is None:
            raise ImportError("pybbi is required to use engine='pybbi'")
        if start is None:
            start = 0
        if end is None:
            end = -1
        with bbi.open(path) as f:
            df = f.fetch_intervals(chrom, start=start, end=end)

    elif engine == "pybigwig":
        if pyBigWig is None:
            raise ImportError("pyBigWig is required to use engine='pybigwig'")
        f = pyBigWig.open(path)
        try:
            if start is None:
                start = 0
            if end is None:
                end = f.chroms()[chrom]
            ivals = f.intervals(chrom, start, end)
        finally:
            f.close()
        df = pd.DataFrame(ivals, columns=["start", "end", "value"])
        df.insert(0, "chrom", chrom)

    else:
        raise ValueError(f"engine must be 'auto', 'pybbi' or 'pybigwig'; got {engine}")

    return df


def read_bigbed(path, chrom, start=None, end=None, engine="auto"):
    """
    Read intervals from a bigBed file.

    Parameters
    ----------
    path : str
        Path or URL to a bigBed file
    chrom : str
    start, end : int, optional
        Start and end coordinates. Defaults to 0 and chromosome length.
    engine : {"auto", "pybbi", "pybigwig"}
        Library to use for querying the bigBed file.

    Returns
    -------
    DataFrame
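
    Examples
    --------
    A minimal sketch, assuming a hypothetical local bigBed file ``regions.bb``:

    >>> df = read_bigbed("regions.bb", "chr1", start=0, end=100000)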

    """
    engine = engine.lower()

    if engine == "auto":
        if bbi is None and pyBigWig is None:
            raise ImportError(
                "read_bigbed requires either the pybbi or pyBigWig package"
            )
        elif bbi is not None:
            engine = "pybbi"
        else:
            engine = "pybigwig"

    if engine in ("pybbi", "bbi"):
        if bbi is None:
            raise ImportError("pybbi is required to use engine='pybbi'")
        if start is None:
            start = 0
        if end is None:
            end = -1
        with bbi.open(path) as f:
            df = f.fetch_intervals(chrom, start=start, end=end)

    elif engine == "pybigwig":
        if pyBigWig is None:
            raise ImportError("pyBigWig is required to use engine='pybigwig'")
        f = pyBigWig.open(path)
        try:
            if start is None:
                start = 0
            if end is None:
                end = f.chroms()[chrom]
            ivals = f.entries(chrom, start, end)
        finally:
            f.close()
        df = pd.DataFrame(ivals, columns=["start", "end", "rest"])
        df.insert(0, "chrom", chrom)

    else:
        raise ValueError(f"engine must be 'auto', 'pybbi' or 'pybigwig'; got {engine}")

    return df


def _find_ucsc_binary(path, cmd):
    """Locate a UCSC command-line binary, either on the PATH (if ``path``
    is None) or at or under the provided path."""
    if path is None:
        if shutil.which(cmd) is None:
            raise ValueError(
                f"{cmd} is not present in the current environment. "
                f"Pass it as the 'path_to_binary' parameter to "
                f"bioframe.to_bigwig or bioframe.to_bigbed, or install it "
                f"with, for example, conda install -y -c bioconda "
                f"ucsc-{cmd.lower()}"
            )
    elif path.endswith(cmd):
        if not (os.path.isfile(path) and os.access(path, os.X_OK)):
            raise ValueError(
                f"{cmd} is absent from the provided path or cannot be "
                f"executed: {path}"
            )
        cmd = path
    else:
        cmd = os.path.join(path, cmd)
        if not (os.path.isfile(cmd) and os.access(cmd, os.X_OK)):
            raise ValueError(
                f"{cmd} is absent from the provided path or cannot be "
                f"executed: {path}"
            )
    return cmd


def to_bigwig(
    df, chromsizes, outpath, value_field=None, engine="ucsc", path_to_binary=None
):
    """Save a bedGraph-like dataframe as a binary BigWig file.

    Parameters
    ----------
    df : pandas.DataFrame
        Data frame with columns 'chrom', 'start', 'end' and one or more value
        columns.
    chromsizes : pandas.Series
        Series indexed by chromosome name mapping to their lengths in bp.
    outpath : str
        The output BigWig file path.
    value_field : str, optional
        Column label of the data frame to use for the track values. Default
        is the fourth column.
    engine : {'ucsc', 'bigtools'}, optional
        Engine to use for creating the BigWig file.
    path_to_binary : str, optional
        System path to the bedGraphToBigWig binary (used by the 'ucsc'
        engine).
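
    Examples
    --------
    A minimal sketch, assuming ``bedgraph_df`` is a bedGraph-like dataframe,
    ``chromsizes`` is a series of chromosome lengths, and ``bedGraphToBigWig``
    is available on the PATH:

    >>> to_bigwig(bedgraph_df, chromsizes, "signal.bw", value_field="score")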
    """

    is_bedgraph = True
    for col in ["chrom", "start", "end"]:
        if col not in df.columns:
            is_bedgraph = False
    if len(df.columns) < 4:
        is_bedgraph = False

    if not is_bedgraph:
        raise ValueError(f"A bedGraph-like DataFrame is required, got {df.columns}")

    if value_field is None:
        value_field = df.columns[3]

    columns = ["chrom", "start", "end", value_field]
    bg = df[columns].copy()
    bg["chrom"] = bg["chrom"].astype(str)
    bg = bg.sort_values(["chrom", "start", "end"])

    if engine.lower() == "ucsc":
        cmd = _find_ucsc_binary(path_to_binary, "bedGraphToBigWig")

        with tempfile.NamedTemporaryFile(suffix=".bg") as f, \
             tempfile.NamedTemporaryFile("wt", suffix=".chrom.sizes") as cs:  # fmt: skip # noqa: E501
            pd.Series(chromsizes).to_csv(cs, sep="\t", header=False)
            cs.flush()

            bg.to_csv(
                f.name,
                sep="\t",
                columns=columns,
                index=False,
                header=False,
                na_rep="nan",
            )

            p = subprocess.run(
                [cmd, f.name, cs.name, outpath],
                capture_output=True,
            )
        return p

    elif engine.lower() == "bigtools":
        try:
            import pybigtools
        except ImportError:
            raise ImportError(
                "pybigtools is required to use engine='bigtools'"
            ) from None

        f = pybigtools.open(outpath, "w")
        if issubclass(type(chromsizes), pd.Series):
            chromsizes = chromsizes.astype(int).to_dict()

        bg = bg.astype({"chrom": str, "start": int, "end": int, value_field: float})
        f.write(chroms=chromsizes, vals=bg.itertuples(index=False))
        f.close()


def to_bigbed(
    df, chromsizes, outpath, schema="infer", engine="ucsc", path_to_binary=None
):
    """Save a BED-like dataframe as a binary BigBed file.

    Parameters
    ----------
    df : pandas.DataFrame
        Data frame with columns 'chrom', 'start', 'end' and zero or more
        additional BED fields.
    chromsizes : pandas.Series
        Series indexed by chromosome name mapping to their lengths in bp.
    outpath : str
        The output BigBed file path.
    schema : str, optional
        BED schema of the output, e.g. 'bed9' or 'bed6+3'. Default is to
        infer the schema from the columns of ``df``.
    engine : {'ucsc', 'bigtools'}, optional
        Engine to use for creating the BigBed file.
    path_to_binary : str, optional
        System path to the bedToBigBed binary (used by the 'ucsc' engine).
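
    Examples
    --------
    A minimal sketch, assuming ``bed_df`` is a BED-like dataframe,
    ``chromsizes`` is a series of chromosome lengths, and ``bedToBigBed``
    is available on the PATH:

    >>> to_bigbed(bed_df, chromsizes, "regions.bb")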
    """
    from bioframe.io.bed import infer_bed_schema, parse_bed_schema, to_bed_dataframe

    if schema == "infer":
        n, _ = infer_bed_schema(df)
    else:
        n, _ = parse_bed_schema(schema)

    bed = to_bed_dataframe(df, schema=schema)
    m = len(bed.columns) - n
    schema = f"bed{n}+{m}" if m > 0 else f"bed{n}"

    if engine.lower() == "ucsc":
        cmd = _find_ucsc_binary(path_to_binary, "bedToBigBed")

        with tempfile.NamedTemporaryFile(suffix=".bed") as f, \
             tempfile.NamedTemporaryFile("wt", suffix=".chrom.sizes") as cs:  # fmt: skip # noqa: E501
            pd.Series(chromsizes).to_csv(cs, sep="\t", header=False)
            cs.flush()

            bed.to_csv(
                f.name,
                sep="\t",
                columns=bed.columns,
                index=False,
                header=False,
                na_rep="nan",
            )

            p = subprocess.run(
                [cmd, f"-type={schema}", f.name, cs.name, outpath],
                capture_output=True,
            )
        return p

    elif engine.lower() == "bigtools":
        try:
            import pybigtools
        except ImportError:
            raise ImportError(
                "pybigtools is required to use engine='bigtools'"
            ) from None

        f = pybigtools.open(outpath, "w")
        if issubclass(type(chromsizes), pd.Series):
            chromsizes = chromsizes.astype(int).to_dict()

        bed = bed.astype({"chrom": str, "start": int, "end": int})
        record_iter = (
            (row[0], row[1], row[2], "\t".join(str(x) for x in row[3:]))
            for row in bed.itertuples(index=False)
        )
        f.write(chroms=chromsizes, vals=record_iter)
        f.close()
