File: data_model.py

package info (click to toggle)
qgis 3.10.14%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 889,896 kB
  • sloc: cpp: 986,392; python: 156,157; xml: 12,700; perl: 3,286; sh: 2,637; ansic: 2,085; sql: 1,514; javascript: 1,363; yacc: 1,000; lex: 562; lisp: 411; makefile: 234
file content (116 lines) | stat: -rw-r--r-- 4,012 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# -*- coding: utf-8 -*-

"""
/***************************************************************************
Name                 : DB Manager
Description          : Database manager plugin for QGIS
Date                 : May 23, 2011
copyright            : (C) 2011 by Giuseppe Sucameli
email                : brush.tyler@gmail.com

 ***************************************************************************/

/***************************************************************************
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 ***************************************************************************/
"""

from qgis.core import QgsMessageLog
from ..plugin import BaseError
from ..data_model import (TableDataModel,
                          SqlResultModel,
                          SqlResultModelAsync,
                          SqlResultModelTask)


class PGTableDataModel(TableDataModel):
    """Table data model that reads rows of a table through a database cursor.

    The cursor is created from ``self.db`` and runs a plain
    ``SELECT <fields> FROM <table>``; rows are then fetched in chunks by
    :meth:`fetchMoreData`.
    """

    def __init__(self, table, parent=None):
        """Initialise the model and, when a row count is known, open a cursor.

        :param table: table object handed to ``TableDataModel.__init__``
        :param parent: optional parent, forwarded to the base class
        """
        # Must exist before the base constructor runs so that later cleanup
        # (``_deleteCursor`` / ``__del__``) always finds the attribute.
        self.cursor = None
        TableDataModel.__init__(self, table, parent)

        if self.table.rowCount is None:
            self.table.refreshRowCount()
            if self.table.rowCount is None:
                # Row count still unknown: no signal is connected and no
                # cursor is created in this case.
                return

        self.table.aboutToChange.connect(self._deleteCursor)
        self._createCursor()

    def _createCursor(self):
        """Open a new cursor selecting ``self.fields`` from the table."""
        fields_txt = u", ".join(self.fields)
        table_txt = self.db.quoteId((self.table.schemaName(), self.table.name))

        self.cursor = self.db._get_cursor()
        sql = u"SELECT %s FROM %s" % (fields_txt, table_txt)
        self.db._execute(self.cursor, sql)

    def _sanitizeTableField(self, field):
        """Return the SQL select expression used for ``field``.

        Geometry columns are replaced by their ``GeometryType(...)`` (or NULL),
        raster columns by the literal ``'RASTER'`` (or NULL); every other
        column is cast to ``text``.
        """
        # NOTE(review): presumably called by the base class while building
        # ``self.fields`` -- confirm against TableDataModel.
        if field.dataType.lower() == "geometry":
            return u"CASE WHEN %(fld)s IS NULL THEN NULL ELSE GeometryType(%(fld)s) END AS %(fld)s" % {
                'fld': self.db.quoteId(field.name)}
        elif field.dataType.lower() == "raster":
            return u"CASE WHEN %(fld)s IS NULL THEN NULL ELSE 'RASTER' END AS %(fld)s" % {
                'fld': self.db.quoteId(field.name)}
        return u"%s::text" % self.db.quoteId(field.name)

    def _deleteCursor(self):
        """Close the current cursor through the database and forget it."""
        self.db._close_cursor(self.cursor)
        self.cursor = None

    def __del__(self):
        """Disconnect from the table signal (if connected) and drop the cursor."""
        try:
            self.table.aboutToChange.disconnect(self._deleteCursor)
        except (TypeError, RuntimeError):
            # TypeError: the slot was never connected because __init__
            # returned early. RuntimeError: the wrapped Qt object has already
            # been destroyed. Either way there is nothing to disconnect, and
            # the cursor cleanup below must still run.
            pass
        self._deleteCursor()

    def fetchMoreData(self, row_start):
        """Fetch ``self.fetchedCount`` rows starting at absolute row ``row_start``.

        The rows are stored in ``self.resdata`` and ``self.fetchedFrom`` is set
        to ``row_start``.

        If positioning the cursor fails with one of ``self.db.error_types()``,
        the cursor is recreated and the scroll is retried exactly once; a
        second failure is propagated to the caller instead of retrying
        forever.
        """
        if not self.cursor:
            self._createCursor()

        try:
            self.cursor.scroll(row_start, mode='absolute')
        except self.db.error_types():
            # The cursor may have become unusable: start over with a fresh
            # one. Only a single retry is made so that a persistent failure
            # (e.g. an out-of-range position) cannot recurse without bound.
            self._deleteCursor()
            self._createCursor()
            self.cursor.scroll(row_start, mode='absolute')

        self.resdata = self.cursor.fetchmany(self.fetchedCount)
        self.fetchedFrom = row_start


class PGSqlResultModelTask(SqlResultModelTask):
    """Task that builds a :class:`PGSqlResultModel` for an SQL statement."""

    def __init__(self, db, sql, parent):
        """Forward ``db``, ``sql`` and ``parent`` to the base task."""
        super().__init__(db, sql, parent)

    def run(self):
        """Create the result model.

        :return: ``True`` when the model was created, ``False`` when a
            ``BaseError`` was raised (the error is kept in ``self.error`` and
            its message is written to the QGIS message log).
        """
        succeeded = False
        try:
            self.model = PGSqlResultModel(self.db, self.sql, None)
        except BaseError as err:
            self.error = err
            QgsMessageLog.logMessage(err.msg)
        else:
            succeeded = True
        return succeeded

    def cancel(self):
        """Cancel through the database connector, then cancel the task itself."""
        self.db.connector.cancel()
        super().cancel()


class PGSqlResultModelAsync(SqlResultModelAsync):
    """Asynchronous wrapper that owns a :class:`PGSqlResultModelTask`."""

    def __init__(self, db, sql, parent):
        """Create the task and route both of its end signals to ``modelDone``."""
        super().__init__()

        self.task = PGSqlResultModelTask(db, sql, parent)
        # Completion and termination are handled by the same slot.
        for finished_signal in (self.task.taskCompleted, self.task.taskTerminated):
            finished_signal.connect(self.modelDone)


class PGSqlResultModel(SqlResultModel):
    """SQL result model for this backend; adds nothing to ``SqlResultModel``."""