File: willow_object_class.py

package info (click to toggle)
pytorch-geometric 2.6.1-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,904 kB
  • sloc: python: 127,155; sh: 338; cpp: 27; makefile: 18; javascript: 16
file content (189 lines) | stat: -rw-r--r-- 7,028 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import glob
import os
import os.path as osp
from typing import Callable, List, Optional

import torch
import torch.nn.functional as F
from torch import Tensor
from torch.utils.data import DataLoader

from torch_geometric.data import (
    Data,
    InMemoryDataset,
    download_url,
    extract_zip,
)
from torch_geometric.io import fs


class WILLOWObjectClass(InMemoryDataset):
    r"""The WILLOW-ObjectClass dataset from the `"Learning Graphs to Match"
    <https://www.di.ens.fr/willow/pdfscurrent/cho2013.pdf>`_ paper,
    containing 10 equal keypoints of at least 40 images in each category.
    The keypoints contain interpolated features from a pre-trained VGG16 model
    on ImageNet (:obj:`relu4_2` and :obj:`relu5_1`).

    Args:
        root (str): Root directory where the dataset should be saved.
        category (str): The category of the images (one of :obj:`"Car"`,
            :obj:`"Duck"`, :obj:`"Face"`, :obj:`"Motorbike"`,
            :obj:`"Winebottle"`). The name is matched case-insensitively.
        transform (callable, optional): A function/transform that takes in an
            :obj:`torch_geometric.data.Data` object and returns a transformed
            version. The data object will be transformed before every access.
            (default: :obj:`None`)
        pre_transform (callable, optional): A function/transform that takes in
            an :obj:`torch_geometric.data.Data` object and returns a
            transformed version. The data object will be transformed before
            being saved to disk. (default: :obj:`None`)
        pre_filter (callable, optional): A function that takes in an
            :obj:`torch_geometric.data.Data` object and returns a boolean
            value, indicating whether the data object should be included in the
            final dataset. (default: :obj:`None`)
        force_reload (bool, optional): Whether to re-process the dataset.
            (default: :obj:`False`)
        device (str or torch.device, optional): The device to use for
            processing the raw data. If set to :obj:`None`, will utilize
            GPU-processing if available. (default: :obj:`None`)

    Raises:
        ValueError: If :obj:`category` is not one of the supported
            categories.
    """
    url = ('http://www.di.ens.fr/willow/research/graphlearning/'
           'WILLOW-ObjectClass_dataset.zip')

    # Lower-cased names of the supported categories.
    categories = ['face', 'motorbike', 'car', 'duck', 'winebottle']

    # Number of images pushed through VGG16 per forward pass in `process`.
    batch_size = 32

    def __init__(
        self,
        root: str,
        category: str,
        transform: Optional[Callable] = None,
        pre_transform: Optional[Callable] = None,
        pre_filter: Optional[Callable] = None,
        force_reload: bool = False,
        device: Optional[str] = None,
    ) -> None:
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # Validate explicitly rather than via `assert`: assertions are
        # stripped when Python runs with `-O`, which would let an unknown
        # category slip through unnoticed.
        if category.lower() not in self.categories:
            raise ValueError(
                f"Unknown category '{category}' (expected one of "
                f"{[c.capitalize() for c in self.categories]})")

        # Both attributes must be set before the base constructor runs, since
        # `processed_dir` and `process` read them.
        self.category = category
        self.device = device
        super().__init__(root, transform, pre_transform, pre_filter,
                         force_reload=force_reload)
        self.load(self.processed_paths[0])

    @property
    def raw_dir(self) -> str:
        """Directory holding the raw data: ``<root>/raw``."""
        return osp.join(self.root, 'raw')

    @property
    def processed_dir(self) -> str:
        """Per-category output directory: ``<root>/<Category>/processed``."""
        return osp.join(self.root, self.category.capitalize(), 'processed')

    @property
    def raw_file_names(self) -> List[str]:
        """Capitalized category names, i.e. the per-category folder names
        that `process` looks up below `raw_dir`.
        """
        return [category.capitalize() for category in self.categories]

    @property
    def processed_file_names(self) -> str:
        """Name of the single processed file."""
        return 'data.pt'

    def download(self) -> None:
        """Download and unpack the archive, then install it as `raw_dir`.

        The zip archive, the bundled ``README`` and ``demo_showAnno.m`` are
        deleted, any existing `raw_dir` is removed, and the extracted
        ``WILLOW-ObjectClass`` folder is renamed to `raw_dir`.
        """
        path = download_url(self.url, self.root)
        extract_zip(path, self.root)
        os.unlink(path)
        os.unlink(osp.join(self.root, 'README'))
        os.unlink(osp.join(self.root, 'demo_showAnno.m'))
        fs.rm(self.raw_dir)
        os.rename(osp.join(self.root, 'WILLOW-ObjectClass'), self.raw_dir)

    def process(self) -> None:
        """Build one `Data` object per image of the selected category.

        For every ``<name>.png`` / ``<name>.mat`` pair the keypoints are
        rescaled to a 256x256 image, and the VGG16 activations captured at
        ``features[20]`` and ``features[25]`` are upsampled to 256x256 and
        sampled at the keypoint locations. The two feature sets are
        concatenated into ``data.x``; the image itself is dropped afterwards.
        The resulting list is filtered/transformed and saved to
        ``processed_paths[0]``.
        """
        import torchvision.models as models
        import torchvision.transforms as T
        from PIL import Image
        from scipy.io import loadmat

        category = self.category.capitalize()
        names = glob.glob(osp.join(self.raw_dir, category, '*.png'))
        # Strip the ".png" suffix so the stem can address both the image and
        # its ".mat" annotation file.
        names = sorted([name[:-4] for name in names])

        vgg16_outputs = []

        def hook(module: torch.nn.Module, x: Tensor, y: Tensor) -> None:
            # Collect the hooked layer's output on the CPU.
            vgg16_outputs.append(y.to('cpu'))

        # `pretrained=True` is deprecated since torchvision 0.13 in favour of
        # the `weights` argument; IMAGENET1K_V1 is the checkpoint the legacy
        # flag maps to. Fall back to the legacy flag on older releases.
        weights_enum = getattr(models, 'VGG16_Weights', None)
        if weights_enum is None:
            vgg16 = models.vgg16(pretrained=True)
        else:
            vgg16 = models.vgg16(weights=weights_enum.IMAGENET1K_V1)
        vgg16 = vgg16.to(self.device)
        vgg16.eval()
        vgg16.features[20].register_forward_hook(hook)  # relu4_2
        vgg16.features[25].register_forward_hook(hook)  # relu5_1

        transform = T.Compose([
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        data_list = []
        for name in names:
            # `pts_coord` unpacks into one row of x- and one row of
            # y-coordinates; stack them into a [num_keypoints, 2] tensor.
            pos = loadmat(f'{name}.mat')['pts_coord']
            x, y = torch.from_numpy(pos).to(torch.float)
            pos = torch.stack([x, y], dim=1)

            # The "face" category contains a single image with less than 10
            # keypoints, so we need to skip it.
            if pos.size(0) != 10:
                continue

            with open(f'{name}.png', 'rb') as f:
                img = Image.open(f).convert('RGB')

            # Rescale keypoints to the 256x256 image produced below.
            pos[:, 0] = pos[:, 0] * 256.0 / (img.size[0])
            pos[:, 1] = pos[:, 1] * 256.0 / (img.size[1])

            img = img.resize((256, 256), resample=Image.Resampling.BICUBIC)
            img = transform(img)

            data = Data(img=img, pos=pos, name=name)
            data_list.append(data)

        imgs = [data.img for data in data_list]
        # `shuffle=False` keeps batches aligned with `data_list`, which the
        # offset arithmetic below relies on.
        loader = DataLoader(
            dataset=imgs,  # type: ignore
            batch_size=self.batch_size,
            shuffle=False,
        )
        for i, batch_img in enumerate(loader):
            # Each forward pass appends exactly two hooked outputs.
            vgg16_outputs.clear()

            with torch.no_grad():
                vgg16(batch_img.to(self.device))

            out1 = F.interpolate(vgg16_outputs[0], (256, 256), mode='bilinear',
                                 align_corners=False)
            out2 = F.interpolate(vgg16_outputs[1], (256, 256), mode='bilinear',
                                 align_corners=False)

            # Index of the first element of this batch within `data_list`.
            offset = i * self.batch_size
            for j in range(out1.size(0)):
                data = data_list[offset + j]
                assert data.pos is not None
                # Nearest pixel per keypoint, clamped to the image bounds.
                idx = data.pos.round().long().clamp(0, 255)
                # Feature maps are indexed as [channel, row (y), column (x)].
                x_1 = out1[j, :, idx[:, 1], idx[:, 0]].to('cpu')
                x_2 = out2[j, :, idx[:, 1], idx[:, 0]].to('cpu')
                data.img = None
                # One row per keypoint, both feature sets side by side.
                data.x = torch.cat([x_1.t(), x_2.t()], dim=-1)
            del out1
            del out2

        if self.pre_filter is not None:
            data_list = [data for data in data_list if self.pre_filter(data)]

        if self.pre_transform is not None:
            data_list = [self.pre_transform(data) for data in data_list]

        self.save(data_list, self.processed_paths[0])

    def __repr__(self) -> str:
        """Return ``ClassName(<num_examples>, category=<category>)``."""
        return (f'{self.__class__.__name__}({len(self)}, '
                f'category={self.category})')