1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
|
"""
Database related functions to be used in Python scripts.
Usage:
::
from grass.script import db as grass
grass.db_describe(table)
...
(C) 2008-2015 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.
.. sectionauthor:: Glynn Clements
.. sectionauthor:: Martin Landa <landa.martin gmail.com>
"""
from __future__ import absolute_import
from .core import *
from .utils import try_remove
from grass.exceptions import CalledModuleError
def db_describe(table, **args):
    """Return the list of columns for a database table
    (interface to `db.describe -c`). Example:

    >>> run_command('g.copy', vector='firestations,myfirestations')
    0
    >>> db_describe('myfirestations') # doctest: +ELLIPSIS
    {'nrows': 71, 'cols': [['cat', 'INTEGER', '20'], ... 'ncols': 22}
    >>> run_command('g.remove', flags='f', type='vector', name='myfirestations')
    0

    :param str table: table name
    :param args: additional keyword arguments passed on to `db.describe`

    :return: parsed module output as a dictionary; the key 'cols' holds
             the list of column descriptions, 'ncols' and 'nrows' are
             converted to integers, any other key holds the list of
             colon-separated values found on its line
    """
    s = read_command('db.describe', flags='c', table=table, **args)
    if not s:
        fatal(_("Unable to describe table <%s>") % table)

    cols = []
    result = {}
    for line in s.splitlines():
        fields = line.split(':')
        if len(fields) < 2:
            # No "key: value" separator (e.g. a blank line): nothing to
            # parse, and indexing the value would raise IndexError.
            continue
        key = fields[0]
        values = fields[1:]
        # Drop the padding that follows the first colon.
        values[0] = values[0].lstrip(' ')
        if key.startswith('Column '):
            # The key looks like "Column <n>"; the number is used as the
            # insert position (presumably 1-based and in ascending order,
            # so this behaves like an append -- TODO confirm).
            n = int(key.split(' ')[1])
            cols.insert(n, values)
        elif key in ('ncols', 'nrows'):
            result[key] = int(values[0])
        else:
            result[key] = values
    result['cols'] = cols

    return result
def db_table_exist(table, **args):
    """Check if table exists.

    If no driver or database are given, then default settings is used
    (check db_connection()).

    >>> run_command('g.copy', vector='firestations,myfirestations')
    0
    >>> db_table_exist('myfirestations')
    True
    >>> run_command('g.remove', flags='f', type='vector', name='myfirestations')
    0

    :param str table: table name
    :param args: additional keyword arguments passed on to `db.describe`

    :return: True if `db.describe` succeeded for the table, False if it
             raised CalledModuleError
    """
    # `open` instead of the Python 2-only `file` builtin; the context
    # manager closes the null device on every exit path.
    with open(os.devnull, 'w+') as nuldev:
        try:
            # Output is discarded: only success/failure matters here.
            run_command('db.describe', flags='c', table=table,
                        stdout=nuldev, stderr=nuldev, **args)
        except CalledModuleError:
            return False

    return True
def db_connection(force=False):
    """Return the current database connection parameters
    (interface to `db.connect -g`). Example:

    >>> db_connection()
    {'group': '', 'schema': '', 'driver': 'sqlite', 'database': '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'}

    :param bool force: True to set up default DB connection if not defined

    :return: parsed output of db.connect; None if `db.connect -g` failed
             and ``force`` is False
    """
    try:
        # The context manager closes the null device even when
        # parse_command raises (the handle used to leak in that case).
        with open(os.devnull, 'w') as nuldev:
            conn = parse_command('db.connect', flags='g', stderr=nuldev)
    except CalledModuleError:
        conn = None

    if not conn and force:
        # Create the default connection (`db.connect -c`), then read it back.
        run_command('db.connect', flags='c')
        conn = parse_command('db.connect', flags='g')

    return conn
def db_select(sql=None, filename=None, table=None, **args):
    """Perform SQL select statement

    Note: one of *sql*, *filename*, or *table* arguments must be
    provided. If more than one is given, *sql* takes precedence over
    *filename*, which takes precedence over *table*.

    Examples:

    >>> run_command('g.copy', vector='firestations,myfirestations')
    0
    >>> db_select(sql = 'SELECT cat,CITY FROM myfirestations WHERE cat < 4')
    (('1', 'Morrisville'), ('2', 'Morrisville'), ('3', 'Apex'))

    Simplified usage (it performs ``SELECT * FROM myfirestations``.)

    >>> db_select(table = 'myfirestations') # doctest: +ELLIPSIS
    (('1', '24', 'Morrisville #3', ... 'HS2A', '1.37'))
    >>> run_command('g.remove', flags='f', type='vector', name='myfirestations')
    0

    :param str sql: SQL statement to perform (or None)
    :param str filename: name of file with SQL statements (or None)
    :param str table: name of table to query (or None)
    :param args: see `db.select` arguments; ``sep`` defaults to '|'

    :return: tuple of rows, each row being a tuple of strings
    """
    if sql:
        args['sql'] = sql
    elif filename:
        args['input'] = filename
    elif table:
        args['table'] = table
    else:
        fatal(_("Programmer error: '%(sql)s', '%(filename)s', or '%(table)s' must be provided") %
              {'sql': 'sql', 'filename': 'filename', 'table': 'table'} )

    args.setdefault('sep', '|')

    # Only a file name is requested; db.select writes the file itself.
    fname = tempfile(create=False)
    try:
        try:
            run_command('db.select', quiet=True, flags='c',
                        output=fname, **args)
        except CalledModuleError:
            fatal(_("Fetching data failed"))

        # One tuple per output line, split on the column separator.
        with open(fname) as ofile:
            result = tuple(
                tuple(line.rstrip(os.linesep).split(args['sep']))
                for line in ofile
            )
    finally:
        # Remove the temporary file on every exit path, including errors.
        try_remove(fname)

    return result
def db_table_in_vector(table):
    """Return the names of vector maps connected to the table.

    It returns None if no vectors are connected to the table.

    >>> run_command('g.copy', vector='firestations,myfirestations')
    0
    >>> db_table_in_vector('myfirestations')
    ['myfirestations@user1']
    >>> db_table_in_vector('mfirestations')
    >>> run_command('g.remove', flags='f', type='vector', name='myfirestations')
    0

    :param str table: name of table to query

    :return: list of vector map names using the table, or None
    """
    from .vector import vector_db

    used = []
    # `open` instead of the Python 2-only `file` builtin; the context
    # manager closes the null device, which used to stay open.
    with open(os.devnull, 'w') as nuldev:
        for vect in list_strings('vect'):
            for f in vector_db(vect, stderr=nuldev).values():
                if not f:
                    continue
                if f['table'] == table:
                    used.append(vect)
                    # One matching connection is enough for this map.
                    break

    return used if used else None
def db_begin_transaction(driver):
    """Return the SQL statement that opens a transaction.

    :param str driver: database driver name

    :return: 'BEGIN' for the 'sqlite' and 'pg' drivers,
             'START TRANSACTION' for 'mysql', and an empty string
             for any other driver
    """
    if driver == 'sqlite' or driver == 'pg':
        statement = 'BEGIN'
    elif driver == 'mysql':
        statement = 'START TRANSACTION'
    else:
        statement = ''

    return statement
def db_commit_transaction(driver):
    """Return the SQL statement that commits a transaction.

    :param str driver: database driver name

    :return: 'COMMIT' for the 'sqlite', 'pg' and 'mysql' drivers,
             an empty string for any other driver
    """
    transactional_drivers = ('sqlite', 'pg', 'mysql')
    return 'COMMIT' if driver in transactional_drivers else ''
|