File: steps_execute.py

package info (click to toggle)
osm2pgsql 1.8.0%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 5,536 kB
  • sloc: cpp: 46,707; ansic: 1,804; python: 797; sh: 25; makefile: 14
file content (143 lines) | stat: -rw-r--r-- 4,576 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# SPDX-License-Identifier: GPL-2.0-or-later
#
# This file is part of osm2pgsql (https://osm2pgsql.org/).
#
# Copyright (C) 2023 by the osm2pgsql developer community.
# For a full list of authors see the git log.
"""
Steps for executing osm2pgsql.
"""
from io import StringIO
from pathlib import Path
import subprocess

def get_import_file(context):
    """Return the input file argument and optional stdin data for osm2pgsql.

    When ``context.import_file`` is set, its string form is returned together
    with ``None``. Otherwise the lines collected in ``context.import_data``
    (lists stored under the keys 'n', 'w' and 'r') are sorted by id,
    concatenated in that key order with one line per object, and returned as
    the second element, with '-' as the file name.

    The 'n', 'w' and 'r' lists are emptied once their content was consumed.
    """
    if context.import_file is not None:
        return str(context.import_file), None

    data = context.import_data

    # Hand the node list to the geometry factory first.
    # NOTE(review): presumably this adds nodes that were only referenced
    # implicitly by geometries - confirm against the factory implementation.
    context.geometry_factory.complete_node_list(data['n'])

    def osm_id(line):
        # The first space-separated token is a one-character prefix
        # followed by the numeric id, e.g. 'n12'.
        return int(line.partition(' ')[0][1:])

    for objects in data.values():
        objects.sort(key=osm_id)

    parts = []
    for typ in ('n', 'w', 'r'):
        objects = data[typ]
        parts.extend(line + '\n' for line in objects)
        objects.clear()

    return '-', ''.join(parts)


def run_osm2pgsql(context, output):
    """Assemble the osm2pgsql command line, run it and record its output.

    The command line consists of the binary from the 'BINARY' userdata
    setting, ``-O <output>``, the parameters collected in
    ``context.osm2pgsql_params`` and, if the step has a table, its non-empty
    headings and cells (cells are formatted with the userdata settings).
    A database (``-d``) and, for the pgsql output, a style file (``-S``) are
    added when not given explicitly.

    Decoded stdout and stderr are stored as a two-element list in
    ``context.osm2pgsql_outdata``.

    Returns the exit code of the process, or None when the scenario was
    skipped because the test tablespace is not available.
    """
    assert output in ('flex', 'pgsql', 'gazetteer', 'none')

    userdata = context.config.userdata

    binary = Path(userdata['BINARY']).resolve()
    cmdline = [str(binary), '-O', output]
    cmdline += context.osm2pgsql_params

    if context.table:
        # Headings are taken verbatim, only the cells of the rows get
        # placeholders replaced.
        cmdline += [heading for heading in context.table.headings if heading]
        for row in context.table:
            cmdline += [cell.format(**userdata) for cell in row if cell]

    has_database = '-d' in cmdline or '--database' in cmdline
    if not has_database:
        cmdline += ['-d', userdata['TEST_DB']]

    if 'tablespacetest' in cmdline and not userdata['HAVE_TABLESPACE']:
        context.scenario.skip('tablespace tablespacetest not available')
        return None

    if output == 'pgsql' and '-S' not in cmdline:
        cmdline += ['-S', str(context.default_data_dir / 'default.style')]

    data_file, data_stdin = get_import_file(context)

    if data_stdin is None:
        stdin_bytes = None
    else:
        # Inline data is piped through stdin, so the input format has to be
        # stated explicitly.
        stdin_bytes = data_stdin.encode('utf-8')
        cmdline += ['-r', 'opl']

    cmdline.append(data_file)

    proc = subprocess.Popen(cmdline,
                            cwd=str(context.workdir),
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate(input=stdin_bytes)

    # Literal backslash-n sequences in the output are turned into real
    # line breaks.
    context.osm2pgsql_outdata = [stream.decode('utf-8').replace('\\n', '\n')
                                 for stream in (stdout, stderr)]

    return proc.returncode

@given("no lua tagtransform")
def do_not_setup_tagtransform(context):
    """Deliberately do nothing: no parameters are added for this step."""

@given("the default lua tagtransform")
def setup_lua_tagtransform(context):
    """Add 'style.lua' from the default data directory as tag transform script.

    The scenario is skipped when the 'HAVE_LUA' userdata setting is falsy.
    """
    if context.config.userdata['HAVE_LUA']:
        script = context.default_data_dir / 'style.lua'
        context.osm2pgsql_params.extend(['--tag-transform-script', str(script)])
    else:
        context.scenario.skip("Lua support not compiled in.")

@given("the lua style")
def setup_inline_lua_style(context):
    """Write the text of the step into a Lua style file and select it with -S.

    The file is created as 'inline_style.lua' in ``context.workdir`` and
    overwritten on each use. The scenario is skipped when the 'HAVE_LUA'
    userdata setting is falsy.
    """
    if not context.config.userdata['HAVE_LUA']:
        context.scenario.skip("Lua support not compiled in.")
        return

    outfile = context.workdir / 'inline_style.lua'
    # Write with an explicit encoding instead of the locale's default, so
    # that non-ASCII characters in the style end up as UTF-8 on every system.
    outfile.write_text(context.text, encoding='utf-8')
    context.osm2pgsql_params.extend(('-S', str(outfile)))


@given("the style file '(?P<style>.+)'")
def setup_style_file(context, style):
    """Select the style file `style` from the test data directory with -S.

    For file names ending in '.lua' the scenario is skipped when the
    'HAVE_LUA' userdata setting is falsy.
    """
    is_lua_style = style.endswith('.lua')
    if is_lua_style and not context.config.userdata['HAVE_LUA']:
        context.scenario.skip("Lua support not compiled in.")
        return

    style_path = context.test_data_dir / style
    context.osm2pgsql_params.extend(['-S', str(style_path)])


# The pattern is a raw string so that '\w' reaches the regex engine
# without relying on Python passing through an invalid escape sequence.
@when(r"running osm2pgsql (?P<output>\w+)(?: with parameters)?")
def execute_osm2pgsql_sucessfully(context, output):
    """Run osm2pgsql with the given output and expect it to succeed.

    Nothing is checked when running osm2pgsql caused the scenario to be
    skipped. Otherwise a non-zero exit code fails the step and the
    captured standard and error output are included in the message.
    """
    returncode = run_osm2pgsql(context, output)

    if context.scenario.status == "skipped":
        return

    assert returncode == 0,\
           f"osm2pgsql failed with error code {returncode}.\n"\
           f"Output:\n{context.osm2pgsql_outdata[0]}\n{context.osm2pgsql_outdata[1]}\n"


# The pattern is a raw string so that '\w' reaches the regex engine
# without relying on Python passing through an invalid escape sequence.
@then(r"running osm2pgsql (?P<output>\w+)(?: with parameters)? fails")
def execute_osm2pgsql_with_failure(context, output):
    """Run osm2pgsql with the given output and expect a non-zero exit code.

    Nothing is checked when running osm2pgsql caused the scenario to be
    skipped.
    """
    returncode = run_osm2pgsql(context, output)

    if context.scenario.status == "skipped":
        return

    assert returncode != 0, "osm2pgsql unexpectedly succeeded"


# The pattern is a raw string so that '\w' reaches the regex engine
# without relying on Python passing through an invalid escape sequence.
@then(r"the (?P<kind>\w+) output contains")
def check_program_output(context, kind):
    """Check that every non-empty line of the step text appears in the output.

    `kind` selects the captured stream in ``context.osm2pgsql_outdata``:
    'standard' uses element 0, 'error' uses element 1. Lines of the step
    text are stripped of surrounding whitespace before they are searched
    as substrings.

    Raises AssertionError for an unknown `kind` or when a line is missing.
    """
    if kind == 'error':
        s = context.osm2pgsql_outdata[1]
    elif kind == 'standard':
        s = context.osm2pgsql_outdata[0]
    else:
        # Raise explicitly: a failing assert on a string constant carries
        # no message and is removed entirely when running with -O.
        raise AssertionError(f"Unknown output kind '{kind}'. "
                             "Expect one of error, standard")

    for line in context.text.split('\n'):
        line = line.strip()
        if line:
            assert line in s,\
                   f"Output '{line}' not found in {kind} output:\n{s}\n"