#!/usr/bin/env python3
# Invoke Datalogics PDF Checker & parse its output
# Purpose of this script:
# * abort the validation pipeline with a non-zero error code if any check fails on a PDF sample
# * aggregate all the checks performed into a concise summary
# * parallelize the execution of this analysis across all PDF files
# * allow ignoring some errors considered harmless, listed in pdfchecker-ignore.json
# Note: among the 3 checkers we use for fpdf2, PDF Checker is the only one that reports errors
# for unbalanced q/Q contexts in content streams, even though it does not provide a clear message.
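# Example of such an unbalanced context (illustrative content stream snippet, names made up):
#   q 0.5 0 0 0.5 0 0 cm /Img1 Do
# where the "q" operator saves the graphics state but no matching "Q" ever restores it.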
# USAGE: ./pdfchecker.py [$pdf_filepath|--process-all-test-pdf-files|--print-aggregated-report]
import sys
from subprocess import check_output

from scripts.checker_commons import main

CHECKS_DETAILS_URL = "https://dev.datalogics.com/pdf-checker/the-json-profile-file/description-of-json-profile-parameters/"
UNPROCESSABLE_PDF_ERROR_LINE = "Unable to process document due to PDF Error"
CHECKER_SUMMARY_END_LINE = "<<=CHECKER_SUMMARY_END=>>"
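# parse_output() only parses the part of the report that follows CHECKER_SUMMARY_END_LINE
# (the marker line and the line right after it are skipped).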


def analyze_pdf_file(pdf_filepath):
    try:
        command = [
            "PDF_Checker/pdfchecker",
            "--profile",
            "PDF_Checker/CheckerProfiles/everything.json",
            "--input",
            pdf_filepath,
            "--password",
            "fpdf2",
        ]
        # print(" ".join(command))
        output = check_output(command).decode()
        # print(output)
        return pdf_filepath, parse_output(command, output)
    # pylint: disable=broad-exception-caught
    except BaseException as error:
        return pdf_filepath, error
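# Note: analyze_pdf_file() always returns a (pdf_filepath, result) tuple: result is the dict
# built by parse_output() on success, or the raised exception object on failure
# (missing binary, non-zero exit code...), so that the aggregation performed by
# scripts.checker_commons.main() can report both outcomes.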


def parse_output(command, output):
    """
    Parse PDF Checker indented output into a dict-tree.
    Tree leaves are empty dicts.
    """
    lines = output.splitlines()
    version = lines[0]
    if UNPROCESSABLE_PDF_ERROR_LINE in lines:
        return {
            "failure": UNPROCESSABLE_PDF_ERROR_LINE,
            "version": version,
        }
    assert CHECKER_SUMMARY_END_LINE in lines, f"{' '.join(command)} output:\n{output}"
    lines = lines[lines.index(CHECKER_SUMMARY_END_LINE) + 2 :]
    analysis = insert_indented(lines)
    return {
        "errors": [
            error
            for section in analysis.values()
            for error in section.get("Errors:", {}).keys()
            if error != "None"
        ],
        "version": version,
    }
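# Illustrative example (hypothetical section & error names, not verbatim PDF Checker output):
# if the summary section parsed by insert_indented() yields
#     {"Fonts Summary:": {"Errors:": {"Some font error": {}}},
#      "Pages Summary:": {"Errors:": {"None": {}}}}
# then parse_output() returns
#     {"errors": ["Some font error"], "version": <first line of the output>}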


def insert_indented(lines, node=None, depth=0, indent=0):
    if node is None:
        node = {}
    prev_node = None
    while lines:
        line = lines[0]
        if not line:
            lines.pop(0)
            continue
        line_indent = len(line) - len(line.lstrip())
        text = line[line_indent:].rstrip()
        if line_indent >= indent and text in (
            "Color Images",
            "Grayscale Images",
            "Monochrome Images",
        ):
            if depth > 1:
                # Leaving this branch of the tree after processing a "* Images" block
                return
            # Special case handled by creating a subnode for this "* Images" block:
            lines.pop(0)
            node[text] = {}
            insert_indented(lines, node[text], depth + 1, indent)
            continue
        if line_indent == indent:
            lines.pop(0)
            prev_node = node[text] = {}
            continue
        if line_indent > indent:
            if prev_node is None:
                # Case of more than 1 level of indentation, e.g. "How To Optimize:" section
                lines.pop(0)
                node[text] = {}
                continue
            assert (
                prev_node is not None
            ), f"depth={depth} indent={indent} line_indent={line_indent}: {line}"
            insert_indented(lines, prev_node, depth + 1, indent + 4)
            continue
        return  # line_indent < indent
    return node
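# Minimal illustrative example (hypothetical section names, 4-space indentation as assumed above):
#     insert_indented(["Fonts:", "    Errors:", "        None"])
#     -> {"Fonts:": {"Errors:": {"None": {}}}}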


if __name__ == "__main__":
    main("pdfchecker", analyze_pdf_file, sys.argv, CHECKS_DETAILS_URL)