File: launcher.py

from typing import Any, Callable, Dict, Optional

from ignite.distributed import utils as idist
from ignite.utils import setup_logger

__all__ = [
    "Parallel",
]


class Parallel:
    """Distributed launcher context manager to simplify distributed configuration setup for multiple backends:

    - backends from native torch distributed configuration: "nccl", "gloo" and "mpi" (if available)

    - XLA on TPUs via `pytorch/xla <https://github.com/pytorch/xla>`_ (if installed)

    - using `Horovod distributed framework <https://horovod.readthedocs.io>`_ (if installed)

    Namely, it can:

    1) Spawn ``nproc_per_node`` child processes and initialize a process group according to the
    provided ``backend`` (useful for standalone scripts).

    2) Only initialize a process group given the ``backend``
    (useful with tools like `torchrun`_, `horovodrun`_, etc.).

    Args:
        backend: backend to use: `nccl`, `gloo`, `xla-tpu`, `horovod`. If None, no distributed
            configuration is set up.
        nproc_per_node: optional argument, number of processes per
            node to spawn. If not None, :meth:`~ignite.distributed.launcher.Parallel.run`
            will spawn ``nproc_per_node`` processes that run the input function with its arguments.
        nnodes: optional argument, number of nodes participating in the distributed configuration.
            If not None, :meth:`~ignite.distributed.launcher.Parallel.run` will spawn ``nproc_per_node``
            processes that run the input function with its arguments. Total world size is `nproc_per_node * nnodes`.
            This option is only supported by the native torch distributed module. For other modules, please set up
            ``spawn_kwargs`` with backend-specific arguments.
        node_rank: optional argument, current machine index. Mandatory argument if ``nnodes`` is
            specified and larger than one.
            This option is only supported by the native torch distributed module. For other modules, please set up
            ``spawn_kwargs`` with backend-specific arguments.
        master_addr: optional argument, master node TCP/IP address for torch native backends
            (`nccl`, `gloo`). Mandatory argument if ``nnodes`` is specified and larger than one.
        master_port: optional argument, master node port for torch native backends
            (`nccl`, `gloo`). Mandatory argument if ``master_addr`` is specified.
        init_method: optional argument to specify the process group initialization method for torch native
            backends (`nccl`, `gloo`). Default: "env://".
            See `dist.init_process_group`_ for more info.
        spawn_kwargs: kwargs to ``idist.spawn`` function.

    Examples:
        1) Single node or Multi-node, Multi-GPU training launched with `torchrun`_ or `horovodrun`_
        tools

        Single node option with 4 GPUs

        .. code-block:: bash

            torchrun --nproc_per_node=4 main.py
            # or if installed horovod
            horovodrun -np 4 python main.py

        Multi-node option: 2 nodes with 8 GPUs each

        .. code-block:: bash

            ## node 0
            torchrun --nnodes=2 --node_rank=0 --master_addr=master --master_port=3344 \
            --nproc_per_node=8 main.py

            ## node 1
            torchrun --nnodes=2 --node_rank=1 --master_addr=master --master_port=3344 \
            --nproc_per_node=8 main.py

            # or, if horovod is installed, a single command from the launching host
            horovodrun -np 16 -H hostname1:8,hostname2:8 python main.py


        User code is the same for both options:

        .. code-block:: python

            # main.py

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            backend = "nccl"  # or "horovod" if package is installed

            config = {"key": "value"}

            with idist.Parallel(backend=backend) as parallel:
                parallel.run(training, config, a=1, b=2)


        2) Single node, Multi-GPU training launched with `python`

        .. code-block:: bash

            python main.py

        .. code-block:: python

            # main.py

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            backend = "nccl"  # or "horovod" if package is installed

            # no "init_method" was specified , "env://" will be used
            with idist.Parallel(backend=backend, nproc_per_node=4) as parallel:
                parallel.run(training, config, a=1, b=2)


        Initializing the process group using ``file://``

        .. code-block:: python

            with idist.Parallel(backend=backend, init_method='file:///d:/tmp/some_file', nproc_per_node=4) as parallel:
                parallel.run(training, config, a=1, b=2)

        Initializing the process group using ``tcp://``

        .. code-block:: python

            with idist.Parallel(backend=backend, init_method='tcp://10.1.1.20:23456', nproc_per_node=4) as parallel:
                parallel.run(training, config, a=1, b=2)


        3) Single node, Multi-TPU training launched with `python`

        .. code-block:: bash

            python main.py

        .. code-block:: python

            # main.py

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            config = {"key": "value"}

            with idist.Parallel(backend="xla-tpu", nproc_per_node=8) as parallel:
                parallel.run(training, config, a=1, b=2)


        4) Multi-node, Multi-GPU training launched with `python`. For example, 2 nodes with 8 GPUs:

        Using torch native distributed framework:

        .. code-block:: bash

            # node 0
            python main.py --node_rank=0

            # node 1
            python main.py --node_rank=1


        .. code-block:: python

            # main.py

            import argparse

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            # parse the node rank passed on the command line (see the launch commands above)
            parser = argparse.ArgumentParser()
            parser.add_argument("--node_rank", type=int, required=True)
            args = parser.parse_args()

            dist_config = {
                "nproc_per_node": 8,
                "nnodes": 2,
                "node_rank": args.node_rank,
                "master_addr": "master",
                "master_port": 15000
            }

            config = {"key": "value"}

            with idist.Parallel(backend="nccl", **dist_config) as parallel:
                parallel.run(training, config, a=1, b=2)

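        5) Running the same script without any distributed configuration (plain single-process run).
        A minimal sketch assuming no launcher tool is used: with ``backend=None`` no process group is
        created and ``run`` simply calls the function in the current process with ``local_rank=0``.

        .. code-block:: python

            # main.py

            import ignite.distributed as idist

            def training(local_rank, config, **kwargs):
                # ...
                print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                # ...

            config = {"key": "value"}

            # backend=None: nothing is spawned or initialized, the function runs in this process
            with idist.Parallel(backend=None) as parallel:
                parallel.run(training, config, a=1, b=2)
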

    .. _torchrun: https://pytorch.org/docs/stable/elastic/run.html#launcher-api
    .. _horovodrun: https://horovod.readthedocs.io/en/latest/api.html#module-horovod.run
    .. _dist.init_process_group: https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group
    .. versionchanged:: 0.4.2
        ``backend`` now accepts `horovod` distributed framework.

    .. versionchanged:: 0.4.5
        ``init_method`` added.

    """

    def __init__(
        self,
        backend: Optional[str] = None,
        nproc_per_node: Optional[int] = None,
        nnodes: Optional[int] = None,
        node_rank: Optional[int] = None,
        master_addr: Optional[str] = None,
        master_port: Optional[int] = None,
        init_method: Optional[str] = None,
        **spawn_kwargs: Any,
    ) -> None:
        if backend is not None:
            if backend not in idist.available_backends():
                raise ValueError(f"Unknown backend '{backend}'. Available backends: {idist.available_backends()}")
        else:
            arg_names = ["nproc_per_node", "nnodes", "node_rank", "master_addr", "master_port"]
            arg_values = [nproc_per_node, nnodes, node_rank, master_addr, master_port]
            for name, value in zip(arg_names, arg_values):
                if value is not None:
                    raise ValueError(f"If backend is None, argument '{name}' should be also None, but given {value}")

        self.backend = backend
        self._spawn_params = None
        self.init_method = init_method

        if self.backend is not None:
            if nproc_per_node is not None:
                self._spawn_params = self._setup_spawn_params(
                    nproc_per_node, nnodes, node_rank, master_addr, master_port, init_method, **spawn_kwargs
                )

        # The logger will be setup after the idist.initialize() call
        self._logger = None

    @staticmethod
    def _setup_spawn_params(
        nproc_per_node: int,
        nnodes: Optional[int] = None,
        node_rank: Optional[int] = None,
        master_addr: Optional[str] = None,
        master_port: Optional[int] = None,
        init_method: Optional[str] = None,
        **spawn_kwargs: Any,
    ) -> Dict:
        if nproc_per_node < 1:
            raise ValueError(f"Argument nproc_per_node should positive, but given {nproc_per_node}")
        if nnodes is None:
            nnodes = 1
        if nnodes < 1:
            raise ValueError(f"Argument nnodes should positive, but given {nnodes}")
        if node_rank is None:
            if nnodes > 1:
                raise ValueError("If number of nodes larger than one, arguments node_rank should be given")
            node_rank = 0
        if node_rank >= nnodes or node_rank < 0:
            raise ValueError(f"Argument node_rank should be between 0 and {nnodes - 1}, but given {node_rank}")
        if nnodes > 1 and (master_addr is None or master_port is None) and init_method is None:
            raise ValueError(
                "If number of nodes larger than one, arguments master_addr and master_port or init_method "
                f"should be specified, but given master_addr={master_addr}, master_port={master_port} and "
                f"init_method={init_method}."
            )
        params = {
            "nproc_per_node": nproc_per_node,
            "nnodes": nnodes,
            "node_rank": node_rank,
            "master_addr": master_addr,
            "master_port": master_port,
            "init_method": init_method,
        }
        params.update(spawn_kwargs)
        return {k: v for k, v in params.items() if v is not None}

    def run(self, func: Callable, *args: Any, **kwargs: Any) -> None:
        """Execute ``func`` with provided arguments in distributed context.

        Args:
            func: function to execute. First argument of the function should be `local_rank` - local process
                index.
            args: positional arguments of ``func`` (without `local_rank`).
            kwargs: keyword arguments of ``func``.

        Examples:
            .. code-block:: python

                import ignite.distributed as idist

                def training(local_rank, config, **kwargs):
                    # ...
                    print(idist.get_rank(), ": run with config:", config, "- backend=", idist.backend())
                    # ...

                backend = "nccl"  # or "gloo", "xla-tpu", "horovod", or None
                config = {"key": "value"}

                with idist.Parallel(backend=backend) as parallel:
                    parallel.run(training, config, a=1, b=2)

        """
        if self._spawn_params is not None and self.backend is not None:
            self._logger.info(  # type: ignore[attr-defined]
                f"Spawn function '{func}' in {self._spawn_params['nproc_per_node']} processes"
            )
            idist.spawn(self.backend, func, args=args, kwargs_dict=kwargs, **self._spawn_params)
        else:
            self._logger.info(f"- Run '{func}' in {idist.get_world_size()} processes")  # type: ignore[attr-defined]
            local_rank = idist.get_local_rank()
            func(local_rank, *args, **kwargs)

        self._logger.info("End of run")  # type: ignore[attr-defined]

    def __enter__(self) -> "Parallel":
        if self.backend is not None and self._spawn_params is None:
            idist.initialize(self.backend, init_method=self.init_method)

        # The logger can be setup from now since idist.initialize() has been called (if needed)
        self._logger = setup_logger(__name__ + "." + self.__class__.__name__)  # type: ignore[assignment]

        if self.backend is not None:
            if self._spawn_params is None:
                self._logger.info(  # type: ignore[attr-defined]
                    f"Initialized processing group with backend: '{self.backend}'"
                )
            else:
                self._logger.info(  # type: ignore[attr-defined]
                    f"Initialized distributed launcher with backend: '{self.backend}'"
                )
                msg = "\n\t".join([f"{k}: {v}" for k, v in self._spawn_params.items() if v is not None])
                self._logger.info(f"- Parameters to spawn processes: \n\t{msg}")  # type: ignore[attr-defined]

        return self

    def __exit__(self, *args: Any, **kwargs: Any) -> None:
        if (self.backend is not None) and self._spawn_params is None:
            self._logger.info(  # type: ignore[attr-defined]
                f"Finalized processing group with backend: '{self.backend}'"
            )
            idist.finalize()