1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
import torch
from torch_geometric.data import Data
from torch_geometric.loader import ShaDowKHopSampler
from torch_geometric.testing import withPackage
from torch_geometric.typing import SparseTensor
@withPackage('torch_sparse')
def test_shadow_k_hop_sampler():
row = torch.tensor([0, 0, 0, 1, 1, 2, 2, 2, 2, 3, 4, 4, 5, 5])
col = torch.tensor([1, 2, 3, 0, 2, 0, 1, 4, 5, 0, 2, 5, 2, 4])
edge_index = torch.stack([row, col], dim=0)
edge_weight = torch.arange(row.size(0))
x = torch.randn(6, 16)
y = torch.randint(3, (6, ), dtype=torch.long)
data = Data(edge_index=edge_index, edge_weight=edge_weight, x=x, y=y)
train_mask = torch.tensor([1, 1, 0, 0, 0, 0], dtype=torch.bool)
loader = ShaDowKHopSampler(data, depth=1, num_neighbors=3,
node_idx=train_mask, batch_size=2)
assert len(loader) == 1
batch1 = next(iter(loader))
assert batch1.num_graphs == len(batch1) == 2
assert batch1.batch.tolist() == [0, 0, 0, 0, 1, 1, 1]
assert batch1.ptr.tolist() == [0, 4, 7]
assert batch1.root_n_id.tolist() == [0, 5]
assert batch1.x.tolist() == x[torch.tensor([0, 1, 2, 3, 0, 1, 2])].tolist()
assert batch1.y.tolist() == y[train_mask].tolist()
row, col = batch1.edge_index
assert row.tolist() == [0, 0, 0, 1, 1, 2, 2, 3, 4, 4, 5, 5, 6, 6]
assert col.tolist() == [1, 2, 3, 0, 2, 0, 1, 0, 5, 6, 4, 6, 4, 5]
e_id = torch.tensor([0, 1, 2, 3, 4, 5, 6, 9, 0, 1, 3, 4, 5, 6])
assert batch1.edge_weight.tolist() == edge_weight[e_id].tolist()
adj_t = SparseTensor(row=edge_index[0], col=edge_index[1],
value=edge_weight).t()
data = Data(adj_t=adj_t, x=x, y=y)
loader = ShaDowKHopSampler(data, depth=1, num_neighbors=3,
node_idx=train_mask, batch_size=2)
assert len(loader) == 1
batch2 = next(iter(loader))
assert batch2.num_graphs == len(batch2) == 2
assert batch1.batch.tolist() == batch2.batch.tolist()
assert batch1.ptr.tolist() == batch2.ptr.tolist()
assert batch1.root_n_id.tolist() == batch2.root_n_id.tolist()
assert batch1.x.tolist() == batch2.x.tolist()
assert batch1.y.tolist() == batch2.y.tolist()
row, col, value = batch2.adj_t.t().coo()
assert batch1.edge_index[0].tolist() == row.tolist()
assert batch1.edge_index[1].tolist() == col.tolist()
assert batch1.edge_weight.tolist() == value.tolist()
|