1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
# SPDX-License-Identifier: GPL-2.0-or-later
#
# This file is part of osm2pgsql (https://osm2pgsql.org/).
#
# Copyright (C) 2023 by the osm2pgsql developer community.
# For a full list of authors see the git log.
"""
Steps for executing osm2pgsql.
"""
from io import StringIO
from pathlib import Path
import subprocess
def get_import_file(context):
if context.import_file is not None:
return str(context.import_file), None
context.geometry_factory.complete_node_list(context.import_data['n'])
# sort by OSM id
for obj in context.import_data.values():
obj.sort(key=lambda l: int(l.split(' ')[0][1:]))
fd = StringIO()
for typ in ('n', 'w', 'r'):
for line in context.import_data[typ]:
fd.write(line)
fd.write('\n')
context.import_data[typ].clear()
return '-', fd.getvalue()
def run_osm2pgsql(context, output):
assert output in ('flex', 'pgsql', 'gazetteer', 'none')
cmdline = [str(Path(context.config.userdata['BINARY']).resolve())]
cmdline.extend(('-O', output))
cmdline.extend(context.osm2pgsql_params)
if context.table:
cmdline.extend(f for f in context.table.headings if f)
for row in context.table:
cmdline.extend(f.format(**context.config.userdata) for f in row if f)
if '-d' not in cmdline and '--database' not in cmdline:
cmdline.extend(('-d', context.config.userdata['TEST_DB']))
if 'tablespacetest' in cmdline and not context.config.userdata['HAVE_TABLESPACE']:
context.scenario.skip('tablespace tablespacetest not available')
return
if output == 'pgsql':
if '-S' not in cmdline:
cmdline.extend(('-S', str(context.default_data_dir / 'default.style')))
data_file, data_stdin = get_import_file(context)
if data_stdin is not None:
data_stdin = data_stdin.encode('utf-8')
cmdline.extend(('-r', 'opl'))
cmdline.append(data_file)
proc = subprocess.Popen(cmdline, cwd=str(context.workdir),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
outdata = proc.communicate(input=data_stdin)
context.osm2pgsql_outdata = [d.decode('utf-8').replace('\\n', '\n') for d in outdata]
return proc.returncode
@given("no lua tagtransform")
def do_not_setup_tagtransform(context):
pass
@given("the default lua tagtransform")
def setup_lua_tagtransform(context):
if not context.config.userdata['HAVE_LUA']:
context.scenario.skip("Lua support not compiled in.")
return
context.osm2pgsql_params.extend(('--tag-transform-script',
str(context.default_data_dir / 'style.lua')))
@given("the lua style")
def setup_inline_lua_style(context):
if not context.config.userdata['HAVE_LUA']:
context.scenario.skip("Lua support not compiled in.")
return
outfile = context.workdir / 'inline_style.lua'
outfile.write_text(context.text)
context.osm2pgsql_params.extend(('-S', str(outfile)))
@given("the style file '(?P<style>.+)'")
def setup_style_file(context, style):
if style.endswith('.lua') and not context.config.userdata['HAVE_LUA']:
context.scenario.skip("Lua support not compiled in.")
return
context.osm2pgsql_params.extend(('-S', str(context.test_data_dir / style)))
@when("running osm2pgsql (?P<output>\w+)(?: with parameters)?")
def execute_osm2pgsql_sucessfully(context, output):
returncode = run_osm2pgsql(context, output)
if context.scenario.status == "skipped":
return
assert returncode == 0,\
f"osm2pgsql failed with error code {returncode}.\n"\
f"Output:\n{context.osm2pgsql_outdata[0]}\n{context.osm2pgsql_outdata[1]}\n"
@then("running osm2pgsql (?P<output>\w+)(?: with parameters)? fails")
def execute_osm2pgsql_with_failure(context, output):
returncode = run_osm2pgsql(context, output)
if context.scenario.status == "skipped":
return
assert returncode != 0, "osm2pgsql unexpectedly succeeded"
@then("the (?P<kind>\w+) output contains")
def check_program_output(context, kind):
if kind == 'error':
s = context.osm2pgsql_outdata[1]
elif kind == 'standard':
s = context.osm2pgsql_outdata[0]
else:
assert not "Expect one of error, standard"
for line in context.text.split('\n'):
line = line.strip()
if line:
assert line in s,\
f"Output '{line}' not found in {kind} output:\n{s}\n"
|