#!/usr/bin/env python3
'''This script loads quasi-static data into a PostGIS database for rendering
maps. It differs from the usual scripts for this task in that it takes its
configuration from a file rather than being a series of shell commands.

Some implicit assumptions are
- Time spent querying (rendering) the data is more valuable than the one-time
  cost of loading it
- The script will not be run multiple times in parallel. This is unlikely in
  practice because it will normally be called daily or less often, not every
  minute.
- Usage patterns will be similar to typical map rendering
'''
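# The configuration file is YAML. The sketch below is illustrative only: the
# key names are the ones this script reads, but every value is a placeholder,
# not a shipped default.
#
#   settings:
#     temp_schema: loading
#     schema: public
#     metadata_table: external_data
#     data_dir: data
#     database: gis            # database/host/port/username/password and
#     renderuser: renderer     # renderuser are all optional
#   sources:
#     example_table:
#       url: https://example.com/shapefile.zip
#       file: shapefile.shp
#       archive:
#         format: zip
#         files:
#           - shapefile.shp
#           - shapefile.dbf
#           - shapefile.shx
#           - shapefile.prj
#       ogropts:
#         - "-t_srs"
#         - "EPSG:3857"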
import yaml
from urllib.parse import urlparse
import os
import re
import argparse
import shutil
# modules for getting data
import zipfile
import requests
import io
# modules for converting and postgres loading
import subprocess
import psycopg2
import logging
def database_setup(conn, temp_schema, schema, metadata_table):
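    '''Create the temporary loading schema and the metadata table if they
    do not already exist.
    '''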
with conn.cursor() as cur:
cur.execute('''CREATE SCHEMA IF NOT EXISTS {temp_schema};'''
.format(temp_schema=temp_schema))
cur.execute(('''CREATE TABLE IF NOT EXISTS "{schema}"."{metadata_table}"'''
''' (name text primary key, last_modified text);''')
.format(schema=schema, metadata_table=metadata_table))
conn.commit()
class Table:
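    '''One external-data table: loaded into a temporary schema, indexed,
    then swapped into the destination schema, with its Last-Modified date
    tracked in the metadata table.
    '''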
def __init__(self, name, conn, temp_schema, schema, metadata_table):
self._name = name
self._conn = conn
self._temp_schema = temp_schema
self._dst_schema = schema
self._metadata_table = metadata_table
# Clean up the temporary schema in preparation for loading
def clean_temp(self):
with self._conn.cursor() as cur:
cur.execute('''DROP TABLE IF EXISTS "{temp_schema}"."{name}"'''
.format(name=self._name, temp_schema=self._temp_schema))
self._conn.commit()
# get the last modified date from the metadata table
def last_modified(self):
with self._conn.cursor() as cur:
cur.execute('''SELECT last_modified FROM "{schema}"."{metadata_table}" WHERE name = %s'''
.format(schema=self._dst_schema, metadata_table=self._metadata_table), [self._name])
            results = cur.fetchone()
        # Commit even when a row was found, so the SELECT does not leave a
        # transaction open
        self._conn.commit()
        if results is not None:
            return results[0]
def grant_access(self, user):
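        '''Grant SELECT on the table (still in the temporary schema) to user.
        The privilege moves with the table when it changes schema.
        '''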
with self._conn.cursor() as cur:
cur.execute('''GRANT SELECT ON "{temp_schema}"."{name}" TO "{user}";'''
.format(name=self._name, temp_schema=self._temp_schema, user=user))
self._conn.commit()
def index(self):
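        '''Prepare the loaded table for rendering: drop the unneeded ogc_fid
        column and NULL geometries, cluster spatially, build the final GiST
        index, and vacuum.
        '''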
with self._conn.cursor() as cur:
# Disable autovacuum while manipulating the table, since it'll get clustered towards the end.
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" SET ( autovacuum_enabled = FALSE );'''
.format(name=self._name, temp_schema=self._temp_schema))
# ogr creates a ogc_fid column we don't need
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" DROP COLUMN ogc_fid;'''
.format(name=self._name, temp_schema=self._temp_schema))
# Null geometries are useless for rendering
cur.execute('''DELETE FROM "{temp_schema}"."{name}" WHERE way IS NULL;'''
.format(name=self._name, temp_schema=self._temp_schema))
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" ALTER COLUMN way SET NOT NULL;'''
.format(name=self._name, temp_schema=self._temp_schema))
# sorting static tables helps performance and reduces size from the column drop above
cur.execute(('''CREATE INDEX "{name}_order" ON "{temp_schema}"."{name}" '''
'''(ST_Envelope(way));'''
'''CLUSTER "{temp_schema}"."{name}" '''
'''USING "{name}_order";'''
'''DROP INDEX "{temp_schema}"."{name}_order";'''
'''CREATE INDEX ON "{temp_schema}"."{name}" '''
'''USING GIST (way) WITH (fillfactor=100);''')
.format(name=self._name, temp_schema=self._temp_schema))
# Reset autovacuum. The table is static, so this doesn't really
# matter since it'll never need a vacuum.
cur.execute('''ALTER TABLE "{temp_schema}"."{name}" RESET ( autovacuum_enabled );'''
.format(name=self._name, temp_schema=self._temp_schema))
self._conn.commit()
        # VACUUM can't be run in a transaction, so autocommit needs to be turned on
old_autocommit = self._conn.autocommit
try:
self._conn.autocommit = True
with self._conn.cursor() as cur:
cur.execute('''VACUUM ANALYZE "{temp_schema}"."{name}";'''
.format(name=self._name, temp_schema=self._temp_schema))
finally:
self._conn.autocommit = old_autocommit
def replace(self, new_last_modified):
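        '''Swap the table from the temporary schema into the destination
        schema, replacing any existing table, and record new_last_modified
        in the metadata table.
        '''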
with self._conn.cursor() as cur:
cur.execute('''BEGIN;''')
cur.execute(('''DROP TABLE IF EXISTS "{schema}"."{name}";'''
'''ALTER TABLE "{temp_schema}"."{name}" SET SCHEMA "{schema}";''')
.format(name=self._name, temp_schema=self._temp_schema, schema=self._dst_schema))
            # database_setup() created the metadata table; here we check
            # whether it already has a row for this table
cur.execute('''SELECT 1 FROM "{schema}"."{metadata_table}" WHERE name = %s'''
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
[self._name])
if cur.rowcount == 0:
cur.execute(('''INSERT INTO "{schema}"."{metadata_table}" '''
'''(name, last_modified) VALUES (%s, %s)''')
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
[self._name, new_last_modified])
else:
cur.execute('''UPDATE "{schema}"."{metadata_table}" SET last_modified = %s WHERE name = %s'''
.format(schema=self._dst_schema, metadata_table=self._metadata_table),
[new_last_modified, self._name])
self._conn.commit()
class Downloader:
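    '''Wrapper around a requests session that also handles file:// URLs and
    If-Modified-Since logic for cached downloads.
    '''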
def __init__(self):
self.session = requests.Session()
self.session.headers.update({'User-Agent': 'get-external-data.py/osm-carto'})
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.session.close()
def _download(self, url, headers=None):
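        '''Fetch url, passing along any headers. file:// URLs are read from
        disk, with the file mtime standing in for Last-Modified.
        '''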
if url.startswith('file://'):
filename = url[7:]
if headers and 'If-Modified-Since' in headers:
if str(os.path.getmtime(filename)) == headers['If-Modified-Since']:
return DownloadResult(status_code = requests.codes.not_modified)
with open(filename, 'rb') as fp:
return DownloadResult(status_code = 200, content = fp.read(),
last_modified = str(os.fstat(fp.fileno()).st_mtime))
response = self.session.get(url, headers=headers)
response.raise_for_status()
return DownloadResult(status_code = response.status_code, content = response.content,
last_modified = response.headers.get('Last-Modified', None))
def download(self, url, name, opts, data_dir, table_last_modified):
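        '''Download url into data_dir if it is newer than both the on-disk
        cache and the already-imported table. Returns a DownloadResult, or
        None when nothing needs to be imported or the download failed.
        '''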
filename = os.path.join(data_dir, os.path.basename(urlparse(url).path))
filename_lastmod = filename + '.lastmod'
if os.path.exists(filename) and os.path.exists(filename_lastmod):
with open(filename_lastmod, 'r') as fp:
lastmod_cache = fp.read()
with open(filename, 'rb') as fp:
cached_data = DownloadResult(status_code = 200, content = fp.read(),
last_modified = lastmod_cache)
else:
cached_data = None
lastmod_cache = None
result = None
# Variable used to tell if we downloaded something
download_happened = False
        if opts.no_update and (cached_data or table_last_modified):
            # result may end up None here, but only when the table already
            # has data (table_last_modified is set), in which case
            # --no-update means there is nothing to import anyway
            result = cached_data
else:
if opts.force:
headers = {}
else:
                # If neither value exists this will be None, which has the
                # same effect as not setting If-Modified-Since at all
                headers = {'If-Modified-Since': table_last_modified or lastmod_cache}
response = self._download(url, headers)
# Check status codes
if response.status_code == requests.codes.ok:
logging.info(" Download complete ({} bytes)".format(len(response.content)))
download_happened = True
if opts.cache:
# Write to cache
with open(filename, 'wb') as fp:
fp.write(response.content)
                    with open(filename_lastmod, 'w') as fp:
                        # Guard against servers that send no Last-Modified header
                        fp.write(response.last_modified or '')
result = response
elif response.status_code == requests.codes.not_modified:
# Now we need to figure out if our not modified data came from table or cache
if os.path.exists(filename) and os.path.exists(filename_lastmod):
logging.info(" Cached file {} did not require updating".format(url))
result = cached_data
else:
result = None
else:
logging.critical(" Unexpected response code ({}".format(response.status_code))
logging.critical(" Content {} was not downloaded".format(name))
return None
if opts.delete_cache or (not opts.cache and download_happened):
try:
os.remove(filename)
os.remove(filename_lastmod)
except FileNotFoundError:
pass
return result
class DownloadResult:
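    '''Container for a download outcome: status code, content bytes, and
    Last-Modified value, if any.
    '''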
def __init__(self, status_code, content=None, last_modified=None):
self.status_code = status_code
self.content = content
self.last_modified = last_modified
def main():
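    '''Parse the command line and the YAML config, then download and import
    each configured source into the database.
    '''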
# parse options
parser = argparse.ArgumentParser(
description="Load external data into a database")
parser.add_argument("-f", "--force", action="store_true",
help="Download and import new data, even if not required.")
parser.add_argument("-C", "--cache", action="store_true",
help="Cache downloaded data. Useful if you'll have your database volume deleted in the future")
parser.add_argument("--no-update", action="store_true",
help="Don't download newer data than what is locally available (either in cache or table). Overridden by --force")
parser.add_argument("--delete-cache", action="store_true",
help="Execute as usual, but delete cached data")
parser.add_argument("--force-import", action="store_true",
help="Import data into table even if may not be needed")
parser.add_argument("-c", "--config", action="store", default="external-data.yml",
help="Name of configuration file (default external-data.yml)")
parser.add_argument("-D", "--data", action="store",
help="Override data download directory")
parser.add_argument("-d", "--database", action="store",
help="Override database name to connect to")
parser.add_argument("-H", "--host", action="store",
help="Override database server host or socket directory")
parser.add_argument("-p", "--port", action="store",
help="Override database server port")
parser.add_argument("-U", "--username", action="store",
help="Override database user name")
parser.add_argument("-v", "--verbose", action="store_true",
help="Be more verbose. Overrides -q")
parser.add_argument("-q", "--quiet", action="store_true",
help="Only report serious problems")
parser.add_argument("-w", "--password", action="store",
help="Override database password")
parser.add_argument("-R", "--renderuser", action="store",
help="User to grant access for rendering")
opts = parser.parse_args()
if opts.verbose:
logging.basicConfig(level=logging.DEBUG)
elif opts.quiet:
logging.basicConfig(level=logging.WARNING)
else:
logging.basicConfig(level=logging.INFO)
if opts.force and opts.no_update:
opts.no_update = False
logging.warning("Force (-f) flag overrides --no-update flag")
logging.info("Starting load of external data into database")
with open(opts.config) as config_file:
config = yaml.safe_load(config_file)
data_dir = opts.data or config["settings"]["data_dir"]
os.makedirs(data_dir, exist_ok=True)
    # If a DB option is unspecified both on the command line and in the
    # config file, it will be None and libpq will fall back to its usual
    # defaults
database = opts.database or config["settings"].get("database")
host = opts.host or config["settings"].get("host")
port = opts.port or config["settings"].get("port")
user = opts.username or config["settings"].get("username")
password = opts.password or config["settings"].get("password")
renderuser = opts.renderuser or config["settings"].get("renderuser")
with Downloader() as d:
conn = psycopg2.connect(database=database,
host=host, port=port,
user=user,
password=password)
# DB setup
database_setup(conn, config["settings"]["temp_schema"],
config["settings"]["schema"],
config["settings"]["metadata_table"])
for name, source in config["sources"].items():
logging.info("Checking table {}".format(name))
            # Don't attempt to handle strange names. Even if there were code
            # here to escape them properly, you don't want table names that
            # need escaping in a style, with all the quoting headaches
            if not re.match('''^[a-zA-Z0-9_]+$''', name):
                raise RuntimeError(
                    "Only ASCII alphanumeric table names are supported")
this_table = Table(name, conn,
config["settings"]["temp_schema"],
config["settings"]["schema"],
config["settings"]["metadata_table"])
this_table.clean_temp()
# This will fetch data needed for import
download = d.download(source["url"], name, opts, data_dir, this_table.last_modified())
            # Check whether an import is needed
            if download is None or (not opts.force and not opts.force_import
                                    and this_table.last_modified() == download.last_modified):
logging.info(" Table {} did not require updating".format(name))
continue
workingdir = os.path.join(data_dir, name)
shutil.rmtree(workingdir, ignore_errors=True)
os.makedirs(workingdir, exist_ok=True)
if "archive" in source and source["archive"]["format"] == "zip":
logging.info(" Decompressing file")
                zip_file = zipfile.ZipFile(io.BytesIO(download.content))
                for member in source["archive"]["files"]:
                    zip_file.extract(member, workingdir)
ogrpg = "PG:dbname={}".format(database)
if port is not None:
ogrpg = ogrpg + " port={}".format(port)
if user is not None:
ogrpg = ogrpg + " user={}".format(user)
if host is not None:
ogrpg = ogrpg + " host={}".format(host)
if password is not None:
ogrpg = ogrpg + " password={}".format(password)
ogrcommand = ["ogr2ogr",
'-f', 'PostgreSQL',
'-lco', 'GEOMETRY_NAME=way',
'-lco', 'SPATIAL_INDEX=FALSE',
'-lco', 'EXTRACT_SCHEMA_FROM_LAYER_NAME=YES',
'-nln', "{}.{}".format(config["settings"]["temp_schema"], name)]
if "ogropts" in source:
ogrcommand += source["ogropts"]
ogrcommand += [ogrpg,
os.path.join(workingdir, source["file"])]
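            # For illustration only: with the placeholder values from the
            # sketch config at the top of this file, the assembled command
            # would resemble
            #   ogr2ogr -f PostgreSQL -lco GEOMETRY_NAME=way \
            #       -lco SPATIAL_INDEX=FALSE -lco EXTRACT_SCHEMA_FROM_LAYER_NAME=YES \
            #       -nln loading.example_table -t_srs EPSG:3857 \
            #       PG:dbname=gis data/example_table/shapefile.shp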
logging.info(" Importing into database")
logging.debug("running {}".format(
subprocess.list2cmdline(ogrcommand)))
# ogr2ogr can raise errors here, so they need to be caught
try:
subprocess.check_output(
ogrcommand, stderr=subprocess.PIPE, universal_newlines=True)
except subprocess.CalledProcessError as e:
# Add more detail on stdout for the logs
logging.critical(
"ogr2ogr returned {} with layer {}".format(e.returncode, name))
logging.critical("Command line was {}".format(
subprocess.list2cmdline(e.cmd)))
logging.critical("Output was\n{}".format(e.output))
logging.critical("Error was\n{}".format(e.stderr))
raise RuntimeError(
"ogr2ogr error when loading table {}".format(name))
logging.info(" Import complete")
this_table.index()
if renderuser is not None:
this_table.grant_access(renderuser)
this_table.replace(download.last_modified)
shutil.rmtree(workingdir, ignore_errors=True)
if conn:
conn.close()
if __name__ == '__main__':
main()