File: pgm_explainer_graph_classification.py

package info (click to toggle)
pytorch-geometric 2.6.1-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,904 kB
  • sloc: python: 127,155; sh: 338; cpp: 27; makefile: 18; javascript: 16
file content (120 lines) | stat: -rw-r--r-- 4,044 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""This is an example of using the PGM explainer algorithm on a graph
classification task.
"""
import os.path as osp

import torch
import torch.nn.functional as F
from torch.nn import Linear, ReLU, Sequential

import torch_geometric.transforms as T
from torch_geometric.contrib.explain import PGMExplainer
from torch_geometric.datasets import MNISTSuperpixels
from torch_geometric.explain import Explainer
from torch_geometric.loader import DataLoader
from torch_geometric.nn import (
    NNConv,
    global_mean_pool,
    graclus,
    max_pool,
    max_pool_x,
)
from torch_geometric.utils import normalized_cut

path = osp.join(osp.dirname(osp.realpath(__file__)), '..', 'data', 'MNIST')
transform = T.Cartesian(cat=False)
train_dataset = MNISTSuperpixels(path, True, transform=transform)
test_dataset = MNISTSuperpixels(path, False, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=512, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)
d = train_dataset


def normalized_cut_2d(edge_index, pos):
    row, col = edge_index
    edge_attr = torch.norm(pos[row] - pos[col], p=2, dim=1)
    return normalized_cut(edge_index, edge_attr, num_nodes=pos.size(0))


class Net(torch.nn.Module):
    def __init__(self):
        super().__init__()
        nn1 = Sequential(
            Linear(2, 25),
            ReLU(),
            Linear(25, d.num_features * 32),
        )
        self.conv1 = NNConv(d.num_features, 32, nn1, aggr='mean')

        nn2 = Sequential(
            Linear(2, 25),
            ReLU(),
            Linear(25, 32 * 64),
        )
        self.conv2 = NNConv(32, 64, nn2, aggr='mean')

        self.fc1 = torch.nn.Linear(64, 128)
        self.fc2 = torch.nn.Linear(128, d.num_classes)

    def forward(self, x, edge_index, **kwargs):
        data = kwargs.get('data')
        data = data.detach().clone()
        x = F.elu(self.conv1(x, edge_index, data.edge_attr))
        weight = normalized_cut_2d(edge_index, data.pos)
        cluster = graclus(edge_index, weight, x.size(0))
        data.edge_attr = None
        data.x = x
        data.edge_index = edge_index
        data = max_pool(cluster, data, transform=transform)

        data.x = F.elu(self.conv2(data.x, data.edge_index, data.edge_attr))
        weight = normalized_cut_2d(data.edge_index, data.pos)
        cluster = graclus(data.edge_index, weight, data.x.size(0))
        x, batch = max_pool_x(cluster, data.x, data.batch)

        x = global_mean_pool(x, batch)
        x = F.elu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        return F.log_softmax(self.fc2(x), dim=1)


def train(model, dataloader):
    model.train()

    for data in dataloader:
        data = data.to(device)
        optimizer.zero_grad()
        F.nll_loss(model(data.x, data), data.y).backward()
        optimizer.step()


if __name__ == "__main__":

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'current device: {device}')
    model = Net().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

    for epoch in range(2):
        train(model, train_loader)

    explainer = Explainer(
        model=model, algorithm=PGMExplainer(perturb_feature_list=[0],
                                            perturbation_mode="mean"),
        explanation_type='phenomenon', node_mask_type="object",
        model_config=dict(mode="multiclass_classification", task_level="graph",
                          return_type="raw"))
    i = 0

    for explain_dataset in test_loader:
        explain_dataset.to(device)
        explanation = explainer(x=explain_dataset.x,
                                edge_index=explain_dataset.edge_index,
                                target=explain_dataset.y,
                                edge_attr=explain_dataset.edge_attr,
                                data=explain_dataset)
        for k in explanation.available_explanations:
            print(explanation[k])
        i += 1
        if i > 2:
            break