File: collections.py

package info (click to toggle)
depthcharge-tools 0.6.2-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 760 kB
  • sloc: python: 6,286; sh: 650; makefile: 12
file content (230 lines) | stat: -rw-r--r-- 6,273 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later

# depthcharge-tools collections utilities
# Copyright (C) 2021-2022 Alper Nebi Yasak <alpernebiyasak@gmail.com>
# See COPYRIGHT and LICENSE files for full copyright information.

import collections

# Inheritance for config sections
class ConfigDict(collections.OrderedDict):
    """Ordered dict whose "/"-separated string keys inherit from parents.

    Looking up a string key that contains "/" (e.g. "a/b/c") also
    consults every prefix of the key that ends just before a "/"
    ("a/b" and "a"). Non-string keys and keys without a "/" are looked
    up exactly as in a plain OrderedDict. Only ``__getitem__`` is
    customized by this class.
    """

    def __getitem__(self, key):
        """Return the value for key, falling back to its parent keys.

        For a "/"-separated string key, the key itself and each of its
        parent prefixes are looked up, most specific first:

        - if every value found is a dict, the values are layered in a
          collections.ChainMap, the most specific one first;
        - otherwise the most specific value found is returned as-is.

        Raises KeyError if neither the key nor any of its parents exist.
        """
        lookup = super().__getitem__
        if not isinstance(key, str) or "/" not in key:
            return lookup(key)

        # "a/b/c" -> ["a/b/c", "a/b", "a"]: the leaf first, then parents
        # from nearest to farthest.
        prefixes = [key[:idx] for idx, char in enumerate(key) if char == "/"]
        candidates = [key] + prefixes[::-1]

        # Collect hits with try/except rather than an in-band sentinel,
        # so stored values are never compared against anything.
        found = []
        for name in candidates:
            try:
                found.append(lookup(name))
            except KeyError:
                continue

        # Must come before the all() test below: all() is True for an
        # empty list and would yield an empty ChainMap for missing keys.
        if not found:
            raise KeyError(key)

        if all(isinstance(item, dict) for item in found):
            return collections.ChainMap(*found)

        return found[0]


# To write config sections in sort order
def SortedDict(key=None):
    """Build a UserDict subclass that iterates over its keys in sorted order.

    key is the sort-key callable passed to sorted() each time the
    mapping is iterated. Anything that is not callable, including the
    default None, is rejected with a TypeError.

    Returns the newly created class, not an instance of it.
    """
    if callable(key):
        class SortedDict(collections.UserDict):
            # Kept on the class and fetched through type(self) in
            # __iter__ below.
            __key = key

            def __iter__(self):
                unordered = super().__iter__()
                for item in sorted(unordered, key=type(self).__key):
                    yield item

        return SortedDict

    raise TypeError(
        "SortedDict argument must be a callable, not {}"
        .format(type(key).__name__)
    )


def TypedList(T):
    """Create a UserList subclass that only accepts items of type T.

    T must be a type, otherwise TypeError is raised. The returned class
    checks items with isinstance() on construction, item and slice
    assignment, ``+=``, append(), insert() and extend(), and raises
    TypeError when an item does not match.

    The class body sets ``__qualname__`` (and a ``__name__`` attribute)
    to "TypedList.<Title-cased T name>List", e.g. "TypedList.StrList".
    """
    if not isinstance(T, type):
        raise TypeError(
            "TypedList argument must be a type, not {}"
            .format(type(T).__name__)
        )

    name = "TypedList.{}List".format(str.title(T.__name__))

    class TypedList(collections.UserList):
        __type = T
        __name__ = name
        __qualname__ = name

        def __init__(self, initlist=None):
            if initlist is not None:
                initlist = self.__checked(initlist)
            super().__init__(initlist)

        def __typecheck(self, *values):
            """Raise TypeError unless every value is an instance of T."""
            if not all(isinstance(value, self.__type) for value in values):
                raise TypeError(
                    "{} items must be of type {}."
                    .format(type(self).__name__, self.__type.__name__)
                )

        def __checked(self, values):
            """Return values as a list after type-checking each item.

            The iterable is materialized once, before checking, so that
            one-shot iterators (generators, iter(...)) are not consumed
            by the check and then stored as empty.
            """
            values = list(values)
            self.__typecheck(*values)
            return values

        def __setitem__(self, idx, value):
            if isinstance(idx, slice):
                # Slice assignment stores the items of value, so those
                # are what must be checked, not the container itself.
                value = self.__checked(value)
            else:
                self.__typecheck(value)
            return super().__setitem__(idx, value)

        def __iadd__(self, other):
            other = self.__checked(other)
            return super().__iadd__(other)

        def append(self, value):
            self.__typecheck(value)
            return super().append(value)

        def insert(self, idx, value):
            self.__typecheck(value)
            return super().insert(idx, value)

        def extend(self, other):
            other = self.__checked(other)
            return super().extend(other)

    return TypedList


class DirectedGraph:
    """Mutable directed graph stored as a mapping of node -> set of children.

    Nodes are used as dict keys and set members, so they must be
    hashable. Nothing prevents cycles or self-loops; every query method
    terminates on cyclic graphs.
    """

    def __init__(self):
        # node -> set of its direct children; every known node has a key.
        self.__edges = {}

    def add_edge(self, node, child):
        """Add the edge node -> child, adding either node if missing."""
        self.add_node(node)
        self.add_node(child)
        self.__edges[node].add(child)

    def add_node(self, node):
        """Add node with no edges; do nothing if it already exists."""
        if node not in self.__edges:
            self.__edges[node] = set()

    def remove_edge(self, node, child):
        """Remove the edge node -> child if present; keep both nodes."""
        if node in self.__edges:
            self.__edges[node].discard(child)

    def remove_node(self, node):
        """Remove node and every edge that starts or ends at it."""
        self.__edges.pop(node, None)
        for children in self.__edges.values():
            children.discard(node)

    def replace_node(self, node, replacement, merge=False):
        """Replace node with replacement, re-attaching node's edges to it.

        If replacement is already in the graph, ValueError is raised
        unless merge is true, in which case replacement keeps its own
        edges and additionally receives those of node. An existing edge
        between node and replacement then becomes a self-loop on
        replacement.
        """
        if replacement in self.__edges and not merge:
            raise ValueError(
                "Replacement node '{}' already in graph."
                .format(replacement)
            )

        # A self-loop lists node among its own parents and children.
        # Map it to the replacement up front, otherwise add_edge() below
        # would silently re-create the node that was just removed.
        parents = {
            replacement if p == node else p
            for p in self.parents(node)
        }
        children = {
            replacement if c == node else c
            for c in self.children(node)
        }
        self.remove_node(node)

        self.add_node(replacement)
        for p in parents:
            self.add_edge(p, replacement)
        for c in children:
            self.add_edge(replacement, c)

    def edges(self):
        """Return all edges as a set of (node, child) tuples."""
        return {
            (n, c)
            for n, cs in self.__edges.items()
            for c in cs
        }

    def nodes(self):
        """Return the set of all nodes in the graph."""
        return set(self.__edges)

    def children(self, *nodes):
        """Return the union of the direct children of the given nodes.

        Nodes that are not in the graph contribute nothing.
        """
        node_children = set()
        for node in nodes:
            node_children.update(self.__edges.get(node, ()))

        return node_children

    def parents(self, *nodes):
        """Return every node that has at least one given node as a child."""
        return {
            parent
            for parent, children in self.__edges.items()
            if children.intersection(nodes)
        }

    def __reachable(self, nodes, step):
        """Return nodes plus everything reachable from them via step.

        step is self.children or self.parents. Tracking the nodes that
        were already seen is what keeps the walk finite on cycles.
        """
        seen = set()
        frontier = set(nodes)
        while frontier:
            seen.update(frontier)
            frontier = step(*frontier) - seen

        return seen

    def ancestors(self, *nodes):
        """Return all nodes from which any of the given nodes is reachable.

        A given node is only part of the result if it lies on a cycle.
        """
        return self.__reachable(self.parents(*nodes), self.parents)

    def descendants(self, *nodes):
        """Return all nodes reachable from any of the given nodes.

        A given node is only part of the result if it lies on a cycle.
        """
        return self.__reachable(self.children(*nodes), self.children)

    def leaves(self, *nodes):
        """Return leaf nodes, i.e. nodes without children.

        Without arguments, return every leaf of the graph. Otherwise
        return the leaves among the given nodes and their descendants.
        """
        all_leaves = {
            node
            for node, children in self.__edges.items()
            if not children
        }
        if not nodes:
            return all_leaves

        return self.__reachable(nodes, self.children) & all_leaves

    def roots(self, *nodes):
        """Return root nodes, i.e. nodes that are nobody's child.

        Without arguments, return every root of the graph. Otherwise
        return the roots among the given nodes and their ancestors.
        """
        all_roots = set(self.__edges)
        all_roots.difference_update(*self.__edges.values())
        if not nodes:
            return all_roots

        return self.__reachable(nodes, self.parents) & all_roots