# mypy: allow-untyped-defs
from typing import Optional
import torch
from torch.utils import _pytree as pytree


def _basic_validation(op, args=(), kwargs=None):
    """
    Common validation shared by all ops goes in here.
    """
    from torch.distributed._shard.sharded_tensor import ShardedTensor

    if len(args) == 0 and (kwargs is None or len(kwargs) == 0):
        raise ValueError(f"No input for '{op.__name__}'!")

    # Validate types: at least one input must be a distributed tensor.
    has_distributed_tensor = False

    def is_distributed_tensor(e):
        nonlocal has_distributed_tensor
        if isinstance(e, ShardedTensor):
            has_distributed_tensor = True

    pytree.tree_map_(is_distributed_tensor, args)
    pytree.tree_map_(is_distributed_tensor, kwargs)

    if not has_distributed_tensor:
        raise TypeError(
            f"torch function '{op.__name__}', with args: {args} and "
            f"kwargs: {kwargs}, was called without any distributed tensor!"
        )

    # Validate that all distributed tensors use the same ProcessGroup.
    cur_pg: Optional[torch.distributed.ProcessGroup] = None

    def validate_pg(e):
        nonlocal cur_pg
        if isinstance(e, ShardedTensor):
            if cur_pg is not None and e._process_group is not cur_pg:
                raise RuntimeError(
                    "All distributed tensors should use the "
                    "same ProcessGroup if used together in an op."
                )
            cur_pg = e._process_group

    pytree.tree_map_(validate_pg, args)
    pytree.tree_map_(validate_pg, kwargs)
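
# A minimal sketch (not part of this module) of how ``_basic_validation`` is
# typically called from a ``__torch_function__`` handler. The handler name
# ``sharded_add_handler`` and the surrounding dispatch are assumptions for
# illustration only:
#
#     def sharded_add_handler(types, args=(), kwargs=None, pg=None):
#         # Reject empty inputs, calls without any ShardedTensor, and inputs
#         # that mix different ProcessGroups before doing any real work.
#         _basic_validation(torch.add, args, kwargs)
#         ...  # sharded implementation of torch.add goes here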


def _register_default_op(op, decorator):
    @decorator(op)
    def tensor_default_op(types, args=(), kwargs=None, pg=None):
        """
        Handles ``__torch_function__`` dispatch for the default tensor ops that
        behave the same as on ``torch.Tensor``, such as ``torch.Tensor.shape``
        or ``torch.Tensor.dtype``. We simply lower to the real op call with the
        ``DisableTorchFunctionSubclass`` context, like
        ``torch.Tensor.__torch_function__`` does, to avoid recursion.
        """
        if kwargs is None:
            kwargs = {}

        with torch._C.DisableTorchFunctionSubclass():
            return op(*args, **kwargs)
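
# A minimal usage sketch (not part of this module): registering attribute-style
# ops that behave identically on sharded and dense tensors. ``_sharded_op_impl``
# is assumed here to be the registering decorator provided by the ShardedTensor
# op registry; the actual decorator is supplied by the caller.
#
#     _register_default_op(torch.Tensor.dtype.__get__, _sharded_op_impl)
#     _register_default_op(torch.Tensor.requires_grad.__get__, _sharded_op_impl)
#
# Each registration installs ``tensor_default_op`` as the handler, which simply
# re-invokes the plain ``torch.Tensor`` op with torch function dispatch disabled.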