1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
|
import collections
import typing
import uuid
from qtpy.QtCore import QObject, QPointF, QSizeF
from .base import NodeBase, Serializable
from .enums import ReactToConnectionState
from .node_data import NodeData, NodeDataModel, NodeDataType
from .node_geometry import NodeGeometry
from .node_graphics_object import NodeGraphicsObject
from .node_state import NodeState
from .port import Port, PortType
from .style import NodeStyle
class Node(QObject, Serializable, NodeBase):
def __init__(self, data_model: NodeDataModel):
'''
A single Node in the scene
Parameters
----------
data_model : NodeDataModel
'''
super().__init__()
self._model = data_model
self._uid = str(uuid.uuid4())
self._style = data_model.node_style
self._state = NodeState(self)
self._geometry = NodeGeometry(self)
self._graphics_obj = None
self._geometry.recalculate_size()
# propagate data: model => node
self._model.data_updated.connect(self._on_port_index_data_updated)
self._model.embedded_widget_size_updated.connect(self.on_node_size_updated)
def __hash__(self):
return id(self._uid)
def __eq__(self, node):
try:
return node.id == self.id and self.model is node.model
except AttributeError:
return False
def has_any_connection(self, node: 'Node') -> bool:
"""
Is this node connected to `node` through any port?
Parameters
----------
node : Node
The node to check connectivity
Returns
-------
connected : bool
"""
return any(self.has_connection_by_port_type(node, port_type)
for port_type in PortType)
def has_connection_by_port_type(self, target: 'Node',
port_type: PortType) -> bool:
"""
Is this node connected to `target` through an input/output port?
Parameters
----------
target : Node
The target node to check connectivity
port_type : PortType
The port type (``PortType.input``, ``PortType.output``) to check
Returns
-------
connected : bool
"""
return any(
path[-1] == target
for path in self.walk_paths_by_port_type(port_type)
)
def walk_paths_by_port_type(
self, port_type: PortType) -> typing.Iterable['Node']:
"""
Yields paths to connected nodes by port type
Yields
------
node_path : tuple
The path to the node
"""
seen = set([None])
pending = collections.deque([([], self)])
if port_type == PortType.output:
def get_connection_nodes(state):
for con in state.output_connections:
yield con.input_node
elif port_type == PortType.input:
def get_connection_nodes(state):
for con in state.input_connections:
yield con.output_node
else:
raise ValueError(f'Unexpected port_type {port_type}')
while pending:
node_path, node = pending.popleft()
seen.add(node)
if node is not self:
yield tuple(node_path) + (node, )
node_path = list(node_path) + [node]
for node in get_connection_nodes(node.state):
if node not in seen:
pending.append((node_path, node))
def __getitem__(self, key):
return self._state[key]
def _cleanup(self):
if self._graphics_obj is not None:
self._graphics_obj._cleanup()
self._graphics_obj = None
self._geometry = None
def __getstate__(self) -> dict:
"""
Save
Returns
-------
value : dict
"""
return {
"id": self._uid,
"model": self._model.__getstate__(),
"position": {"x": self._graphics_obj.pos().x(),
"y": self._graphics_obj.pos().y()}
}
def __setstate__(self, state: dict):
"""
Restore
Parameters
----------
state : dict
"""
self._uid = state["id"]
if self._graphics_obj:
pos = state["position"]
self.position = (pos["x"], pos["y"])
self._model.__setstate__(state["model"])
@property
def id(self) -> str:
"""
Node unique identifier (uuid)
Returns
-------
value : str
"""
return self._uid
def react_to_possible_connection(self, reacting_port_type: PortType,
reacting_data_type: NodeDataType,
scene_point: QPointF
):
"""
React to possible connection
Parameters
----------
port_type : PortType
node_data_type : NodeDataType
scene_point : QPointF
"""
transform = self._graphics_obj.sceneTransform()
inverted, invertible = transform.inverted()
if invertible:
pos = inverted.map(scene_point)
self._geometry.dragging_position = pos
self._graphics_obj.update()
self._state.set_reaction(ReactToConnectionState.reacting,
reacting_port_type, reacting_data_type)
def reset_reaction_to_connection(self):
self._state.set_reaction(ReactToConnectionState.not_reacting)
self._graphics_obj.update()
@property
def graphics_object(self) -> NodeGraphicsObject:
"""
Node graphics object
Returns
-------
value : NodeGraphicsObject
"""
return self._graphics_obj
@graphics_object.setter
def graphics_object(self, graphics: NodeGraphicsObject):
"""
Set graphics object
Parameters
----------
graphics : NodeGraphicsObject
"""
self._graphics_obj = graphics
self._geometry.recalculate_size()
@property
def geometry(self) -> NodeGeometry:
"""
Node geometry
Returns
-------
value : NodeGeometry
"""
return self._geometry
@property
def model(self) -> NodeDataModel:
"""
Node data model
Returns
-------
value : NodeDataModel
"""
return self._model
def propagate_data(self, node_data: NodeData, input_port: Port):
"""
Propagates incoming data to the underlying model.
Parameters
----------
node_data : NodeData
input_port : int
"""
if input_port.node is not self:
raise ValueError('Port does not belong to this Node')
elif input_port.port_type != PortType.input:
raise ValueError('Port is not an input port')
self._model.set_in_data(node_data, input_port)
# Recalculate the nodes visuals. A data change can result in the node
# taking more space than before, so self forces a recalculate+repaint
# on the affected node
self._graphics_obj.set_geometry_changed()
self._geometry.recalculate_size()
self._graphics_obj.update()
self._graphics_obj.move_connections()
def _on_port_index_data_updated(self, port_index: int):
"""
Data has been updated on this Node's output port port_index;
propagate it to any connections.
Parameters
----------
index : int
"""
port = self[PortType.output][port_index]
self.on_data_updated(port)
def on_data_updated(self, port: Port):
"""
Fetches data from model's output port and propagates it along the
connection
Parameters
----------
port : Port
"""
node_data = port.data
for conn in port.connections:
conn.propagate_data(node_data)
def on_node_size_updated(self):
"""
update the graphic part if the size of the embeddedwidget changes
"""
widget = self.model.embedded_widget()
if widget:
widget.adjustSize()
self.geometry.recalculate_size()
for conn in self.state.all_connections:
conn.graphics_object.move()
@property
def size(self) -> QSizeF:
"""
Get the node size
Parameters
----------
node : Node
Returns
-------
value : QSizeF
"""
return self._geometry.size
@property
def position(self) -> QPointF:
"""
Get the node position
Parameters
----------
node : Node
Returns
-------
value : QPointF
"""
return self._graphics_obj.pos()
@position.setter
def position(self, pos):
if not isinstance(pos, QPointF):
px, py = pos
pos = QPointF(px, py)
self._graphics_obj.setPos(pos)
self._graphics_obj.move_connections()
@property
def style(self) -> NodeStyle:
'Node style'
return self._style
@property
def state(self) -> NodeState:
"""
Node state
Returns
-------
value : NodeState
"""
return self._state
def __repr__(self):
return (f'<{self.__class__.__name__} model={self._model} '
f'uid={self._uid!r}>')
|