1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
import subprocess
from time import sleep
import psutil
import pytest
import torch
from torch_geometric.data import Data
from torch_geometric.loader import NeighborLoader
from torch_geometric.testing import onlyLinux, onlyNeighborSampler
@onlyLinux
@onlyNeighborSampler
@pytest.mark.skipif(
    psutil.cpu_count(logical=False) == 1, reason="Requires multiple CPU cores")
@pytest.mark.parametrize('loader_cores', [None, [1, 2]])
def test_cpu_affinity_neighbor_loader(loader_cores, spawn_context):
    """Check the CPU affinity of the loader's worker processes.

    Starts a two-worker ``NeighborLoader`` inside ``enable_cpu_affinity`` and
    reads each worker's affinity list back through the ``taskset`` command.

    Args:
        loader_cores: Value forwarded to ``enable_cpu_affinity``; either
            ``None`` or ``[1, 2]`` according to the parametrization.
        spawn_context: Pytest fixture requested by name; it is not referenced
            in the test body.
    """
    data = Data(x=torch.randn(1, 1))
    loader = NeighborLoader(data, num_neighbors=[-1], batch_size=1,
                            num_workers=2)
    out = []
    with loader.enable_cpu_affinity(loader_cores):
        iterator = loader._get_iterator()
        workers = iterator._workers
        sleep(3)  # Gives time for worker to initialize.
        for worker in workers:
            # `check=True` turns a failing `taskset` call into a clear error
            # instead of an `IndexError` while parsing empty output.
            result = subprocess.run(
                ['taskset', '-c', '-p', f'{worker.pid}'],
                stdout=subprocess.PIPE, check=True)
            stdout = result.stdout.decode('utf-8')
            # Prints "pid <pid>'s current affinity list: <cores>".
            out.append(stdout.split(':')[1].strip())

        if loader_cores:
            # `taskset -c` prints plain core lists without brackets.
            # NOTE(review): assumes every worker is pinned to exactly one of
            # the requested cores, in worker order - confirm against
            # `enable_cpu_affinity`.
            assert out == [str(core) for core in loader_cores]
        else:
            # Without explicit cores the two workers must not share the same
            # affinity list.
            assert out[0] != out[1]
def init_fn(worker_id):
    """Assert that torch reports exactly two threads.

    Passed as ``worker_init_fn`` to the loader in the multithreading test.

    Args:
        worker_id: Argument supplied by the caller; not used here.
    """
    thread_count = torch.get_num_threads()
    assert thread_count == 2
@onlyLinux
@onlyNeighborSampler
@pytest.mark.skipif(
    psutil.cpu_count(logical=False) == 1, reason="Requires multiple CPU cores")
def test_multithreading_neighbor_loader(spawn_context):
    """Create a worker iterator while multithreading is enabled.

    Builds a two-worker ``NeighborLoader`` that uses ``init_fn`` as its
    ``worker_init_fn`` and requests an iterator inside
    ``enable_multithreading(2)``.

    Args:
        spawn_context: Pytest fixture requested by name; it is not referenced
            in the test body.
    """
    graph = Data(x=torch.randn(1, 1))
    loader_kwargs = dict(
        num_neighbors=[-1],
        batch_size=1,
        num_workers=2,
        worker_init_fn=init_fn,
    )
    loader = NeighborLoader(data=graph, **loader_kwargs)

    with loader.enable_multithreading(2):
        # Runs assertion in `init_fn`.
        # NOTE(review): the iterator is discarded without fetching a batch;
        # confirm that a failing assertion in `init_fn` actually propagates
        # to this process.
        loader._get_iterator()
|