# mypy: allow-untyped-defs
from typing import Dict, List, Optional

import torch
import torch.optim._functional as F
from torch import Tensor
from torch.distributed.optim._deprecation_warning import (
    _scripted_functional_optimizer_deprecation_warning,
)

__all__: List[str] = []

# Define a TorchScript compatible functional RMSprop optimizer
# that we use in a functional way.
# Instead of using `param.grad` when updating parameters, we
# explicitly let the distributed optimizer pass gradients to the
# `step` function. This way we can separate the gradients from the
# parameters and let a multithreaded trainer update the parameters
# without data races from accumulating into the same `.grad`.
# NOTE: This should only be used by distributed optimizer internals
# and is not meant to be exposed to the user.
@torch.jit.script
class _FunctionalRMSprop:
    def __init__(
        self,
        params: List[Tensor],
        lr: float = 1e-2,
        alpha: float = 0.99,
        eps: float = 1e-8,
        weight_decay: float = 0.0,
        momentum: float = 0.0,
        centered: bool = False,
        foreach: bool = False,
        maximize: bool = False,
        _allow_empty_param_list: bool = False,
    ):
        _scripted_functional_optimizer_deprecation_warning(stacklevel=2)
        self.defaults = {
            "lr": lr,
            "alpha": alpha,
            "eps": eps,
            "weight_decay": weight_decay,
            "momentum": momentum,
        }
        self.centered = centered
        self.foreach = foreach
        self.maximize = maximize

        if len(params) == 0 and not _allow_empty_param_list:
            raise ValueError("optimizer got an empty parameter list")

        # NOTE: we only have one param_group and don't allow the user to add
        # additional param groups, as that is not a common use case.
        self.param_group = {"params": params}
        self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})
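
    # NOTE: `gradients` must align one-to-one with `self.param_group["params"]`;
    # entries may be None for parameters that received no gradient this iteration.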
    def step(self, gradients: List[Optional[Tensor]]):
        params = self.param_group["params"]
        params_with_grad = []
        grads = []
        square_avgs = []
        grad_avgs = []
        momentum_buffer_list = []
        state_steps = []
        lr = self.defaults["lr"]
        alpha = self.defaults["alpha"]
        eps = self.defaults["eps"]
        momentum = self.defaults["momentum"]
        weight_decay = self.defaults["weight_decay"]

        if len(params) != len(gradients):
            raise ValueError(
                "the number of gradients passed in does not match the number of parameters! "
                + f"Params length: {len(params)}. "
                + f"Gradients length: {len(gradients)}"
            )

        has_complex = False
        for param, gradient in zip(params, gradients):
            if gradient is not None:
                has_complex |= torch.is_complex(param)
                params_with_grad.append(param)
                grads.append(gradient)
                # Lazy state initialization
                if param not in self.state:
                    self.state[param] = {}
                    state = self.state[param]
                    state["step"] = torch.tensor(0.0)
                    state["square_avg"] = torch.zeros_like(
                        param, memory_format=torch.preserve_format
                    )
                    if momentum > 0:
                        state["momentum_buffer"] = torch.zeros_like(
                            param, memory_format=torch.preserve_format
                        )
                    if self.centered:
                        state["grad_avg"] = torch.zeros_like(
                            param, memory_format=torch.preserve_format
                        )

                state = self.state[param]
                square_avgs.append(state["square_avg"])
                if momentum > 0:
                    momentum_buffer_list.append(state["momentum_buffer"])
                if self.centered:
                    grad_avgs.append(state["grad_avg"])
                state_steps.append(state["step"])
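
        # For reference, F.rmsprop applies the standard RMSprop update per
        # parameter p with gradient g (ignoring the weight_decay, centered,
        # and momentum variants configured above):
        #   square_avg = alpha * square_avg + (1 - alpha) * g^2
        #   p         -= lr * g / (sqrt(square_avg) + eps)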
        with torch.no_grad():
            F.rmsprop(
                params_with_grad,
                grads,
                square_avgs,
                grad_avgs,
                momentum_buffer_list,
                state_steps,
                lr=lr,
                alpha=alpha,
                eps=eps,
                weight_decay=weight_decay,
                momentum=momentum,
                centered=self.centered,
                foreach=self.foreach,
                maximize=self.maximize,
                has_complex=has_complex,
            )
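

# A minimal usage sketch (illustrative, not part of the module): a distributed
# optimizer would construct this class over a flat list of parameters and pass
# gradients to `step` explicitly instead of relying on `param.grad`. The
# tensors below are hypothetical placeholders.
#
#   params = [torch.randn(2, 3, requires_grad=True)]
#   opt = _FunctionalRMSprop(params, lr=1e-2, momentum=0.9)
#   grads: List[Optional[Tensor]] = [torch.randn(2, 3)]
#   opt.step(grads)  # one RMSprop update using the passed-in gradients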