1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
from math import sqrt
from typing import Any, List, Optional
import torch
from torch import Tensor
BACKENDS = {'graphviz', 'networkx'}
def has_graphviz() -> bool:
try:
import graphviz
except ImportError:
return False
try:
graphviz.Digraph().pipe()
except graphviz.backend.ExecutableNotFound:
return False
return True
def visualize_graph(
edge_index: Tensor,
edge_weight: Optional[Tensor] = None,
path: Optional[str] = None,
backend: Optional[str] = None,
node_labels: Optional[List[str]] = None,
) -> Any:
r"""Visualizes the graph given via :obj:`edge_index` and (optional)
:obj:`edge_weight`.
Args:
edge_index (torch.Tensor): The edge indices.
edge_weight (torch.Tensor, optional): The edge weights.
path (str, optional): The path to where the plot is saved.
If set to :obj:`None`, will visualize the plot on-the-fly.
(default: :obj:`None`)
backend (str, optional): The graph drawing backend to use for
visualization (:obj:`"graphviz"`, :obj:`"networkx"`).
If set to :obj:`None`, will use the most appropriate
visualization backend based on available system packages.
(default: :obj:`None`)
node_labels (List[str], optional): The labels/IDs of nodes.
(default: :obj:`None`)
"""
if edge_weight is not None: # Normalize edge weights.
edge_weight = edge_weight - edge_weight.min()
edge_weight = edge_weight / edge_weight.max()
if edge_weight is not None: # Discard any edges with zero edge weight:
mask = edge_weight > 1e-7
edge_index = edge_index[:, mask]
edge_weight = edge_weight[mask]
if edge_weight is None:
edge_weight = torch.ones(edge_index.size(1))
if backend is None:
backend = 'graphviz' if has_graphviz() else 'networkx'
if backend.lower() == 'networkx':
return _visualize_graph_via_networkx(edge_index, edge_weight, path,
node_labels)
elif backend.lower() == 'graphviz':
return _visualize_graph_via_graphviz(edge_index, edge_weight, path,
node_labels)
raise ValueError(f"Expected graph drawing backend to be in "
f"{BACKENDS} (got '{backend}')")
def _visualize_graph_via_graphviz(
edge_index: Tensor,
edge_weight: Tensor,
path: Optional[str] = None,
node_labels: Optional[List[str]] = None,
) -> Any:
import graphviz
suffix = path.split('.')[-1] if path is not None else None
g = graphviz.Digraph('graph', format=suffix)
g.attr('node', shape='circle', fontsize='11pt')
for node in edge_index.view(-1).unique().tolist():
g.node(str(node) if node_labels is None else node_labels[node])
for (src, dst), w in zip(edge_index.t().tolist(), edge_weight.tolist()):
hex_color = hex(255 - round(255 * w))[2:]
hex_color = f'{hex_color}0' if len(hex_color) == 1 else hex_color
if node_labels is not None:
src = node_labels[src]
dst = node_labels[dst]
g.edge(str(src), str(dst), color=f'#{hex_color}{hex_color}{hex_color}')
if path is not None:
path = '.'.join(path.split('.')[:-1])
g.render(path, cleanup=True)
else:
g.view()
return g
def _visualize_graph_via_networkx(
edge_index: Tensor,
edge_weight: Tensor,
path: Optional[str] = None,
node_labels: Optional[List[str]] = None,
) -> Any:
import matplotlib.pyplot as plt
import networkx as nx
g = nx.DiGraph()
node_size = 800
for node in edge_index.view(-1).unique().tolist():
g.add_node(node if node_labels is None else node_labels[node])
for (src, dst), w in zip(edge_index.t().tolist(), edge_weight.tolist()):
if node_labels is not None:
src = node_labels[src]
dst = node_labels[dst]
g.add_edge(src, dst, alpha=w)
ax = plt.gca()
pos = nx.spring_layout(g)
for src, dst, data in g.edges(data=True):
ax.annotate(
'',
xy=pos[src],
xytext=pos[dst],
arrowprops=dict(
arrowstyle="->",
alpha=data['alpha'],
shrinkA=sqrt(node_size) / 2.0,
shrinkB=sqrt(node_size) / 2.0,
connectionstyle="arc3,rad=0.1",
),
)
nodes = nx.draw_networkx_nodes(g, pos, node_size=node_size,
node_color='white', margins=0.1)
nodes.set_edgecolor('black')
nx.draw_networkx_labels(g, pos, font_size=10)
if path is not None:
plt.savefig(path)
else:
plt.show()
plt.close()
|