1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
|
#! /usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0-or-later
# depthcharge-tools collections utilities
# Copyright (C) 2021-2022 Alper Nebi Yasak <alpernebiyasak@gmail.com>
# See COPYRIGHT and LICENSE files for full copyright information.
import collections
# Inheritance for config sections
class ConfigDict(collections.OrderedDict):
    """Ordered dict whose "/"-separated string keys inherit from prefixes.

    Keys that are not strings, or that contain no "/", behave exactly as
    in a plain ``OrderedDict``. For a key such as ``"a/b/c"`` the entries
    stored under ``"a"``, ``"a/b"`` and ``"a/b/c"`` are looked up; entries
    that do not exist are skipped.
    """

    def __getitem__(self, key):
        """Return the value for *key*, layered over its parent entries.

        Returns:
            A ``collections.ChainMap`` (most specific entry first) when
            every entry found is a ``dict``; otherwise the most specific
            entry that exists.

        Raises:
            KeyError: if neither *key* nor any of its "/" prefixes exists.
        """
        # Zero-argument super() must be resolved here, at method level.
        lookup = super().__getitem__

        if not isinstance(key, str) or "/" not in key:
            return lookup(key)

        # Every prefix ending just before a "/", shortest first, then the
        # full key: "a/b/c" -> ["a", "a/b", "a/b/c"].
        prefixes = [key[:i] for i, char in enumerate(key) if char == "/"]
        prefixes.append(key)

        # Collect existing entries, most specific first. Missing prefixes
        # are skipped directly, so no sentinel value has to be compared
        # against the stored values.
        found = []
        for name in reversed(prefixes):
            try:
                found.append(lookup(name))
            except KeyError:
                continue

        # Nothing matched at all: report the key as missing rather than
        # handing back an empty ChainMap (all() of nothing is True).
        if not found:
            raise KeyError(key)

        if all(isinstance(item, dict) for item in found):
            return collections.ChainMap(*found)

        return found[0]
# To write config sections in sort order
def SortedDict(key=None):
    """Build a ``UserDict`` subclass that iterates its keys in sorted order.

    Args:
        key: Callable passed as the ``key`` argument of ``sorted()`` when
            the mapping's keys are iterated.

    Returns:
        A new ``collections.UserDict`` subclass named ``SortedDict``.

    Raises:
        TypeError: if *key* is not callable (this includes the default
            ``None``).
    """
    if callable(key):

        class SortedDict(collections.UserDict):
            # Stored on the class and read back through type(self), so
            # the callable is fetched from the class rather than through
            # the instance.
            __key = key

            def __iter__(self):
                unordered = super().__iter__()
                ordering = type(self).__key
                for item in sorted(unordered, key=ordering):
                    yield item

        return SortedDict

    raise TypeError(
        "SortedDict argument must be a callable, not {}"
        .format(type(key).__name__)
    )
def TypedList(T):
    """Create a ``UserList`` subclass that only accepts instances of *T*.

    Args:
        T: The type every item of the resulting list class must be an
            instance of.

    Returns:
        A new ``collections.UserList`` subclass whose constructor and
        mutating methods raise ``TypeError`` for items that are not
        instances of *T*.

    Raises:
        TypeError: if *T* is not a type.
    """
    if not isinstance(T, type):
        raise TypeError(
            "TypedList argument must be a type, not {}"
            .format(type(T).__name__)
        )

    name = "TypedList.{}List".format(str.title(T.__name__))

    class TypedList(collections.UserList):
        __type = T
        # NOTE(review): __qualname__ in a class body is picked up when the
        # class is created; __name__ here only becomes an ordinary class
        # attribute, so type(self).__name__ below is still "TypedList".
        __name__ = name
        __qualname__ = name

        def __init__(self, initlist=None):
            if initlist is not None:
                initlist = self.__checked(initlist)
            super().__init__(initlist)

        def __typecheck(self, *values):
            """Raise TypeError unless every value is an instance of T."""
            if not all(isinstance(value, self.__type) for value in values):
                raise TypeError(
                    "{} items must be of type {}."
                    .format(type(self).__name__, self.__type.__name__)
                )

        def __checked(self, values):
            """Return *values* as a type-checked list.

            The iterable is materialised once, so one-shot iterators and
            generators are not exhausted by the check before being stored.
            """
            values = list(values)
            self.__typecheck(*values)
            return values

        def __setitem__(self, idx, value):
            if isinstance(idx, slice):
                # Slice assignment takes an iterable of items; check each
                # element rather than the iterable itself.
                value = self.__checked(value)
            else:
                self.__typecheck(value)
            return super().__setitem__(idx, value)

        def __iadd__(self, other):
            return super().__iadd__(self.__checked(other))

        def append(self, value):
            self.__typecheck(value)
            return super().append(value)

        def insert(self, idx, value):
            self.__typecheck(value)
            return super().insert(idx, value)

        def extend(self, other):
            return super().extend(self.__checked(other))

    return TypedList
class DirectedGraph:
    """Mutable directed graph stored as a mapping of node -> set of children.

    Nodes are used as dictionary keys and set members, so they must be
    hashable. Cycles and self-loops can be stored, and every traversal
    method terminates on them.
    """

    def __init__(self):
        # Maps every known node to the set of its direct children.
        self.__edges = {}

    def add_edge(self, node, child):
        """Add the edge node -> child, creating either node if needed."""
        self.add_node(node)
        self.add_node(child)
        self.__edges[node].add(child)

    def add_node(self, node):
        """Add *node* with no edges; do nothing if it already exists."""
        if node not in self.__edges:
            self.__edges[node] = set()

    def remove_edge(self, node, child):
        """Remove the edge node -> child if present; both nodes remain."""
        if node in self.__edges:
            self.__edges[node].discard(child)

    def remove_node(self, node):
        """Remove *node* and every edge leading to or from it."""
        self.__edges.pop(node, None)
        for children in self.__edges.values():
            children.discard(node)

    def replace_node(self, node, replacement, merge=False):
        """Substitute *replacement* for *node*, keeping node's edges.

        Args:
            node: Node to remove.
            replacement: Node that takes over node's parents and children.
            merge: If true, *replacement* may already be in the graph and
                keeps its existing edges in addition to the new ones.

        Raises:
            ValueError: if *replacement* is already in the graph and
                *merge* is false.
        """
        if replacement in self.__edges and not merge:
            raise ValueError(
                "Replacement node '{}' already in graph."
                .format(replacement)
            )

        parents = self.parents(node)
        children = self.children(node)

        # A self-loop makes node its own parent and child. Re-adding those
        # edges verbatim would resurrect the node being replaced, so carry
        # the loop over to the replacement instead.
        self_loop = node in children
        parents.discard(node)
        children.discard(node)

        self.remove_node(node)
        self.add_node(replacement)

        for parent in parents:
            self.add_edge(parent, replacement)
        for child in children:
            self.add_edge(replacement, child)
        if self_loop:
            self.add_edge(replacement, replacement)

    def edges(self):
        """Return the set of all (node, child) pairs."""
        return {
            (node, child)
            for node, children in self.__edges.items()
            for child in children
        }

    def nodes(self):
        """Return the set of all nodes."""
        return set(self.__edges)

    def children(self, *nodes):
        """Return the union of the direct children of the given nodes."""
        node_children = set()
        for node in nodes:
            node_children.update(self.__edges.get(node, ()))
        return node_children

    def parents(self, *nodes):
        """Return every node with an edge to at least one given node."""
        return {
            parent
            for parent, children in self.__edges.items()
            if not children.isdisjoint(nodes)
        }

    def ancestors(self, *nodes):
        """Return all nodes from which any given node can be reached.

        A given node is itself included only if it lies on a cycle.
        """
        found = self.parents(*nodes)
        frontier = set(found)
        while frontier:
            frontier = self.parents(*frontier) - found
            found.update(frontier)
        return found

    def descendants(self, *nodes):
        """Return all nodes reachable from any of the given nodes.

        A given node is itself included only if it lies on a cycle.
        """
        found = self.children(*nodes)
        frontier = set(found)
        while frontier:
            frontier = self.children(*frontier) - found
            found.update(frontier)
        return found

    def leaves(self, *nodes):
        """Return childless nodes, optionally limited to those under *nodes*.

        With no arguments, return every node that has no children.
        Otherwise return the childless nodes that are among the given
        nodes or reachable from them.
        """
        all_leaves = {
            node for node, children in self.__edges.items() if not children
        }
        if not nodes:
            return all_leaves

        node_leaves = set()
        visited = set()
        frontier = set(nodes)
        while frontier:
            # Tracking visited nodes keeps the walk finite on cycles.
            visited.update(frontier)
            node_leaves.update(frontier & all_leaves)
            frontier = self.children(*(frontier - all_leaves)) - visited
        return node_leaves

    def roots(self, *nodes):
        """Return parentless nodes, optionally limited to those above *nodes*.

        With no arguments, return every node that is nobody's child.
        Otherwise return the parentless nodes that are among the given
        nodes or from which a given node can be reached.
        """
        all_roots = set(self.__edges).difference(*self.__edges.values())
        if not nodes:
            return all_roots

        node_roots = set()
        visited = set()
        frontier = set(nodes)
        while frontier:
            # Tracking visited nodes keeps the walk finite on cycles.
            visited.update(frontier)
            node_roots.update(frontier & all_roots)
            frontier = self.parents(*(frontier - all_roots)) - visited
        return node_roots
|