File: mapDetectors.py

package info (click to toggle)
sumo 1.18.0%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,218,476 kB
  • sloc: xml: 2,488,246; cpp: 431,611; python: 236,255; java: 14,424; cs: 5,200; ansic: 494; sh: 474; makefile: 80; csh: 1
file content (110 lines) | stat: -rwxr-xr-x 5,332 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
# Copyright (C) 2013-2023 German Aerospace Center (DLR) and others.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0/
# This Source Code may also be made available under the following Secondary
# Licenses when the conditions for such availability set forth in the Eclipse
# Public License 2.0 are satisfied: GNU General Public License, version 2
# or later which is available at
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later

# @file    mapDetectors.py
# @author  Jakob Erdmann
# @author  Mirko Barthauer
# @date    2022-04-25

"""
Create detector definitions by map-matching coordinates to a .net.xml file
"""

from __future__ import print_function
from __future__ import absolute_import
import os
import sys
import csv
import io
# Locate the SUMO installation: prefer the SUMO_HOME environment variable and
# fall back to the directory two levels above this script. Its "tools"
# directory is appended to the module search path before sumolib is imported
# (hence the late import and the noqa marker).
SUMO_HOME = os.environ.get('SUMO_HOME',
                           os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..'))
sys.path.append(os.path.join(SUMO_HOME, 'tools'))
import sumolib  # noqa


def get_options(args=None):
    """Parse the command line options of the detector mapping tool.

    args: optional list of argument strings that is handed to parse_args
          unchanged (default None).

    Returns the parsed options object. When the net file, the detector file
    or the output file is not given, the help text is printed and the process
    exits with status 1.
    """
    parser = sumolib.options.ArgumentParser(
        description="Map detector locations to a network and write inductionLoop-definitions")
    add = parser.add_argument

    # input and output files
    add("-n", "--net-file", dest="netfile", category="input", type=parser.net_file,
        help="define the net file (mandatory)")
    add("-d", "--detector-file", dest="detfile", category="input", type=parser.file,
        help="csv input file with detector ids and coordinates")
    add("--delimiter", default=";",
        help="the field separator of the detector input file")
    add("-o", "--output-file", dest="outfile", category="output", type=parser.file,
        help="define the output file for generated mapped detectors")

    # names of the csv columns to read
    add("-i", "--id-column", default="id", dest="id",
        help="Read detector ids from the given column")
    add("-x", "--longitude-column", default="lon", dest="lon",
        help="Read detector x-coordinate (lon) from the given column")
    add("-y", "--latitude-column", default="lat", dest="lat",
        help="Read detector y-coordinate (lat) from the given column")

    # matching and attributes of the generated detectors
    add("--vclass", default="passenger",
        help="only consider edges that permit the given vClass")
    add("--det-output-file", dest="detOut", default="detector.out.xml", category="output",
        type=parser.file, help="Define the output file that generated detectors shall write to")
    add("--interval", default=3600, type=parser.time,
        help="Define the aggregation interval of generated detectors")
    add("-v", "--verbose", action="store_true",
        default=False, help="tell me what you are doing")

    parsed = parser.parse_args(args=args)
    # all three files are required for the tool to do anything useful
    if not (parsed.netfile and parsed.detfile and parsed.outfile):
        parser.print_help()
        sys.exit(1)

    return parsed


def _find_closest_lane(net, x, y, vClass, maxRadius=1000):
    """Search for the lane closest to (x, y) that permits vClass.

    The search starts with a radius of 0.1 and multiplies it by 10 until a
    permitted lane is found or the radius would exceed maxRadius.

    Returns a tuple (lane, radius): lane is the closest permitted lane or None
    if nothing was found, radius is the last radius that was actually searched.
    """
    radius = 0.1
    searched = radius
    while radius <= maxRadius:
        searched = radius
        # the lane id is part of the candidate so that equal distances are
        # resolved deterministically without ever comparing lane objects
        candidates = [(dist, lane.getID(), lane)
                      for lane, dist in net.getNeighboringLanes(x, y, radius, True)
                      if lane.allows(vClass)]
        if candidates:
            return min(candidates, key=lambda c: c[:2])[2], searched
        radius *= 10
    return None, searched


def main():
    """Map the detectors of the csv input file to the network and write inductionLoop definitions.

    Reads the options via get_options, loads the network and converts the
    lon/lat of every csv row to network coordinates. For each row the closest
    lane permitting the configured vClass is looked up and an <inductionLoop>
    element is written to the output file. Detectors without a lane in range
    are reported on stderr and skipped.

    Exits via sys.exit with a message when one of the configured id/lon/lat
    columns is missing from the input file.
    """
    options = get_options()
    net = sumolib.net.readNet(options.netfile)
    with open(options.outfile, 'w') as outf:
        sumolib.writeXMLHeader(outf, "$Id$", "additional", options=options)
        # the context manager closes the input also when leaving early via
        # sys.exit or an exception raised while parsing a row
        with io.open(options.detfile, encoding="utf8") as inputf:
            reader = csv.DictReader(inputf, delimiter=options.delimiter)
            checkedFields = False
            for row in reader:
                if not checkedFields:
                    # the header is only known once the first row was read
                    checkedFields = True
                    for attr in ["id", "lon", "lat"]:
                        colName = getattr(options, attr)
                        if colName not in reader.fieldnames:
                            sys.exit("Required column %s not found. Available columns are %s" % (
                                     colName, ",".join(reader.fieldnames)))
                detID = row[options.id]
                lon = float(row[options.lon])
                lat = float(row[options.lat])
                x, y = net.convertLonLat2XY(lon, lat)

                best, searched = _find_closest_lane(net, x, y, options.vclass)
                if best is None:
                    # report the radius that was really searched and end the
                    # line so that successive warnings do not run together
                    sys.stderr.write("Could not find road for detector %s within %sm radius\n" % (
                                     detID, searched))
                    continue
                pos = sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint((x, y), best.getShape())
                outf.write('    <inductionLoop id="%s" lane="%s" pos="%s" file="%s" freq="%s"/>\n' % (
                           detID, best.getID(), pos, options.detOut, options.interval))

        outf.write('</additional>\n')


# run the tool only when executed as a script, not when imported as a module
if __name__ == "__main__":
    main()