File: node_merger.py

package info (click to toggle)
blender 4.3.2%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 309,564 kB
  • sloc: cpp: 2,385,210; python: 330,236; ansic: 280,972; xml: 2,446; sh: 972; javascript: 317; makefile: 170
file content (345 lines) | stat: -rw-r--r-- 11,452 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# SPDX-FileCopyrightText: 2021-2022 Blender Foundation
#
# SPDX-License-Identifier: GPL-2.0-or-later

import collections

from typing import Any, Sequence, Optional
from mathutils import Vector
from mathutils.kdtree import KDTree

from .errors import MetarigError
from .misc import ArmatureObject
from ..base_rig import BaseRig, GenerateCallbackHost
from ..base_generate import GeneratorPlugin


class NodeMerger(GeneratorPlugin):
    """
    Utility that allows rigs to interact based on common points in space.

    Rigs can register node objects representing locations during the
    initialize stage, and at the end the plugin sorts them into buckets
    based on proximity. For each such bucket a group object is created
    and allowed to further process the nodes.

    Nodes chosen by the groups as being 'final' become sub-objects of
    the plugin and receive stage callbacks.

    The domain parameter allows potentially having multiple completely
    separate layers of nodes with different purpose.
    """

    epsilon = 1e-5

    nodes: list['BaseMergeNode']
    final_nodes: list['BaseMergeNode']
    groups: list['MergeGroup']

    def __init__(self, generator, domain: Any):
        """
        Construct a new merger instance.

        @param domain: An arbitrary identifier to allow multiple independent merging domains.
        """
        super().__init__(generator)

        assert domain is not None
        assert generator.stage == 'initialize'

        self.domain = domain
        self.nodes = []
        self.final_nodes = []
        self.groups = []
        self.frozen = False

    def register_node(self, node: 'BaseMergeNode'):
        """
        Add a new node to generation, before merging is frozen.
        """
        assert not self.frozen
        node.generator_plugin = self
        self.nodes.append(node)

    def initialize(self):
        self.frozen = True

        nodes = self.nodes
        tree = KDTree(len(nodes))

        for i, node in enumerate(nodes):
            tree.insert(node.point, i)

        tree.balance()
        processed = set()
        final_nodes = []
        groups = []

        for i in range(len(nodes)):
            if i in processed:
                continue

            # Find points to merge
            pending = [i]
            merge_set = set(pending)

            while pending:
                added = set()
                for j in pending:
                    point = nodes[j].point
                    eps = max(1.0, point.length) * self.epsilon
                    for co, idx, dist in tree.find_range(point, eps):
                        added.add(idx)
                pending = added.difference(merge_set)
                merge_set.update(added)

            assert merge_set.isdisjoint(processed)

            processed.update(merge_set)

            # Group the points
            merge_list = [nodes[i] for i in merge_set]
            merge_list.sort(key=lambda x: x.name)

            group_class = merge_list[0].group_class

            for item in merge_list[1:]:
                cls = item.group_class

                if issubclass(cls, group_class):
                    group_class = cls
                elif not issubclass(group_class, cls):
                    raise MetarigError(
                        'Group class conflict: {} and {} from {} of {}'.format(
                            group_class, cls, item.name, item.rig.base_bone,
                        )
                    )

            group = group_class(merge_list)
            group.build(final_nodes)

            groups.append(group)

        self.final_nodes = self.rigify_sub_objects = final_nodes
        self.groups = groups


class MergeGroup(object):
    """
    Standard node group, merges nodes based on certain rules and priorities.

    1. Nodes are classified into main and query nodes; query nodes are not merged.
    2. Nodes owned by the same rig cannot merge with each other.
    3. Node can only merge into target if node.can_merge_into(target) is true.
    4. Among multiple candidates in one rig, node.get_merge_priority(target) is used.
    5. The largest clusters of nodes that can merge are picked until none are left.

    The master nodes of the chosen clusters, plus query nodes, become 'final'.
    """

    main_nodes: list['MainMergeNode']    # All main nodes in the group.
    query_nodes: list['QueryMergeNode']  # All query nodes in the group.
    final_nodes: list['MainMergeNode']   # All main nodes not merged into any other node.

    def __init__(self, nodes: list['BaseMergeNode']):
        self.nodes = nodes

        for node in nodes:
            assert isinstance(node, (MainMergeNode, QueryMergeNode))
            node.group = self

        self.main_nodes = [n for n in nodes if isinstance(n, MainMergeNode)]
        self.query_nodes = [n for n in nodes if isinstance(n, QueryMergeNode)]

    def build(self, final_nodes):
        main_nodes = self.main_nodes

        # Sort nodes into rig buckets - can't merge within the same rig
        rig_table = collections.defaultdict(list)

        for node in main_nodes:
            rig_table[node.rig].append(node)

        # Build a 'can merge' table
        merge_table = {n: set() for n in main_nodes}

        for node in main_nodes:
            for rig, tgt_nodes in rig_table.items():
                if rig is not node.rig:
                    nodes = [n for n in tgt_nodes if node.can_merge_into(n)]
                    if nodes:
                        best_node = max(nodes, key=node.get_merge_priority)
                        merge_table[best_node].add(node)

        # Output groups starting with largest
        self.final_nodes = []

        pending = set(main_nodes)

        while pending:
            # Find the largest group
            nodes = [n for n in main_nodes if n in pending]
            max_len = max(len(merge_table[n]) for n in nodes)

            nodes = [n for n in nodes if len(merge_table[n]) == max_len]

            # If a tie, try to resolve using comparison
            if len(nodes) > 1:
                weighted_nodes = [
                    (n, sum(
                        1 if (n.is_better_cluster(n2)
                              and not n2.is_better_cluster(n)) else 0
                        for n2 in nodes
                    ))
                    for n in nodes
                ]
                max_weight = max(wn[1] for wn in weighted_nodes)
                nodes = [wn[0] for wn in weighted_nodes if wn[1] == max_weight]

            # Final tiebreaker is the name
            best = min(nodes, key=lambda n: n.name)
            child_set = merge_table[best]

            # Link children
            best.point = sum((c.point for c in child_set),
                             best.point) / (len(child_set) + 1)

            for child in [n for n in main_nodes if n in child_set]:
                child.point = best.point
                best.merge_from(child)
                child.merge_into(best)

            final_nodes.append(best)
            self.final_nodes.append(best)

            best.merge_done()

            # Remove merged nodes from the table
            pending.remove(best)
            pending -= child_set

            for children in merge_table.values():
                children &= pending

        for node in self.query_nodes:
            node.merge_done()

        final_nodes += self.query_nodes


class BaseMergeNode(GenerateCallbackHost):
    """Base class of merge-able nodes."""

    rig: BaseRig
    obj: ArmatureObject
    name: str
    point: Vector

    merge_domain: Any = None
    merger = NodeMerger
    group_class = MergeGroup

    generator_plugin: NodeMerger
    group: MergeGroup

    def __init__(self, rig: BaseRig, name: str, point: Vector | Sequence[float], *,
                 domain: Any = None):
        self.rig = rig
        self.obj = rig.obj
        self.name = name
        self.point = Vector(point)

        merger = self.merger(rig.generator, domain or self.merge_domain)
        merger.register_node(self)

    def register_new_bone(self, new_name: str, old_name: Optional[str] = None):
        self.generator_plugin.register_new_bone(new_name, old_name)

    def can_merge_into(self, other: 'MainMergeNode') -> bool:
        """Checks if this main or query node can merge into the specified master node."""
        raise NotImplementedError

    def get_merge_priority(self, other: 'MainMergeNode') -> float:
        """Rank potential candidates to merge into."""
        return 0

    def merge_done(self):
        """Called after all merging operations are complete."""
        pass


class MainMergeNode(BaseMergeNode):
    """
    Base class of standard merge-able nodes. Each node can either be
    a master of its cluster or a merged child node. Children become
    sub-objects of their master to receive callbacks in defined order.
    """

    merged_master: 'MainMergeNode'          # Master of this merge cluster; may be self.
    merged_into: Optional['MainMergeNode']  # Master of this cluster if not self.
    merged: list['MainMergeNode']           # List of nodes merged into this one.

    def __init__(self, rig, name, point, *, domain=None):
        super().__init__(rig, name, point, domain=domain)

        self.merged_into = None
        self.merged = []

    def get_merged_siblings(self):
        """Retrieve the list of all nodes merged together with this one,
         starting with the master node."""
        master = self.merged_master
        return [master, *master.merged]

    def is_better_cluster(self, other: 'MainMergeNode'):
        """Compare with the other node to choose between cluster masters."""
        return False

    # noinspection PyMethodMayBeStatic
    def can_merge_from(self, _other: 'MainMergeNode'):
        """Checks if the other node can be merged into this one."""
        return True

    def can_merge_into(self, other: 'MainMergeNode'):
        """Checks if this node can merge into the specified master."""
        return other.can_merge_from(self)

    def merge_into(self, other: 'MainMergeNode'):
        """Called when it's decided to merge this node into a different master node."""
        self.merged_into = other

    def merge_from(self, other: 'MainMergeNode'):
        """Called when it's decided to merge a different node into this master node."""
        self.merged.append(other)

    @property
    def is_master_node(self):
        """Returns if this node is a master of a merge cluster."""
        return not self.merged_into

    def merge_done(self):
        self.merged_master = self.merged_into or self
        self.rigify_sub_objects = list(self.merged)

        for child in self.merged:
            child.merge_done()


# noinspection PyAbstractClass
class QueryMergeNode(BaseMergeNode):
    """Base class for special nodes used only to query which nodes are at a certain location."""

    is_master_node = False
    require_match = True

    matched_nodes: list['MainMergeNode']  # Master nodes this query matched with.

    def merge_done(self):
        self.matched_nodes = [
            n for n in self.group.final_nodes if self.can_merge_into(n)
        ]
        self.matched_nodes.sort(key=self.get_merge_priority, reverse=True)

        if self.require_match and not self.matched_nodes:
            self.rig.raise_error(
                'Could not match control node for {}', self.name)