File: diff.py

package info (click to toggle)
python-asdf 2.14.3-1%2Bdeb12u1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 2,280 kB
  • sloc: python: 16,612; makefile: 124
file content (352 lines) | stat: -rw-r--r-- 12,281 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
"""
Implementation of command for displaying differences between two ASDF files.
"""

import argparse
import sys

import jmespath
from numpy import array_equal

try:
    # colorama, when installed, supplies the colour codes on every platform.
    import colorama

    colorama.init()
    RED, GREEN = colorama.Fore.RED, colorama.Fore.GREEN
    RESET = colorama.Style.RESET_ALL
except ImportError:
    from sys import platform

    # Without colorama, raw ANSI escapes are used on Linux and macOS only;
    # every other platform gets empty strings, i.e. uncoloured output.
    if platform.startswith(("linux", "darwin")):
        RED, GREEN, RESET = "\x1b[31m", "\x1b[32m", "\x1b[0m"
    else:
        RED = GREEN = RESET = ""

import asdf

from ..tagged import Tagged
from ..tags.core.ndarray import NDArrayType
from ..util import human_list
from .main import Command

# Only the diff() function is exported by ``from ... import *``.
__all__ = ["diff"]


# Suffix that ends a written line: reset the colour, then a newline.
RESET_NEWLINE = RESET + "\n"
# Substring searched for in a node's tag to recognise ndarray nodes.
NDARRAY_TAG = "core/ndarray"
# Text written in place of a list element (an ArrayNode) in the tree context.
LIST_MARKER = "-"
# Coloured line prefixes: THIS marks content of the first file being
# compared, THAT marks content of the second ("other") file.
THIS_MARKER = GREEN + "> "
THAT_MARKER = RED + "< "


class Diff(Command):  # pragma: no cover
    """Plugin that exposes the ``diff`` subcommand to the asdftool runner."""

    @classmethod
    def setup_arguments(cls, subparsers):
        """Register the ``diff`` subparser on ``subparsers`` and return it.

        The parser takes exactly two file names plus the optional
        ``-m/--minimal`` flag and the repeatable ``-i/--ignore`` option, and
        dispatches to :meth:`run`.
        """
        summary = "Report differences between two ASDF files"
        usage_examples = """
examples:
  diff two files:
    asdftool diff file_before.asdf file_after.asdf
  ignore differences in the file's ASDF metadata:
    asdftool diff file_before.asdf file_after.asdf -i '[asdf_library,history]'
  ignore differences in the 'foo' field of all objects in a list:
    asdftool diff file_before.asdf file_after.asdf -i 'path.to.some_list[*].foo'

See https://jmespath.org/ for more information on constructing
JMESPath expressions.
    """.strip()

        # The raw formatter keeps the hand-laid-out examples above intact.
        diff_parser = subparsers.add_parser(
            "diff",
            description=summary,
            help=summary,
            epilog=usage_examples,
            formatter_class=argparse.RawDescriptionHelpFormatter,
        )

        diff_parser.add_argument(
            "filenames",
            metavar="asdf_file",
            nargs=2,
            help="The ASDF files to compare.",
        )
        diff_parser.add_argument(
            "-m",
            "--minimal",
            action="store_true",
            help="Show minimal differences between the two files.",
        )
        diff_parser.add_argument(
            "-i",
            "--ignore",
            dest="ignore",
            action="append",
            help="JMESPath expression indicating tree nodes that should be ignored.",
        )

        diff_parser.set_defaults(func=cls.run)
        return diff_parser

    @classmethod
    def run(cls, args):
        """Call :func:`diff` with the parsed command-line ``args``."""
        return diff(args.filenames, args.minimal, ignore=args.ignore)


class ArrayNode:
    """Dummy key that stands in for one list element in the diff tree.

    These nodes let list elements be tracked in a path of keys without
    necessarily being displayed, which keeps the diff output cleaner.

    Only ``__hash__`` is customised (it hashes ``name``); equality is the
    default identity comparison, so two separately created nodes with the
    same name are still distinct dictionary keys.

    Parameters
    ----------
    name : str
        Label identifying the list element, e.g. ``"item_0"``.
    """

    def __init__(self, name):
        self.name = name

    def __hash__(self):
        return hash(self.name)

    def __repr__(self):
        # A name-based repr keeps text derived from a node (for instance an
        # f-string that builds a nested node's name) free of memory addresses.
        return f"{type(self).__name__}({self.name!r})"


class PrintTree:
    """Record of the tree nodes that the diff output has already displayed.

    The record is a nesting of dictionaries, each with a ``visited`` flag and
    a ``children`` mapping keyed by node. Every path is rooted at an implicit
    ``"tree"`` node.
    """

    def __init__(self):
        self.__tree = {"visited": False, "children": {}}

    def get_print_list(self, node_list):
        """Return one entry per step of the path ``["tree"] + node_list``.

        An entry is ``None`` when that node is recorded as visited, and the
        node itself otherwise. From the first node that is not recorded at
        all, every remaining node is returned as is.
        """
        remaining = iter(["tree"] + node_list)
        labels = []
        cursor = self.__tree
        for node in remaining:
            child = cursor["children"].get(node)
            if child is None:
                # Nothing below an unrecorded node can have been shown yet.
                labels.append(node)
                labels.extend(remaining)
                break
            labels.append(None if child["visited"] else node)
            cursor = child
        return labels

    def __setitem__(self, node_list, visit):
        """Record every node along ``["tree"] + node_list`` as visited.

        Missing nodes are created with ``visited=True``; nodes that already
        exist are left untouched. The ``visit`` value is not consulted.

        Raises
        ------
        TypeError
            If ``node_list`` is not a list.
        """
        if not isinstance(node_list, list):
            raise TypeError("node_list parameter must be an instance of list")
        cursor = self.__tree
        for node in ["tree"] + node_list:
            cursor = cursor["children"].setdefault(node, {"visited": True, "children": {}})


class DiffContext:
    """Bundle of the state shared by all functions computing one diff.

    Parameters
    ----------
    asdf0, asdf1
        The two opened ASDF files being compared.
    iostream
        Stream the diff output is written to.
    minimal : bool, optional
        Whether reduced output was requested.
    ignore_ids : set, optional
        ``id()`` values of tree nodes to skip. A new empty set is used when
        omitted.
    """

    def __init__(self, asdf0, asdf1, iostream, minimal=False, ignore_ids=None):
        self.asdf0, self.asdf1 = asdf0, asdf1
        self.iostream = iostream
        self.minimal = minimal
        self.ignore_ids = set() if ignore_ids is None else ignore_ids
        # Starts empty: nothing has been displayed yet.
        self.print_tree = PrintTree()


def print_tree_context(diff_ctx, node_list, other, use_marker, last_was_list):
    """Print context information indicating location in ASDF tree.

    Writes a label for every node on the path ``node_list`` that has not
    been displayed yet, then records the whole path as displayed.

    Parameters
    ----------
    diff_ctx : DiffContext
        Supplies the output stream and the record of displayed nodes.
    node_list : list
        Path of keys (mapping keys or ArrayNode instances) below the root.
    other : bool
        Selects THAT_MARKER when true, THIS_MARKER otherwise.
    use_marker : bool
        Whether lines start with the coloured marker instead of plain
        indentation.
    last_was_list : bool
        Whether the previously written label was a list marker.

    Returns
    -------
    bool
        Whether the last label written here was a list marker; the incoming
        ``last_was_list`` when nothing was written.
    """
    marker = THAT_MARKER if other else THIS_MARKER
    prefix = ""
    for node in diff_ctx.print_tree.get_print_list(node_list):
        if node is not None:
            is_list_item = isinstance(node, ArrayNode)
            # Format rather than concatenate so that non-string mapping keys
            # (e.g. integers) produce a label instead of a TypeError.
            label = LIST_MARKER if is_list_item else f"{node}:"
            # All of this logic is just to make the display of arrays prettier
            if use_marker:
                # The marker takes the place of the first two indent columns.
                line_prefix = " " if last_was_list else marker + prefix[2:]
                # A list marker stays on the same line as what follows it.
                line_suffix = "" if is_list_item else RESET_NEWLINE
            else:
                line_prefix = prefix
                line_suffix = RESET_NEWLINE
            diff_ctx.iostream.write(line_prefix + label + line_suffix)
            last_was_list = is_list_item
        # One indentation level per path element, displayed or not.
        prefix += "  "
    diff_ctx.print_tree[node_list] = True
    return last_was_list


def print_in_tree(diff_ctx, node_list, thing, other, use_marker=False, last_was_list=False, ignore_lwl=False):
    """Print the tree context for ``node_list`` and then ``thing`` itself.

    Lists and dictionaries are descended into recursively (dictionary keys
    in sorted order); anything else is written as a single line using
    ``str(thing)``.

    Parameters
    ----------
    diff_ctx : DiffContext
        Supplies the output stream.
    node_list : list
        Path of keys leading to ``thing``.
    thing : object
        The value to display.
    other : bool
        Selects THAT_MARKER when true, THIS_MARKER otherwise.
    use_marker : bool, optional
        Passed to :func:`print_tree_context` for the context lines.
    last_was_list : bool, optional
        Whether the previously written label was a list marker.
    ignore_lwl : bool, optional
        When true, leaf values always get the marker prefix regardless of
        ``last_was_list``.

    Returns
    -------
    bool
        The updated ``last_was_list`` state; always ``False`` after a leaf
        value has been written.
    """
    last_was_list = print_tree_context(diff_ctx, node_list, other, use_marker, last_was_list)

    if isinstance(thing, list):
        # Each element gets its own dummy key named after its parent.
        children = [(ArrayNode(f"{node_list[-1]}_{index}"), element) for index, element in enumerate(thing)]
    elif isinstance(thing, dict):
        children = [(name, thing[name]) for name in sorted(thing.keys())]
    else:
        # Leaf value: write it out, there is nothing to descend into.
        show_marker = ignore_lwl or not last_was_list
        if show_marker:
            side = THAT_MARKER if other else THIS_MARKER
            lead = side + "  " * len(node_list)
        else:
            lead = " "
        diff_ctx.iostream.write(lead + str(thing) + RESET_NEWLINE)
        return False

    for child_key, child in children:
        last_was_list = print_in_tree(
            diff_ctx,
            node_list + [child_key],
            child,
            other,
            use_marker=True,
            last_was_list=last_was_list,
            ignore_lwl=ignore_lwl,
        )
    return last_was_list


def compare_objects(diff_ctx, obj0, obj1, keys=None):
    """Display the diff of two objects if they are not equal.

    Parameters
    ----------
    diff_ctx : DiffContext
        Context passed through to :func:`print_in_tree`.
    obj0, obj1 : object
        The values to compare with ``!=``. ``obj0`` is printed with the
        "this" marker and ``obj1`` with the "that" marker.
    keys : list, optional
        Path of keys leading to the objects. Defaults to an empty path.
    """
    # A fresh list per call instead of a mutable default shared by all calls.
    keys = [] if keys is None else keys
    if obj0 != obj1:
        print_in_tree(diff_ctx, keys, obj0, False, ignore_lwl=True)
        print_in_tree(diff_ctx, keys, obj1, True, ignore_lwl=True)


def print_dict_diff(diff_ctx, tree, node_list, keys, other):
    """Display the entries of ``tree`` named by ``keys``.

    In minimal mode only the key itself is printed, at the location
    ``node_list`` and without markers on the context lines. Otherwise the
    value ``tree[key]`` is printed beneath ``node_list + [key]`` with markers.

    Parameters
    ----------
    diff_ctx : DiffContext
        Supplies the ``minimal`` setting; passed on to :func:`print_in_tree`.
    tree : dict
        Mapping that the keys belong to.
    node_list : list
        Path of keys leading to ``tree``.
    keys : iterable
        The keys of ``tree`` to display.
    other : bool
        Passed on to :func:`print_in_tree` to select the marker.
    """
    for name in keys:
        minimal = diff_ctx.minimal
        if minimal:
            path, payload = node_list, name
        else:
            path, payload = node_list + [name], tree[name]
        print_in_tree(diff_ctx, path, payload, other, use_marker=not minimal)


def compare_ndarrays(diff_ctx, array0, array1, keys):
    """Compare two ndarray tree nodes and report how they differ.

    The nodes are first compared as dictionaries, leaving out their
    ``source`` and ``data`` entries. Then one summary line is written if
    their ``shape`` or ``datatype`` entries differ, or if the arrays built
    from the nodes are not equal.

    Parameters
    ----------
    diff_ctx : DiffContext
        Supplies the output stream and both open files.
    array0, array1 : dict or list
        The ndarray nodes. A list is wrapped as ``{"data": <list>}``.
    keys : list
        Path of keys leading to the arrays.
    """
    if isinstance(array0, list):
        array0 = {"data": array0}
    if isinstance(array1, list):
        array1 = {"data": array1}

    compare_dicts(diff_ctx, array0, array1, keys, {"source", "data"})

    differences = [field for field in ("shape", "datatype") if array0.get(field) != array1.get(field)]

    # Build the arrays from each node and its own file to compare contents.
    loaded0 = NDArrayType.from_tree(array0, diff_ctx.asdf0)
    loaded1 = NDArrayType.from_tree(array1, diff_ctx.asdf1)
    if not array_equal(loaded0, loaded1):
        differences.append("contents")

    if not differences:
        return

    indent = "  " * (len(keys) + 1)
    summary = f"ndarrays differ by {human_list(differences)}"
    diff_ctx.iostream.write(indent + RED + summary + RESET_NEWLINE)


def both_are_ndarrays(tree0, tree1):
    """Tell whether both inputs are tagged nodes carrying an ndarray tag.

    Returns True only when both are ``Tagged`` instances and the
    ``NDARRAY_TAG`` substring occurs in the ``_tag`` of each; False otherwise.
    """
    # Tags are only inspected once both inputs are known to be Tagged.
    both_tagged = isinstance(tree0, Tagged) and isinstance(tree1, Tagged)
    return both_tagged and NDARRAY_TAG in tree0._tag and NDARRAY_TAG in tree1._tag


def compare_dicts(diff_ctx, dict0, dict1, keys, ignores=None):
    """Recursively compare two dictionary objects.

    Keys present in both dictionaries are compared recursively in sorted
    order. Keys present in only one dictionary are then displayed, first
    those only in ``dict0`` and then those only in ``dict1``.

    Parameters
    ----------
    diff_ctx : DiffContext
        Context passed through to the comparison and printing helpers.
    dict0, dict1 : dict
        The dictionaries to compare.
    keys : list
        Path of keys leading to the dictionaries.
    ignores : iterable, optional
        Keys to leave out of the comparison. Defaults to none.
    """
    # None replaces a mutable default set shared between calls; difference()
    # also accepts any iterable of keys, not only sets.
    ignores = () if ignores is None else ignores
    keys0 = set(dict0.keys()).difference(ignores)
    keys1 = set(dict1.keys()).difference(ignores)
    # Recurse into subtree elements that are shared by both trees
    for key in sorted(keys0 & keys1):
        compare_trees(diff_ctx, dict0[key], dict1[key], keys=keys + [key])
    # Display subtree elements existing only in this tree
    print_dict_diff(diff_ctx, dict0, keys, sorted(keys0 - keys1), False)
    # Display subtree elements existing only in that tree
    print_dict_diff(diff_ctx, dict1, keys, sorted(keys1 - keys0), True)


def compare_trees(diff_ctx, tree0, tree1, keys=None):
    """Recursively traverse two ASDF trees and compare them.

    Dispatches on the kind of node: ndarray nodes, dictionaries and lists
    each have their own comparison; anything else is compared as a plain
    object. Nothing is compared when both nodes are in
    ``diff_ctx.ignore_ids``.

    Parameters
    ----------
    diff_ctx : DiffContext
        Supplies the ignore set, the output stream and the print state.
    tree0, tree1 : object
        Corresponding nodes of the first and second tree.
    keys : list, optional
        Path of keys leading to the nodes. Defaults to an empty path.
    """
    # A fresh list per call instead of a mutable default shared by all calls.
    keys = [] if keys is None else keys

    if id(tree0) in diff_ctx.ignore_ids and id(tree1) in diff_ctx.ignore_ids:
        return

    if both_are_ndarrays(tree0, tree1):
        compare_ndarrays(diff_ctx, tree0, tree1, keys)
    elif isinstance(tree0, dict) and isinstance(tree1, dict):
        compare_dicts(diff_ctx, tree0, tree1, keys)
    elif isinstance(tree0, list) and isinstance(tree1, list):
        for i, (obj0, obj1) in enumerate(zip(tree0, tree1)):
            key = ArrayNode(f"item_{i}")
            compare_trees(diff_ctx, obj0, obj1, keys + [key])
        # zip() stops at the shorter list, so elements beyond the common
        # length would otherwise go unreported. Print them the same way
        # compare_objects prints a one-sided value.
        shared = min(len(tree0), len(tree1))
        for longer, other in ((tree0, False), (tree1, True)):
            for i in range(shared, len(longer)):
                key = ArrayNode(f"item_{i}")
                print_in_tree(diff_ctx, keys + [key], longer[i], other, ignore_lwl=True)
    else:
        compare_objects(diff_ctx, tree0, tree1, keys)


def _collect_ignore_ids(ignore_expressions, trees):
    """Return the ``id()`` of every node the expressions select in ``trees``.

    For each search result the result itself is included and, when it is a
    list or a dictionary, so are its elements or values respectively.
    """
    ignore_ids = set()
    for expression in ignore_expressions:
        for tree in trees:
            result = expression.search(tree)
            if result is None:
                continue
            ignore_ids.add(id(result))
            if isinstance(result, list):
                ignore_ids.update(id(elem) for elem in result)
            elif isinstance(result, dict):
                ignore_ids.update(id(value) for value in result.values())
    return ignore_ids


def diff(filenames, minimal, iostream=None, ignore=None):
    """
    Compare two ASDF files and write diff output to the stdout
    or the specified I/O stream.

    filenames : list of str
        List of ASDF filenames to compare.  Must be length 2.

    minimal : boolean
        Set to True to forego some pretty-printing to minimize
        the diff output.

    iostream : io.TextIOBase, optional
        Text-mode stream to write the diff, e.g., sys.stdout
        or an io.StringIO instance.  Defaults to the value of
        sys.stdout at the time of the call.

    ignore : list of str, optional
        List of JMESPath expressions indicating tree nodes that
        should be ignored.

    Raises RuntimeError, carrying the original message, when a
    ValueError occurs while opening or comparing the files.
    """
    # Look up sys.stdout at call time; a default bound in the signature
    # would keep pointing at the stream that was installed at import time.
    if iostream is None:
        iostream = sys.stdout

    if ignore is None:
        ignore_expressions = []
    else:
        ignore_expressions = [jmespath.compile(e) for e in ignore]

    try:
        with asdf.open(filenames[0], _force_raw_types=True) as asdf0:
            with asdf.open(filenames[1], _force_raw_types=True) as asdf1:
                ignore_ids = _collect_ignore_ids(ignore_expressions, [asdf0.tree, asdf1.tree])
                diff_ctx = DiffContext(asdf0, asdf1, iostream, minimal=minimal, ignore_ids=ignore_ids)
                compare_trees(diff_ctx, asdf0.tree, asdf1.tree)
    except ValueError as error:
        # Chain explicitly so the original traceback is reported as the cause.
        raise RuntimeError(str(error)) from error