File: test_rw.py

package info (click to toggle)
pytorch-cluster 1.6.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 648 kB
  • sloc: cpp: 2,076; python: 1,081; sh: 53; makefile: 8
file content (87 lines) | stat: -rw-r--r-- 2,592 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import pytest
import torch
from torch_cluster import random_walk
from torch_cluster.testing import devices, tensor


@pytest.mark.parametrize('device', devices)
def test_rw_large(device):
    """Check that every step of a random walk follows an existing edge.

    The graph has five nodes, each with at least one outgoing edge, and one
    walk is started from every node. The result is expected to hold
    ``walk_length + 1`` nodes per walk: the start node followed by one node
    per step.
    """
    row = tensor([0, 1, 1, 1, 2, 2, 3, 3, 4, 4], torch.long, device)
    col = tensor([1, 0, 2, 3, 1, 4, 1, 4, 2, 3], torch.long, device)
    start = tensor([0, 1, 2, 3, 4], torch.long, device)
    walk_length = 10

    out = random_walk(row, col, start, walk_length)
    assert list(out.shape) == [start.size(0), walk_length + 1]
    assert out[:, 0].tolist() == start.tolist()

    # Build the adjacency once instead of masking `col` for every step.
    neighbors = {}
    for src, dst in zip(row.tolist(), col.tolist()):
        neighbors.setdefault(src, set()).add(dst)

    # Validate *all* transitions, including the one into the last column
    # (index `walk_length`), which a `range(1, walk_length)` loop would skip.
    for walk in out.tolist():
        for cur, nxt in zip(walk, walk[1:]):
            assert nxt in neighbors.get(cur, ())


@pytest.mark.parametrize('device', devices)
def test_rw_small(device):
    """Compare a walk on a tiny graph against its exact expected output.

    Nodes 0 and 1 point at each other, so their walks alternate; node 2 has
    no outgoing edge and its walk is expected to stay on node 2. The
    TorchScript-compiled function must return the same tensor as the eager
    call.
    """
    n_steps = 4
    src = tensor([0, 1], torch.long, device)
    dst = tensor([1, 0], torch.long, device)
    seeds = tensor([0, 1, 2], torch.long, device)

    expected = [
        [0, 1, 0, 1, 0],
        [1, 0, 1, 0, 1],
        [2, 2, 2, 2, 2],
    ]
    walks = random_walk(src, dst, seeds, n_steps, num_nodes=3)
    assert walks.tolist() == expected

    scripted = torch.jit.script(random_walk)
    scripted_walks = scripted(src, dst, seeds, n_steps, num_nodes=3)
    assert torch.equal(scripted_walks, walks)


@pytest.mark.parametrize('device', devices)
def test_rw_large_with_edge_indices(device):
    """Check node and edge sequences of random walks on a five-node graph.

    Every node has at least one outgoing edge, so each step must follow an
    existing edge and no edge index may be the ``-1`` placeholder. The node
    sequence is expected to hold ``walk_length + 1`` entries per walk and
    the edge sequence ``walk_length`` entries.
    """
    row = tensor([0, 1, 1, 1, 2, 2, 3, 3, 4, 4], torch.long, device)
    col = tensor([1, 0, 2, 3, 1, 4, 1, 4, 2, 3], torch.long, device)
    start = tensor([0, 1, 2, 3, 4], torch.long, device)
    walk_length = 10

    node_seq, edge_seq = random_walk(
        row,
        col,
        start,
        walk_length,
        return_edge_indices=True,
    )
    assert list(node_seq.shape) == [start.size(0), walk_length + 1]
    assert list(edge_seq.shape) == [start.size(0), walk_length]
    assert node_seq[:, 0].tolist() == start.tolist()

    # Build the adjacency once instead of masking `col` for every step.
    neighbors = {}
    for src, dst in zip(row.tolist(), col.tolist()):
        neighbors.setdefault(src, set()).add(dst)

    # Validate *all* transitions, including the one into the last column
    # (index `walk_length`), which a `range(1, walk_length)` loop would skip.
    for walk in node_seq.tolist():
        for cur, nxt in zip(walk, walk[1:]):
            assert nxt in neighbors.get(cur, ())

    assert (edge_seq != -1).all()

    # Each reported edge must join the two consecutive nodes of the walk.
    # NOTE(review): assumes edge indices refer to positions in `row`/`col`
    # as passed in; the inputs above are already sorted by (row, col).
    assert torch.equal(row[edge_seq], node_seq[:, :-1])
    assert torch.equal(col[edge_seq], node_seq[:, 1:])


@pytest.mark.parametrize('device', devices)
def test_rw_small_with_edge_indices(device):
    """Compare node and edge sequences on a tiny graph with exact values.

    Nodes 0 and 1 point at each other, so their walks alternate between
    edges 0 and 1. Node 2 has no outgoing edge: its walk is expected to stay
    on node 2 and to report ``-1`` for every edge index.
    """
    n_steps = 4
    src = tensor([0, 1], torch.long, device)
    dst = tensor([1, 0], torch.long, device)
    seeds = tensor([0, 1, 2], torch.long, device)

    expected_nodes = [
        [0, 1, 0, 1, 0],
        [1, 0, 1, 0, 1],
        [2, 2, 2, 2, 2],
    ]
    expected_edges = [
        [0, 1, 0, 1],
        [1, 0, 1, 0],
        [-1, -1, -1, -1],
    ]

    walked_nodes, walked_edges = random_walk(
        src,
        dst,
        seeds,
        n_steps,
        num_nodes=3,
        return_edge_indices=True,
    )
    assert walked_nodes.tolist() == expected_nodes
    assert walked_edges.tolist() == expected_edges