# SPDX-License-Identifier: GPL-2.0-or-later
#
# This file is part of osm2pgsql (https://osm2pgsql.org/).
#
# Copyright (C) 2006-2025 by the osm2pgsql developer community.
# For a full list of authors see the git log.
"""
Steps for executing osm2pgsql.
"""
from io import StringIO
from pathlib import Path
import sys
import subprocess
import contextlib
import logging
import datetime as dt
from osmium.replication.server import OsmosisState
def get_import_file(context):
    """Determine the input that osm2pgsql should read.

    Returns a tuple ``(file name, stdin data)``. When the context already
    has an import file, its name is returned together with ``None``.
    Otherwise the lines collected in ``context.import_data`` are serialised
    (nodes, then ways, then relations) and returned as stdin data for the
    pseudo file name ``'-'``. The collected lines are cleared afterwards.
    """
    if context.import_file is not None:
        return str(context.import_file), None

    context.geometry_factory.complete_node_list(context.import_data['n'])

    def _osm_id(entry):
        # The first space-separated token is the type letter followed by the id.
        return int(entry.split(' ')[0][1:])

    # sort by OSM id
    for lines in context.import_data.values():
        lines.sort(key=_osm_id)

    buffer = StringIO()
    for osm_type in ('n', 'w', 'r'):
        lines = context.import_data[osm_type]
        buffer.writelines(entry + '\n' for entry in lines)
        lines.clear()

    return '-', buffer.getvalue()
def run_osm2pgsql(context, output):
    """Assemble the osm2pgsql command line, run it and record its output.

    The command consists of the binary from ``userdata['BINARY']``, the
    output selection (unless `output` is ``'nooutput'``), the parameters
    collected in ``context.osm2pgsql_params`` and the cells of the step
    table, followed by defaults for the database and, for the pgsql output,
    the style file.

    Returns the return code of the process. Returns ``None`` after marking
    the scenario as skipped when 'tablespacetest' is requested but
    ``userdata['HAVE_TABLESPACE']`` is false. The decoded stdout and stderr
    are stored as a two-element list in ``context.osm2pgsql_outdata``.
    """
    assert output in ('flex', 'pgsql', 'null', 'nooutput')

    userdata = context.config.userdata
    cmdline = [str(Path(userdata['BINARY']).resolve())]

    if output != 'nooutput':
        cmdline += ['-O', output]

    cmdline += context.osm2pgsql_params

    # convert table items to CLI arguments and inject constants to placeholders
    if context.table:
        for cells in (context.table.headings, *context.table):
            cmdline.extend(cell.format(**userdata) for cell in cells if cell)

    if all(opt not in cmdline for opt in ('-d', '--database')):
        cmdline += ['-d', userdata['TEST_DB']]

    if 'tablespacetest' in cmdline and not userdata['HAVE_TABLESPACE']:
        context.scenario.skip('tablespace tablespacetest not available')
        return

    if output == 'pgsql' and '-S' not in cmdline:
        cmdline += ['-S', str(context.default_data_dir / 'default.style')]

    data_file, data_stdin = get_import_file(context)

    stdin_bytes = None
    if data_stdin is not None:
        # Inline data is handed over on stdin in OPL format.
        stdin_bytes = data_stdin.encode('utf-8')
        cmdline += ['-r', 'opl']

    cmdline.append(data_file)

    with subprocess.Popen(cmdline, cwd=str(context.workdir),
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        streams = proc.communicate(input=stdin_bytes)

    # Turn literal backslash-n sequences in the output into real line breaks.
    context.osm2pgsql_outdata = [raw.decode('utf-8').replace('\\n', '\n')
                                 for raw in streams]

    return proc.returncode
def run_osm2pgsql_replication(context):
    """Call ``main()`` of the osm2pgsql-replication module in-process.

    The arguments are taken from the cells of the step table. A default
    database is appended when none is given. For the ``update`` command the
    path to the osm2pgsql binary and, unless the arguments already contain
    ``--``, a default style for osm2pgsql are appended as well.

    Output written to stdout and messages sent to the module's ``LOG``
    logger are captured and stored as ``[stdout, log output]`` in
    ``context.osm2pgsql_outdata``.

    Returns whatever ``main()`` returns.
    """
    userdata = context.config.userdata
    cmdline = []

    # convert table items to CLI arguments and inject constants to placeholders
    if context.table:
        cmdline.extend(f.format(**userdata) for f in context.table.headings if f)
        for row in context.table:
            cmdline.extend(f.format(**userdata) for f in row if f)

    if '-d' not in cmdline and '--database' not in cmdline:
        cmdline.extend(('-d', userdata['TEST_DB']))

    if cmdline[0] == 'update':
        cmdline.extend(('--osm2pgsql-cmd',
                        str(Path(userdata['BINARY']).resolve())))

        if '--' not in cmdline:
            cmdline.extend(('--', '-S', str(context.default_data_dir / 'default.style')))

    replication = context.osm2pgsql_replication
    serr = StringIO()
    log_handler = logging.StreamHandler(serr)
    replication.LOG.addHandler(log_handler)
    try:
        with contextlib.redirect_stdout(StringIO()) as sout:
            retval = replication.main(cmdline)
    finally:
        # Always detach the handler, even when main() raises. Otherwise it
        # would stay on the logger and keep collecting messages of later runs.
        replication.LOG.removeHandler(log_handler)

    context.osm2pgsql_outdata = [sout.getvalue(), serr.getvalue()]
    print(context.osm2pgsql_outdata)

    return retval
@given("no lua tagtransform")
def do_not_setup_tagtransform(context):
    """Intentionally do nothing: no tag transform parameters are added."""
@given("the default lua tagtransform")
def setup_lua_tagtransform(context):
    """Append ``--tag-transform-script`` pointing at 'style.lua' in the
    default data directory to the osm2pgsql parameters.
    """
    script = context.default_data_dir / 'style.lua'
    context.osm2pgsql_params.extend(('--tag-transform-script', str(script)))
@given("the lua style")
def setup_inline_lua_style(context):
    """Write the step text to 'inline_style.lua' in the working directory
    and use that file as the style (``-S``) for osm2pgsql.

    An already present ``-S`` parameter gets its value replaced, otherwise
    the option is appended.
    """
    outfile = context.workdir / 'inline_style.lua'
    # Pin the encoding so that non-ASCII characters in the style do not
    # depend on the locale of the platform running the tests.
    # (assumes context.workdir is a pathlib.Path -- TODO confirm)
    outfile.write_text(context.text, encoding='utf-8')

    params = context.osm2pgsql_params
    if '-S' in params:
        params[params.index('-S') + 1] = str(outfile)
    else:
        params.extend(('-S', str(outfile)))
@given("the style file '(?P<style>.+)'")
def setup_style_file(context, style):
    """Use the file `style` from the test data directory as the style
    (``-S``) for osm2pgsql.

    An already present ``-S`` parameter gets its value replaced, otherwise
    the option is appended.
    """
    params = context.osm2pgsql_params

    try:
        pos = params.index('-S')
    except ValueError:
        # No style set so far.
        params.extend(('-S', str(context.test_data_dir / style)))
    else:
        params[pos + 1] = str(context.test_data_dir / style)
@when(r"running osm2pgsql (?P<output>\w+)(?: with parameters)?")
def execute_osm2pgsql_successfully(context, output):
    """Run osm2pgsql with the given output and require return code 0.

    Nothing is checked when the scenario ended up being skipped.
    """
    returncode = run_osm2pgsql(context, output)

    if context.scenario.status != "skipped":
        assert returncode == 0, (
            f"osm2pgsql failed with error code {returncode}.\n"
            f"Output:\n{context.osm2pgsql_outdata[0]}\n{context.osm2pgsql_outdata[1]}\n")
@then(r"running osm2pgsql (?P<output>\w+)(?: with parameters)? fails")
def execute_osm2pgsql_with_failure(context, output):
    """Run osm2pgsql with the given output and require a non-zero return code.

    Nothing is checked when the scenario ended up being skipped.
    """
    returncode = run_osm2pgsql(context, output)

    if context.scenario.status != "skipped":
        assert returncode != 0, "osm2pgsql unexpectedly succeeded"
@when("running osm2pgsql-replication")
def execute_osm2pgsql_replication_successfully(context):
    """Run osm2pgsql-replication and require return code 0.

    On failure the captured output is included in the assertion message.
    """
    returncode = run_osm2pgsql_replication(context)

    assert returncode == 0, (
        f"osm2pgsql-replication failed with error code {returncode}.\n"
        f"Output:\n{context.osm2pgsql_outdata[0]}\n{context.osm2pgsql_outdata[1]}\n")
@then(r"running osm2pgsql-replication fails(?: with returncode (?P<expected>\d+))?")
def execute_osm2pgsql_replication_successfully(context, expected):
    """Run osm2pgsql-replication and require a non-zero return code.

    When the step text carries ``with returncode N``, the return code must
    additionally be equal to N.
    """
    # NOTE(review): this function has the same name as the `@when` step for
    # the successful run defined earlier in this file. Presumably harmless
    # because steps are registered through their decorator -- confirm before
    # renaming.
    returncode = run_osm2pgsql_replication(context)

    assert returncode != 0, "osm2pgsql-replication unexpectedly succeeded"

    if expected:
        # The line break after the first sentence keeps the captured output
        # from running into the summary line.
        assert returncode == int(expected), \
            f"osm2pgsql-replication failed with returncode {returncode} instead of {expected}.\n"\
            f"Output:\n{context.osm2pgsql_outdata[0]}\n{context.osm2pgsql_outdata[1]}\n"
@then(r"the (?P<kind>\w+) output contains")
def check_program_output(context, kind):
    """Check that every non-empty line of the step text appears in the
    captured output of the last program run.

    `kind` selects the stream: 'standard' for the first and 'error' for the
    second entry of ``context.osm2pgsql_outdata``. Lines of the step text are
    stripped of surrounding whitespace before the substring search.

    Raises AssertionError for an unknown `kind` or a missing line.
    """
    if kind == 'error':
        s = context.osm2pgsql_outdata[1]
    elif kind == 'standard':
        s = context.osm2pgsql_outdata[0]
    else:
        # Raise explicitly: a bare `assert not "..."` fails without message
        # and is removed under -O, which would leave `s` unbound below.
        raise AssertionError(f"Expect one of error, standard. Got: '{kind}'")

    for line in context.text.split('\n'):
        line = line.strip()
        if line:
            assert line in s,\
                   f"Output '{line}' not found in {kind} output:\n{s}\n"
@given("the replication service at (?P<base_url>.*)")
def setup_replication_mock(context, base_url):
    """Configure the ReplicationServer of the replication module under test.

    Sets the base URL the server is expected to be created with. When the
    step has a table, each row becomes one OsmosisState: the first column is
    converted to int, the second column is parsed as a timestamp of the form
    ``YYYY-MM-DDTHH:MM:SSZ`` and marked as UTC.
    """
    context.osm2pgsql_replication.ReplicationServer.expected_base_url = base_url

    if not context.table:
        return

    states = []
    for row in context.table:
        sequence = int(row[0])
        timestamp = dt.datetime.strptime(row[1], '%Y-%m-%dT%H:%M:%SZ')
        states.append(OsmosisState(sequence,
                                   timestamp.replace(tzinfo=dt.timezone.utc)))

    context.osm2pgsql_replication.ReplicationServer.state_infos = states
@given("the URL (?P<base_url>.*) returns")
def mock_url_response(context, base_url):
    """Register the step text as the response for the URL `base_url` in
    ``context.urlrequest_responses``.
    """
    response_body = context.text
    context.urlrequest_responses[base_url] = response_body
|