#!/usr/bin/env python
# type: ignore
"""
Generate per-sqlstate errors from PostgreSQL source code.
The script can be run at a new PostgreSQL release to refresh the module.
"""
# Copyright (C) 2020 The Psycopg Team
import os
import re
import sys
import logging
from collections import defaultdict, namedtuple
from urllib.request import urlopen
from psycopg.errors import get_base_exception
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
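# One Error record per SQLSTATE: the five-character state code, the spec
# label from errcodes.txt (e.g. "UNDEFINED_TABLE"), the Python class name
# to generate, and the DBAPI base exception it derives from.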
Error = namedtuple("Error", "sqlstate errlabel clsname basename")
def main():
    # Note: add "master" for a preview
    classes, errors = fetch_errors("9.6 10 11 12 13 14 15 16 17 18".split())

    fn = os.path.dirname(__file__) + "/../psycopg/psycopg/errors.py"
    update_file(fn, generate_module_data(classes, errors))

    fn = os.path.dirname(__file__) + "/../docs/api/errors.rst"
    update_file(fn, generate_docs_data(classes, errors))

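# parse_errors_txt() consumes PostgreSQL's errcodes.txt. For illustration,
# the two regexps below expect lines shaped like (whitespace abridged):
#
#   Section: Class 22 - Data Exception
#   22012    E    ERRCODE_DIVISION_BY_ZERO    division_by_zero
#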
def parse_errors_txt(url):
    classes = {}
    errors = defaultdict(dict)

    page = urlopen(url)
    for line in page.read().decode("ascii").splitlines():
        # Strip comments and skip blanks
        if not (line := line.split("#")[0].strip()):
            continue

        # Parse a section
        if m := re.match(r"Section: (Class (..) - .+)", line):
            label, class_ = m.groups()
            classes[class_] = label
            continue

        # Parse an error
        if m := re.match(
            r"(.....)\s+(?:E|W|S)\s+ERRCODE_(\S+)(?:\s+(\S+))?$", line
        ):
            sqlstate, macro, spec = m.groups()
            # skip sqlstates without specs as they are not publicly visible
            if not spec:
                continue
            errlabel = spec.upper()
            errors[class_][sqlstate] = errlabel
            continue

        # We don't expect anything else
        raise ValueError("unexpected line:\n%s" % line)

    return classes, errors

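# PostgreSQL stable branches are named RELX_Y_STABLE up to 9.6 (e.g.
# REL9_6_STABLE) and REL_XX_STABLE from version 10 on (e.g. REL_16_STABLE);
# tag_from_version() maps a version string to that branch name.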
def tag_from_version(version: str) -> str:
    if version == "master":
        return version

    tver = tuple(map(int, version.split()[0].split(".")))
    tag = "%s%s_STABLE" % (
        "REL_" if tver[0] >= 10 else "REL",
        version.replace(".", "_"),
    )
    return tag

def fetch_errors(versions):
    classes = {}
    errors = defaultdict(dict)

    for version in versions:
        logger.info("fetching errors from version %s", version)
        tag = tag_from_version(version)
        url = (
            "https://raw.githubusercontent.com/postgres/postgres"
            f"/refs/heads/{tag}/src/backend/utils/errcodes.txt"
        )
        c1, e1 = parse_errors_txt(url)
        classes.update(c1)
        for c, cerrs in e1.items():
            errors[c].update(cerrs)

    # clean up data

    # success and warning - never raised
    del classes["00"]
    del classes["01"]
    del errors["00"]
    del errors["01"]

    specific = {
        "38002": "ModifyingSqlDataNotPermittedExt",
        "38003": "ProhibitedSqlStatementAttemptedExt",
        "38004": "ReadingSqlDataNotPermittedExt",
        "39004": "NullValueNotAllowedExt",
        "XX000": "InternalError_",
    }

    seen = set(
        """
        Error Warning InterfaceError DataError DatabaseError ProgrammingError
        IntegrityError InternalError NotSupportedError OperationalError
        """.split()
    )

    for c, cerrs in errors.items():
        for sqstate, errlabel in list(cerrs.items()):
            if sqstate in specific:
                clsname = specific[sqstate]
            else:
                clsname = errlabel.title().replace("_", "")
            if clsname in seen:
                raise Exception("class already existing: %s" % clsname)
            seen.add(clsname)

            basename = get_base_exception(sqstate).__name__
            cerrs[sqstate] = Error(sqstate, errlabel, clsname, basename)

    return classes, errors

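# At this point each errors[class][sqlstate] entry is an Error tuple; for
# instance "42P01" would typically map to something like
# Error("42P01", "UNDEFINED_TABLE", "UndefinedTable", "ProgrammingError").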
def generate_module_data(classes, errors):
    yield ""

    for clscode, clslabel in sorted(classes.items()):
        yield f"""
# {clslabel}
"""
        for _, e in sorted(errors[clscode].items()):
            yield f"""\
class {e.clsname}({e.basename},
    code={e.sqlstate!r}, name={e.errlabel!r}):
    pass
"""
    yield ""

def generate_docs_data(classes, errors):
    Line = namedtuple("Line", "colstate colexc colbase sqlstate")

    lines = [Line("SQLSTATE", "Exception", "Base exception", None)]
    for clscode in sorted(classes):
        for _, error in sorted(errors[clscode].items()):
            lines.append(
                Line(
                    f"``{error.sqlstate}``",
                    f"`!{error.clsname}`",
                    f"`!{error.basename}`",
                    error.sqlstate,
                )
            )

    widths = [max(len(line[c]) for line in lines) for c in range(3)]
    h = Line(*(["=" * w for w in widths] + [None]))
    lines.insert(0, h)
    lines.insert(2, h)
    lines.append(h)

    h1 = "-" * (sum(widths) + len(widths) - 1)
    sqlclass = None

    yield ""
    for line in lines:
        cls = line.sqlstate[:2] if line.sqlstate else None
        if cls and cls != sqlclass:
            yield re.sub(r"(Class\s+[^\s]+)", r"**\1**", classes[cls])
            yield h1
            sqlclass = cls

        yield (
            "%-*s %-*s %-*s"
            % (
                widths[0],
                line.colstate,
                widths[1],
                line.colexc,
                widths[2],
                line.colbase,
            )
        ).rstrip()

    yield ""

def update_file(fn, new_lines):
    logger.info("updating %s", fn)

    with open(fn) as f:
        lines = f.read().splitlines()

    istart, iend = (
        i
        for i, line in enumerate(lines)
        if re.match(r"\s*(#|\.\.)\s*autogenerated:\s+(start|end)", line)
    )

    lines[istart + 1 : iend] = new_lines

    with open(fn, "w") as f:
        for line in lines:
            f.write(line + "\n")

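# update_file() replaces everything between a pair of marker lines, which
# must already exist in the target file:
#
#   "# autogenerated: start" / "# autogenerated: end"    (in errors.py)
#   ".. autogenerated: start" / ".. autogenerated: end"  (in errors.rst)
#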
if __name__ == "__main__":
    sys.exit(main())