1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
|
#!/usr/bin/env python3
#
# Wireshark - Network traffic analyzer
# By Gerald Combs <gerald@wireshark.org>
# Copyright 1998 Gerald Combs
#
# SPDX-License-Identifier: GPL-2.0-or-later
'''Update the "packet-isobus-parameters.h" file.
Make-isobus creates a file containing isobus parameters
from the databases at isobus.net.
'''
import csv
import io
import os
import sys
import urllib.request, urllib.error, urllib.parse
import zipfile
def exit_msg(msg=None, status=1):
    '''Write an optional error message and the module usage text, then exit.

    msg: text written to stderr ahead of the usage text; skipped when None.
    status: exit status handed to sys.exit().

    Always raises SystemExit; never returns.
    '''
    if msg is not None:
        sys.stderr.write(msg + '\n\n')
    # __doc__ is None when docstrings are stripped (python -OO); concatenating
    # it would raise TypeError and mask the error being reported.
    if __doc__ is not None:
        sys.stderr.write(__doc__ + '\n')
    sys.exit(status)
def open_url_zipped(url):
    '''Download a zip archive and return it as an in-memory ZipFile.

    url: sequence of URL fragments that are joined with '/' to form the
        address to fetch.

    Returns a zipfile.ZipFile backed by the downloaded bytes.
    Calls exit_msg(), which terminates the script, when the download fails
    or the payload is not a valid zip archive.
    '''
    # Drop trailing slashes from every fragment but the last so that a
    # fragment already ending in '/' does not yield '//' in the address.
    fragments = list(url)
    url_path = '/'.join(
        [frag.rstrip('/') for frag in fragments[:-1]] + fragments[-1:]
    )
    req_headers = {'User-Agent': 'Wireshark make-isobus'}
    try:
        req = urllib.request.Request(url_path, headers=req_headers)
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(req) as response:
            body = response.read()
    except Exception as err:
        # Script-level boundary: any download failure ends the run, but the
        # underlying reason is reported instead of being discarded.
        exit_msg('Error opening {}: {}'.format(url_path, err))
    try:
        return zipfile.ZipFile(io.BytesIO(body))
    except zipfile.BadZipFile as err:
        exit_msg('Error reading zip archive from {}: {}'.format(url_path, err))
def _c_label(text):
    '''Return text with '"' replaced by "'" so it fits in a C string literal.'''
    return text.replace("\"", "'")


def _read_csv_rows(zipf, filename, min_total):
    '''Return the data rows of one CSV member of the archive, header skipped.

    Exits through exit_msg() when the member has fewer than min_total lines
    (header line included).
    '''
    lines = zipf.read(filename).decode('UTF-8', 'replace').splitlines()
    if len(lines) < min_total:
        exit_msg("{}: Not enough entries ({})".format(filename, len(lines)))
    reader = csv.reader(lines)
    next(reader)  # skip the header row
    return list(reader)


def _rows_by_numeric_id(rows):
    '''Yield (id_text, label) from CSV rows, ordered by the integer in column 0.'''
    for row in sorted(rows, key=lambda row: int(row[0])):
        yield row[0], row[1]


def _write_value_string(output_fd, name, entries, key_comment=None):
    '''Write one value_string array and its value_string_ext wrapper.

    entries yields (key_text, label) pairs already in output order;
    key_comment, when given, is emitted as a "key:" comment above the array.
    '''
    if key_comment is not None:
        output_fd.write(f"/* key: {key_comment} */\n")
    output_fd.write(f"static const value_string _{name}[] = {{\n")
    for key_text, label in entries:
        output_fd.write(f" {{ {key_text}, \"{_c_label(label)}\" }},\n")
    output_fd.write(" { 0, NULL }\n")
    output_fd.write("};\n")
    output_fd.write(f"static value_string_ext {name}_ext = VALUE_STRING_EXT_INIT(_{name});\n\n")


def main():
    '''Download the ISOBUS CSV export and regenerate the parameters header.

    Reads the five CSV members listed in isobus_files from the downloaded
    archive, checks each against a minimum line count, and writes
    epan/dissectors/packet-isobus-parameters.h (relative to the current
    directory). Exits through exit_msg() when a CSV is too short or the
    output file cannot be opened.
    '''
    isobus_output_path = os.path.join('epan', 'dissectors', 'packet-isobus-parameters.h')
    isobus_zip_url = [ "https://www.isobus.net/isobus/attachments/", "isoExport_csv.zip"]
    isobus_files = {
        'indust' : 'Industry Groups.csv',
        'glblfcts' : 'Global NAME Functions.csv',
        'igfcts' :'IG Specific NAME Function.csv',
        'manuf' : 'Manufacturer IDs.csv',
        'pgn_spns' : 'SPNs and PGNs.csv'
    }
    zipf = open_url_zipped(isobus_zip_url)

    # Industries csv (typically 8 lines)
    indust_rows = _read_csv_rows(zipf, isobus_files['indust'], 4)

    # Global Name Functions csv (XXX as of 2023-10-18)
    glbl_name_functions_rows = _read_csv_rows(zipf, isobus_files['glblfcts'], 50)

    # Specific Name Functions csv (295 lines as of 2023-10-18)
    vehicle_system_names = {}
    specific_functions = {}
    for row in _read_csv_rows(zipf, isobus_files['igfcts'], 200):
        ig_id, vs_id, vs_name, f_id, f_name = row[:5]
        # key: 256 * Industry-Group-ID + Vehicle-System-ID
        new_id = int(ig_id) * 256 + int(vs_id)
        if len(vs_name) > 50:
            # Names over 50 characters are cut to 36; the notice is skipped
            # for the already-known case 539 (Weeders ...).
            if new_id != 539:
                print(f"shortening {new_id}: {vs_name} -> {vs_name[:36]}")
            vs_name = vs_name[:36]
        vehicle_system_names[new_id] = vs_name
        # key: 65536 * Industry-Group-ID + 256 * Vehicle-System-ID + Function-ID
        specific_functions[256 * new_id + int(f_id)] = f_name

    # Manufacturers csv (1396 lines as of 2023-10-18)
    manuf_rows = _read_csv_rows(zipf, isobus_files['manuf'], 1000)

    # PGN SPN csv (23756 lines as of 2023-10-18)
    pgn_names = {}
    for row in _read_csv_rows(zipf, isobus_files['pgn_spns'], 20000):
        # Best effort: rows too short to unpack, or whose PGN id is not an
        # integer, raise ValueError and are deliberately skipped.
        try:
            pgn_id, pgn_name = row[:2]
            pgn_key = int(pgn_id)
        except ValueError:
            continue
        if not pgn_name.startswith("Proprietary B"):
            pgn_names[pgn_key] = pgn_name

    # prepare output file
    try:
        output_fd = open(isobus_output_path, 'w', encoding='UTF-8')
    except OSError:
        exit_msg("Couldn't open ({}) ".format(isobus_output_path))

    # The with-block guarantees the header is flushed and closed even if a
    # table fails to serialize.
    with output_fd:
        output_fd.write('''/*
* This file was generated by running ./tools/make-isobus.py.
*
* SPDX-License-Identifier: GPL-2.0-or-later
*
* The ISOBUS public listings available from:
* <https://www.isobus.net/isobus/attachments/isoExport_csv.zip>
*
*/
#ifndef __PACKET_ISOBUS_PARAMETERS_H__
#define __PACKET_ISOBUS_PARAMETERS_H__
''')

        # Write Industries
        _write_value_string(
            output_fd, 'isobus_industry_groups',
            _rows_by_numeric_id(indust_rows))

        # Write Vehicle System Names
        _write_value_string(
            output_fd, 'isobus_vehicle_systems',
            ((hex(key), vehicle_system_names[key])
             for key in sorted(vehicle_system_names)),
            key_comment="256 * Industry-Group-ID + Vehicle-Group-ID")

        # Write Global Name Functions
        _write_value_string(
            output_fd, 'isobus_global_name_functions',
            _rows_by_numeric_id(glbl_name_functions_rows))

        # IG Specific Global Name Functions
        _write_value_string(
            output_fd, 'isobus_ig_specific_name_functions',
            ((hex(key), specific_functions[key])
             for key in sorted(specific_functions)),
            key_comment="65536 * Industry-Group-ID + 256 * Vehicle-System-ID + Function-ID")

        # Write Manufacturers
        _write_value_string(
            output_fd, 'isobus_manufacturers',
            _rows_by_numeric_id(manuf_rows))

        # PGN Names
        _write_value_string(
            output_fd, 'isobus_pgn_names',
            ((key, pgn_names[key]) for key in sorted(pgn_names)))

        output_fd.write("#endif /* __PACKET_ISOBUS_PARAMETERS_H__ */")
# Script entry point: regenerate the header only when run directly.
if __name__ == "__main__":
    main()
|