from typing import Any, Optional
import numpy as np
import torch
from torch import Tensor
import torch_geometric.typing
from torch_geometric.data import Data
from torch_geometric.data.datapipes import functional_transform
from torch_geometric.transforms import BaseTransform
from torch_geometric.utils import (
get_laplacian,
get_self_loop_attr,
is_torch_sparse_tensor,
scatter,
to_edge_index,
to_scipy_sparse_matrix,
to_torch_coo_tensor,
to_torch_csr_tensor,
)


def add_node_attr(
data: Data,
value: Any,
attr_name: Optional[str] = None,
) -> Data:
# TODO Move to `BaseTransform`.
if attr_name is None:
if data.x is not None:
x = data.x.view(-1, 1) if data.x.dim() == 1 else data.x
data.x = torch.cat([x, value.to(x.device, x.dtype)], dim=-1)
else:
data.x = value
else:
data[attr_name] = value
return data
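

# Example (a hypothetical sketch, not part of the public API): with
# `attr_name=None` the value is concatenated onto the node features,
# otherwise it is stored under the given attribute name:
#
#     data = Data(x=torch.randn(6, 16))
#     data = add_node_attr(data, torch.randn(6, 4))        # data.x: [6, 20]
#     data = add_node_attr(data, torch.randn(6, 4), 'pe')  # data.pe: [6, 4]
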
@functional_transform('add_laplacian_eigenvector_pe')
class AddLaplacianEigenvectorPE(BaseTransform):
r"""Adds the Laplacian eigenvector positional encoding from the
`"Benchmarking Graph Neural Networks" <https://arxiv.org/abs/2003.00982>`_
paper to the given graph
    (functional name: :obj:`add_laplacian_eigenvector_pe`).

    Args:
k (int): The number of non-trivial eigenvectors to consider.
attr_name (str, optional): The attribute name of the data object to add
positional encodings to. If set to :obj:`None`, will be
concatenated to :obj:`data.x`.
(default: :obj:`"laplacian_eigenvector_pe"`)
is_undirected (bool, optional): If set to :obj:`True`, this transform
expects undirected graphs as input, and can hence speed up the
computation of eigenvectors. (default: :obj:`False`)
**kwargs (optional): Additional arguments of
:meth:`scipy.sparse.linalg.eigs` (when :attr:`is_undirected` is
:obj:`False`) or :meth:`scipy.sparse.linalg.eigsh` (when
:attr:`is_undirected` is :obj:`True`).
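
    Example:
        A minimal usage sketch (assuming an undirected input graph with at
        least :obj:`k + 1` nodes):

        .. code-block:: python

            transform = AddLaplacianEigenvectorPE(k=4, is_undirected=True)
            data = transform(data)
            pe = data.laplacian_eigenvector_pe  # Shape: [num_nodes, 4]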
"""
# Number of nodes from which to use sparse eigenvector computation:
    SPARSE_THRESHOLD: int = 100

    def __init__(
self,
k: int,
attr_name: Optional[str] = 'laplacian_eigenvector_pe',
is_undirected: bool = False,
**kwargs: Any,
) -> None:
self.k = k
self.attr_name = attr_name
self.is_undirected = is_undirected
        self.kwargs = kwargs

    def forward(self, data: Data) -> Data:
assert data.edge_index is not None
num_nodes = data.num_nodes
assert num_nodes is not None
edge_index, edge_weight = get_laplacian(
data.edge_index,
data.edge_weight,
normalization='sym',
num_nodes=num_nodes,
)
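        # Assemble the symmetrically normalized Laplacian as a SciPy sparse
        # matrix so it can be handed to the NumPy/SciPy eigensolvers below: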
L = to_scipy_sparse_matrix(edge_index, edge_weight, num_nodes)
if num_nodes < self.SPARSE_THRESHOLD:
from numpy.linalg import eig, eigh
eig_fn = eig if not self.is_undirected else eigh
eig_vals, eig_vecs = eig_fn(L.todense()) # type: ignore
else:
from scipy.sparse.linalg import eigs, eigsh
eig_fn = eigs if not self.is_undirected else eigsh
eig_vals, eig_vecs = eig_fn( # type: ignore
L,
k=self.k + 1,
which='SR' if not self.is_undirected else 'SA',
return_eigenvectors=True,
**self.kwargs,
)
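        # Sort by ascending eigenvalue, keep the real part, and drop the
        # trivial first eigenvector belonging to eigenvalue zero: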
eig_vecs = np.real(eig_vecs[:, eig_vals.argsort()])
pe = torch.from_numpy(eig_vecs[:, 1:self.k + 1])
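        # Eigenvectors are unique only up to sign, so randomly flip the sign
        # of each of the k eigenvectors: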
sign = -1 + 2 * torch.randint(0, 2, (self.k, ))
pe *= sign
data = add_node_attr(data, pe, attr_name=self.attr_name)
        return data


@functional_transform('add_random_walk_pe')
class AddRandomWalkPE(BaseTransform):
r"""Adds the random walk positional encoding from the `"Graph Neural
Networks with Learnable Structural and Positional Representations"
<https://arxiv.org/abs/2110.07875>`_ paper to the given graph
    (functional name: :obj:`add_random_walk_pe`).

    Args:
walk_length (int): The number of random walk steps.
attr_name (str, optional): The attribute name of the data object to add
positional encodings to. If set to :obj:`None`, will be
concatenated to :obj:`data.x`.
(default: :obj:`"random_walk_pe"`)
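
    Example:
        A minimal usage sketch:

        .. code-block:: python

            transform = AddRandomWalkPE(walk_length=8)
            data = transform(data)
            pe = data.random_walk_pe  # Shape: [num_nodes, 8]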
"""
def __init__(
self,
walk_length: int,
attr_name: Optional[str] = 'random_walk_pe',
) -> None:
self.walk_length = walk_length
        self.attr_name = attr_name

    def forward(self, data: Data) -> Data:
assert data.edge_index is not None
row, col = data.edge_index
N = data.num_nodes
assert N is not None
if data.edge_weight is None:
value = torch.ones(data.num_edges, device=row.device)
else:
value = data.edge_weight
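        # Divide each edge weight by the (clamped) weighted out-degree of its
        # source node, so that `adj` below holds random walk transition
        # probabilities: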
value = scatter(value, row, dim_size=N, reduce='sum').clamp(min=1)[row]
value = 1.0 / value
if N <= 2_000: # Dense code path for faster computation:
adj = torch.zeros((N, N), device=row.device)
adj[row, col] = value
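            # Diagonal indices, used only by the dense code path in `get_pe`: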
loop_index = torch.arange(N, device=row.device)
elif torch_geometric.typing.NO_MKL: # pragma: no cover
adj = to_torch_coo_tensor(data.edge_index, value, size=data.size())
else:
adj = to_torch_csr_tensor(data.edge_index, value, size=data.size())
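        # The diagonal of the t-step transition matrix holds each node's
        # probability of returning to its start node after t steps: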
def get_pe(out: Tensor) -> Tensor:
if is_torch_sparse_tensor(out):
return get_self_loop_attr(*to_edge_index(out), num_nodes=N)
return out[loop_index, loop_index]
out = adj
pe_list = [get_pe(out)]
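        # Repeated multiplication by `adj` yields the transition matrices for
        # walk lengths 2 to `walk_length`: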
for _ in range(self.walk_length - 1):
out = out @ adj
pe_list.append(get_pe(out))
pe = torch.stack(pe_list, dim=-1)
data = add_node_attr(data, pe, attr_name=self.attr_name)
return data