"""
***************************************************************************
    GdalAlgorithmTests.py
    ---------------------
    Date                 : January 2016
    Copyright            : (C) 2016 by Matthias Kuhn
    Email                : matthias@opengis.ch
***************************************************************************
*                                                                         *
*   This program is free software; you can redistribute it and/or modify  *
*   it under the terms of the GNU General Public License as published by  *
*   the Free Software Foundation; either version 2 of the License, or     *
*   (at your option) any later version.                                   *
*                                                                         *
***************************************************************************
"""

__author__ = "Matthias Kuhn"
__date__ = "January 2016"
__copyright__ = "(C) 2016, Matthias Kuhn"

import nose2
import os
import shutil
import tempfile

from qgis.core import (
    QgsProcessingContext,
    QgsProcessingFeedback,
    QgsCoordinateReferenceSystem,
    QgsApplication,
    QgsFeature,
    QgsGeometry,
    QgsPointXY,
    QgsProject,
    QgsVectorLayer,
    QgsRectangle,
    QgsProjUtils,
    QgsProcessingException,
    QgsProcessingFeatureSourceDefinition,
)

from qgis.testing import QgisTestCase, start_app

from processing.algs.gdal.GdalUtils import GdalUtils
from processing.algs.gdal.ogr2ogr import ogr2ogr
from processing.algs.gdal.OgrToPostGis import OgrToPostGis

testDataPath = os.path.join(os.path.dirname(__file__), "testdata")


class TestGdalAlgorithms(QgisTestCase):

    @classmethod
    def setUpClass(cls):
        start_app()
        from processing.core.Processing import Processing

        Processing.initialize()
        cls.cleanup_paths = []

    @classmethod
    def tearDownClass(cls):
        for path in cls.cleanup_paths:
            shutil.rmtree(path)

    def testCommandName(self):
        # Test that algorithms report a valid commandName
        p = QgsApplication.processingRegistry().providerById("gdal")
        for a in p.algorithms():
            if a.id() in ("gdal:buildvirtualvector"):
                # build virtual vector is an exception
                continue
            self.assertTrue(a.commandName(), f"Algorithm {a.id()} has no commandName!")

    def testCommandNameInTags(self):
        # Test that algorithms commandName is present in provided tags
        p = QgsApplication.processingRegistry().providerById("gdal")
        for a in p.algorithms():
            if not a.commandName():
                continue
            self.assertTrue(
                a.commandName() in a.tags(),
                f"Algorithm {a.id()} commandName not found in tags!",
            )

    def testNoParameters(self):
        # Test that algorithms throw QgsProcessingException and not base Python
        # exceptions when no parameters specified
        p = QgsApplication.processingRegistry().providerById("gdal")
        context = QgsProcessingContext()
        feedback = QgsProcessingFeedback()
        for a in p.algorithms():
            try:
                a.getConsoleCommands({}, context, feedback)
            except QgsProcessingException:
                pass

    def testGetOgrCompatibleSourceFromMemoryLayer(self):
        # create a memory layer and add to project and context
        layer = QgsVectorLayer(
            "Point?field=fldtxt:string&field=fldint:integer", "testmem", "memory"
        )
        self.assertTrue(layer.isValid())
        pr = layer.dataProvider()
        f = QgsFeature()
        f.setAttributes(["test", 123])
        f.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
        f2 = QgsFeature()
        f2.setAttributes(["test2", 457])
        f2.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
        self.assertTrue(pr.addFeatures([f, f2]))
        self.assertEqual(layer.featureCount(), 2)
        QgsProject.instance().addMapLayer(layer)
        context = QgsProcessingContext()
        context.setProject(QgsProject.instance())

        alg = QgsApplication.processingRegistry().createAlgorithmById(
            "gdal:buffervectors"
        )
        self.assertIsNotNone(alg)
        parameters = {"INPUT": "testmem"}
        feedback = QgsProcessingFeedback()
        # check that memory layer is automatically saved out to geopackage when required by GDAL algorithms
        input_details = alg.getOgrCompatibleSource(
            "INPUT", parameters, context, feedback, executing=True
        )
        self.assertTrue(input_details.connection_string)
        self.assertTrue(input_details.connection_string.endswith(".gpkg"))
        self.assertTrue(os.path.exists(input_details.connection_string))
        self.assertTrue(input_details.connection_string)

        # make sure that layer has correct features
        res = QgsVectorLayer(input_details.connection_string, "res")
        self.assertTrue(res.isValid())
        self.assertEqual(res.featureCount(), 2)

        # with memory layers - if not executing layer source should be ignored and replaced
        # with a dummy path, because:
        # - it has no meaning for the gdal command outside of QGIS, memory layers don't exist!
        # - we don't want to force an export of the whole memory layer to a temp file just to show the command preview
        # this might be very slow!
        input_details = alg.getOgrCompatibleSource(
            "INPUT", parameters, context, feedback, executing=False
        )
        self.assertEqual(input_details.connection_string, "path_to_data_file")
        self.assertEqual(input_details.layer_name, "layer_name")

        QgsProject.instance().removeMapLayer(layer)

    def testGetOgrCompatibleSourceFromOgrLayer(self):
        p = QgsProject()
        source = os.path.join(testDataPath, "points.gml")
        vl = QgsVectorLayer(source)
        self.assertTrue(vl.isValid())
        p.addMapLayer(vl)

        context = QgsProcessingContext()
        context.setProject(p)
        feedback = QgsProcessingFeedback()

        alg = ogr2ogr()
        alg.initAlgorithm()
        input_details = alg.getOgrCompatibleSource(
            "INPUT", {"INPUT": vl.id()}, context, feedback, True
        )
        self.assertEqual(input_details.connection_string, source)
        input_details = alg.getOgrCompatibleSource(
            "INPUT", {"INPUT": vl.id()}, context, feedback, False
        )
        self.assertEqual(input_details.connection_string, source)

        # with selected features only - if not executing, the 'selected features only' setting
        # should be ignored (because it has no meaning for the gdal command outside of QGIS!)
        parameters = {"INPUT": QgsProcessingFeatureSourceDefinition(vl.id(), True)}
        input_details = alg.getOgrCompatibleSource(
            "INPUT", parameters, context, feedback, False
        )
        self.assertEqual(input_details.connection_string, source)

        # with subset string
        vl.setSubsetString("x")
        input_details = alg.getOgrCompatibleSource(
            "INPUT", parameters, context, feedback, False
        )
        self.assertEqual(input_details.connection_string, source)
        # subset of layer must be exported
        input_details = alg.getOgrCompatibleSource(
            "INPUT", parameters, context, feedback, True
        )
        self.assertNotEqual(input_details.connection_string, source)
        self.assertTrue(input_details.connection_string)
        self.assertTrue(input_details.connection_string.endswith(".gpkg"))
        self.assertTrue(os.path.exists(input_details.connection_string))
        self.assertTrue(input_details.layer_name)

        # geopackage with layer
        source = os.path.join(testDataPath, "custom", "circular_strings.gpkg")
        vl2 = QgsVectorLayer(source + "|layername=circular_strings")
        self.assertTrue(vl2.isValid())
        p.addMapLayer(vl2)
        input_details = alg.getOgrCompatibleSource(
            "INPUT", {"INPUT": vl2.id()}, context, feedback, True
        )
        self.assertEqual(input_details.connection_string, source)
        self.assertEqual(input_details.layer_name, "circular_strings")
        vl3 = QgsVectorLayer(source + "|layername=circular_strings_with_line")
        self.assertTrue(vl3.isValid())
        p.addMapLayer(vl3)
        input_details = alg.getOgrCompatibleSource(
            "INPUT", {"INPUT": vl3.id()}, context, feedback, True
        )
        self.assertEqual(input_details.connection_string, source)
        self.assertEqual(input_details.layer_name, "circular_strings_with_line")

    def testGetOgrCompatibleSourceFromFeatureSource(self):
        # create a memory layer and add to project and context
        layer = QgsVectorLayer(
            "Point?field=fldtxt:string&field=fldint:integer", "testmem", "memory"
        )
        self.assertTrue(layer.isValid())
        pr = layer.dataProvider()
        f = QgsFeature()
        f.setAttributes(["test", 123])
        f.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
        f2 = QgsFeature()
        f2.setAttributes(["test2", 457])
        f2.setGeometry(QgsGeometry.fromPointXY(QgsPointXY(100, 200)))
        self.assertTrue(pr.addFeatures([f, f2]))
        self.assertEqual(layer.featureCount(), 2)
        # select first feature
        layer.selectByIds([next(layer.getFeatures()).id()])
        self.assertEqual(len(layer.selectedFeatureIds()), 1)
        QgsProject.instance().addMapLayer(layer)
        context = QgsProcessingContext()
        context.setProject(QgsProject.instance())

        alg = QgsApplication.processingRegistry().createAlgorithmById(
            "gdal:buffervectors"
        )
        self.assertIsNotNone(alg)
        parameters = {"INPUT": QgsProcessingFeatureSourceDefinition("testmem", True)}
        feedback = QgsProcessingFeedback()
        # check that memory layer is automatically saved out to geopackage when required by GDAL algorithms
        input_details = alg.getOgrCompatibleSource(
            "INPUT", parameters, context, feedback, executing=True
        )
        self.assertTrue(input_details.connection_string)
        self.assertTrue(input_details.connection_string.endswith(".gpkg"))
        self.assertTrue(os.path.exists(input_details.connection_string))
        self.assertTrue(input_details.layer_name)

        # make sure that layer has only selected feature
        res = QgsVectorLayer(input_details.connection_string, "res")
        self.assertTrue(res.isValid())
        self.assertEqual(res.featureCount(), 1)

        QgsProject.instance().removeMapLayer(layer)

    def testOgrLayerNameExtraction(self):
        with tempfile.TemporaryDirectory() as outdir:

            def _copyFile(dst):
                shutil.copyfile(
                    os.path.join(testDataPath, "custom", "weighted.csv"), dst
                )

            # OGR provider - single layer
            _copyFile(os.path.join(outdir, "a.csv"))
            name = GdalUtils.ogrLayerName(outdir)
            self.assertEqual(name, "a")

            # OGR provider - multiple layers
            _copyFile(os.path.join(outdir, "b.csv"))
            name1 = GdalUtils.ogrLayerName(outdir + "|layerid=0")
            name2 = GdalUtils.ogrLayerName(outdir + "|layerid=1")
            self.assertEqual(sorted([name1, name2]), ["a", "b"])

            name = GdalUtils.ogrLayerName(outdir + "|layerid=2")
            self.assertIsNone(name)

            # OGR provider - layername takes precedence
            name = GdalUtils.ogrLayerName(outdir + "|layername=f")
            self.assertEqual(name, "f")

            name = GdalUtils.ogrLayerName(outdir + "|layerid=0|layername=f")
            self.assertEqual(name, "f")

            name = GdalUtils.ogrLayerName(outdir + "|layername=f|layerid=0")
            self.assertEqual(name, "f")

            # SQLite provider
            name = GdalUtils.ogrLayerName(
                "dbname='/tmp/x.sqlite' table=\"t\" (geometry) sql="
            )
            self.assertEqual(name, "t")

            # PostgreSQL provider
            name = GdalUtils.ogrLayerName(
                'port=5493 sslmode=disable key=\'edge_id\' srid=0 type=LineString table="city_data"."edge" (geom) sql='
            )
            self.assertEqual(name, "city_data.edge")

    def test_gdal_connection_details_from_uri(self):
        context = QgsProcessingContext()
        output_details = GdalUtils.gdal_connection_details_from_uri(
            "d:/test/test.shp", context
        )
        self.assertEqual(output_details.connection_string, "d:/test/test.shp")
        self.assertEqual(output_details.format, '"ESRI Shapefile"')
        output_details = GdalUtils.gdal_connection_details_from_uri(
            "d:/test/test.mif", context
        )
        self.assertEqual(output_details.connection_string, "d:/test/test.mif")
        self.assertEqual(output_details.format, '"MapInfo File"')

    def testConnectionString(self):
        alg = OgrToPostGis()
        alg.initAlgorithm()
        parameters = {}
        feedback = QgsProcessingFeedback()
        context = QgsProcessingContext()

        # NOTE: defaults are debatable, see
        # https://github.com/qgis/QGIS/pull/3607#issuecomment-253971020
        self.assertEqual(
            alg.getConnectionString(parameters, context),
            "host=localhost port=5432 active_schema=public",
        )

        parameters["HOST"] = "remote"
        self.assertEqual(
            alg.getConnectionString(parameters, context),
            "host=remote port=5432 active_schema=public",
        )

        parameters["HOST"] = ""
        self.assertEqual(
            alg.getConnectionString(parameters, context),
            "port=5432 active_schema=public",
        )

        parameters["PORT"] = "5555"
        self.assertEqual(
            alg.getConnectionString(parameters, context),
            "port=5555 active_schema=public",
        )

        parameters["PORT"] = ""
        self.assertEqual(
            alg.getConnectionString(parameters, context), "active_schema=public"
        )

        parameters["USER"] = "usr"
        self.assertEqual(
            alg.getConnectionString(parameters, context),
            "active_schema=public user=usr",
        )

        parameters["PASSWORD"] = "pwd"
        self.assertEqual(
            alg.getConnectionString(parameters, context),
            "password=pwd active_schema=public user=usr",
        )

    def testCrsConversion(self):
        self.assertFalse(GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem()))
        self.assertEqual(
            GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem("EPSG:3111")),
            "EPSG:3111",
        )
        self.assertEqual(
            GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem("POSTGIS:3111")),
            "EPSG:3111",
        )
        self.assertEqual(
            GdalUtils.gdal_crs_string(
                QgsCoordinateReferenceSystem(
                    "proj4: +proj=utm +zone=36 +south +a=6378249.145 +b=6356514.966398753 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs"
                )
            ),
            "EPSG:20936",
        )
        crs = QgsCoordinateReferenceSystem()
        crs.createFromProj(
            "+proj=utm +zone=36 +south +a=600000 +b=70000 +towgs84=-143,-90,-294,0,0,0,0 +units=m +no_defs"
        )
        self.assertTrue(crs.isValid())

        # proj 6, WKT should be used
        self.assertEqual(
            GdalUtils.gdal_crs_string(crs)[:40],
            'BOUNDCRS[SOURCECRS[PROJCRS["unknown",BAS',
        )

        self.assertEqual(
            GdalUtils.gdal_crs_string(QgsCoordinateReferenceSystem("ESRI:102003")),
            "ESRI:102003",
        )

    def testEscapeAndJoin(self):
        self.assertEqual(
            GdalUtils.escapeAndJoin([1, "a", "a b", "a&b", "a(b)", ";"]),
            '1 a "a b" "a&b" "a(b)" ";"',
        )
        self.assertEqual(
            GdalUtils.escapeAndJoin([1, "-srcnodata", "--srcnodata", "-9999 9999"]),
            '1 -srcnodata --srcnodata "-9999 9999"',
        )


if __name__ == "__main__":
    nose2.main()
