File: static_rendezvous_test.py

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (102 lines) | stat: -rw-r--r-- 3,156 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# Owner(s): ["oncall: r2p"]

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import unittest
from contextlib import closing

from torch.distributed.elastic.rendezvous import RendezvousParameters
from torch.distributed.elastic.rendezvous.static_tcp_rendezvous import (
    create_rdzv_handler,
)
from torch.distributed.elastic.utils import get_socket_with_port


class StaticTCPRendezvousTest(unittest.TestCase):
    def test_missing_port(self):
        rdzv_params = RendezvousParameters(
            backend="static",
            endpoint="localhost",
            run_id="test_id",
            min_nodes=1,
            max_nodes=1,
        )
        with self.assertRaises(ValueError):
            create_rdzv_handler(rdzv_params)

    def test_empty_endpoint(self):
        rdzv_params = RendezvousParameters(
            backend="static",
            endpoint="",
            run_id="test_id",
            min_nodes=1,
            max_nodes=1,
        )
        with self.assertRaises(ValueError):
            create_rdzv_handler(rdzv_params)

    def test_ipv6_addr(self):
        rdzv_params = RendezvousParameters(
            backend="static",
            endpoint="[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:90",
            run_id="test_id",
            min_nodes=1,
            max_nodes=1,
        )
        with self.assertRaises(ValueError):
            create_rdzv_handler(rdzv_params)

    def test_ipv6_addr_localhost(self):
        rdzv_params = RendezvousParameters(
            backend="static",
            endpoint="[::1]:90",
            run_id="test_id",
            min_nodes=1,
            max_nodes=1,
        )
        with self.assertRaises(ValueError):
            create_rdzv_handler(rdzv_params)

    def test_get_backend(self):
        rdzv_params = RendezvousParameters(
            backend="static",
            endpoint="localhost:123",
            run_id="test",
            min_nodes=1,
            max_nodes=1,
            timeout=60,
            rank=0,
        )

        static_rdzv = create_rdzv_handler(rdzv_params)
        self.assertEqual("static", static_rdzv.get_backend())

    def test_static_rdzv_multiple_calls(self):
        sock = get_socket_with_port()
        with closing(sock):
            master_port = sock.getsockname()[1]
        master_addr = "localhost"

        rdzv_params = RendezvousParameters(
            backend="static",
            endpoint=f"{master_addr}:{master_port}",
            run_id="test_id",
            min_nodes=1,
            max_nodes=1,
            rank=0,
        )
        rdzv_handler = create_rdzv_handler(rdzv_params)

        # Call rendezvous two times
        store, rank, world_size = rdzv_handler.next_rendezvous()
        self.assertIsNotNone(store)
        self.assertEqual(0, rank)
        self.assertEqual(1, world_size)

        store, rank, world_size = rdzv_handler.next_rendezvous()
        self.assertIsNotNone(store)
        self.assertEqual(0, rank)
        self.assertEqual(1, world_size)