1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
"""
***************************************************************************
Datasources2Vrt.py
---------------------
Date : May 2015
Copyright : (C) 2015 by Luigi Pirelli
Email : luipir at gmail dot com
***************************************************************************
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************
"""
__author__ = "Luigi Pirelli"
__date__ = "May 2015"
__copyright__ = "(C) 2015, Luigi Pirelli"
import html
import pathlib
from qgis.PyQt.QtCore import QCoreApplication
from qgis.core import (
QgsProcessing,
QgsProcessingException,
QgsProcessingParameterMultipleLayers,
QgsProcessingParameterBoolean,
QgsProcessingParameterVectorDestination,
QgsProcessingOutputString,
QgsProcessingParameters,
)
from processing.algs.gdal.GdalAlgorithm import GdalAlgorithm
from processing.algs.gdal.GdalUtils import GdalUtils
class Datasources2Vrt(GdalAlgorithm):
INPUT = "INPUT"
UNIONED = "UNIONED"
OUTPUT = "OUTPUT"
VRT_STRING = "VRT_STRING"
def createCustomParametersWidget(self, parent):
return None
def group(self):
return self.tr("Vector miscellaneous")
def groupId(self):
return "vectormiscellaneous"
def name(self):
return "buildvirtualvector"
def displayName(self):
return self.tr("Build virtual vector")
def tags(self):
return ["ogr", "gdal", "vrt", "create"]
def shortHelpString(self):
return self.tr(
"This algorithm creates a virtual layer that contains a set of vector layers.\n\n"
"The output virtual layer will not be opened in the current project."
)
def __init__(self):
super().__init__()
def initAlgorithm(self, config=None):
self.addParameter(
QgsProcessingParameterMultipleLayers(
self.INPUT,
self.tr("Input datasources"),
QgsProcessing.SourceType.TypeVector,
)
)
self.addParameter(
QgsProcessingParameterBoolean(
self.UNIONED, self.tr('Create "unioned" VRT'), defaultValue=False
)
)
class ParameterVectorVrtDestination(QgsProcessingParameterVectorDestination):
def __init__(self, name, description):
super().__init__(name, description)
def clone(self):
copy = ParameterVectorVrtDestination(self.name(), self.description())
return copy
def defaultFileExtension(self):
return "vrt"
def createFileFilter(self):
return "{} (*.vrt *.VRT)".format(
QCoreApplication.translate("GdalAlgorithm", "VRT files")
)
def supportedOutputRasterLayerExtensions(self):
return ["vrt"]
def isSupportedOutputValue(self, value, context):
output_path = QgsProcessingParameters.parameterAsOutputLayer(
self, value, context, testOnly=True
)
if pathlib.Path(output_path).suffix.lower() != ".vrt":
return False, QCoreApplication.translate(
"GdalAlgorithm", "Output filename must use a .vrt extension"
)
return True, ""
self.addParameter(
ParameterVectorVrtDestination(self.OUTPUT, self.tr("Virtual vector"))
)
self.addOutput(
QgsProcessingOutputString(self.VRT_STRING, self.tr("Virtual string"))
)
def processAlgorithm(self, parameters, context, feedback):
input_layers = self.parameterAsLayerList(parameters, self.INPUT, context)
unioned = self.parameterAsBoolean(parameters, self.UNIONED, context)
vrtPath = self.parameterAsOutputLayer(parameters, self.OUTPUT, context)
vrt = "<OGRVRTDataSource>"
if unioned:
vrt += '<OGRVRTUnionLayer name="UnionedLayer">'
total = 100.0 / len(input_layers) if input_layers else 0
for current, layer in enumerate(input_layers):
if feedback.isCanceled():
break
connection_details = GdalUtils.gdal_connection_details_from_layer(layer)
basePath = connection_details.connection_string
layerName = GdalUtils.ogrLayerName(layer.source())
vrt += f'<OGRVRTLayer name="{html.escape(layerName, True)}">'
vrt += f"<SrcDataSource>{html.escape(basePath, True)}</SrcDataSource>"
vrt += f"<SrcLayer>{html.escape(layerName, True)}</SrcLayer>"
vrt += "</OGRVRTLayer>"
feedback.setProgress(int(current * total))
if unioned:
vrt += "</OGRVRTUnionLayer>"
vrt += "</OGRVRTDataSource>"
with open(vrtPath, "w", encoding="utf-8") as f:
f.write(vrt)
return {self.OUTPUT: vrtPath, self.VRT_STRING: vrt}
def commandName(self):
return ""
|