1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
#include <torch/csrc/distributed/c10d/ProcessGroupRoundRobin.hpp>
namespace c10d {
/// Builds a round-robin wrapper around `processGroups`.
///
/// Collectives issued on this object are handed to the wrapped groups one at
/// a time, cycling through them in order (see next()).
///
/// @param rank           forwarded to the ProcessGroup base constructor; every
///                       wrapped group must report the same rank.
/// @param size           forwarded to the ProcessGroup base constructor; every
///                       wrapped group must report the same size.
/// @param processGroups  groups to rotate through; taken by value and moved
///                       into the member list. Must be non-empty.
///
/// Emits a deprecation warning on every construction. Fails a TORCH_CHECK when
/// the list is empty, holds an undefined pointer, or when any wrapped group
/// disagrees with rank_ / size_.
ProcessGroupRoundRobin::ProcessGroupRoundRobin(
    int rank,
    int size,
    std::vector<c10::intrusive_ptr<ProcessGroup>> processGroups)
    : ProcessGroup(rank, size), processGroups_(std::move(processGroups)) {
  TORCH_WARN(
      "ProcessGroupRoundRobin is deprecated and scheduled to be removed after this current release (1.13). ",
      "Please file an issue on https://github.com/pytorch/pytorch/issues if there are any concerns or issues with this deprecation.");
  TORCH_CHECK(
      !processGroups_.empty(),
      "ProcessGroupRoundRobin requires at least one process group");
  for (const auto& processGroup : processGroups_) {
    // Guard the dereferences below: a null entry would otherwise crash
    // instead of producing a readable error.
    TORCH_CHECK(
        processGroup.defined(),
        "ProcessGroupRoundRobin received an undefined process group");
    TORCH_CHECK(
        processGroup->getRank() == rank_,
        "ProcessGroupRoundRobin: wrapped process group reports rank ",
        processGroup->getRank(),
        ", expected ",
        rank_);
    TORCH_CHECK(
        processGroup->getSize() == size_,
        "ProcessGroupRoundRobin: wrapped process group reports size ",
        processGroup->getSize(),
        ", expected ",
        size_);
  }
  // The rotation starts at the first wrapped group.
  iterator_ = processGroups_.begin();
}
/// Nothing to release by hand; members clean up through their own destructors.
ProcessGroupRoundRobin::~ProcessGroupRoundRobin() = default;
/// Runs a broadcast on the process group that is next in the rotation.
///
/// `tensors` and `opts` are passed through untouched to the group returned by
/// next(); the Work handle produced by that group is returned as-is.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::broadcast(
    std::vector<at::Tensor>& tensors,
    const BroadcastOptions& opts) {
  const auto& backend = next();
  return backend->broadcast(tensors, opts);
}
/// Runs an allreduce on the process group that is next in the rotation.
///
/// Arguments are forwarded unchanged to the group selected by next(), and
/// that group's Work handle is returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::allreduce(
    std::vector<at::Tensor>& tensors,
    const AllreduceOptions& opts) {
  const auto& backend = next();
  return backend->allreduce(tensors, opts);
}
/// Runs a coalesced allreduce on the process group that is next in the
/// rotation.
///
/// Arguments are forwarded unchanged to the group selected by next(), and
/// that group's Work handle is returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::allreduce_coalesced(
    std::vector<at::Tensor>& tensors,
    const AllreduceCoalescedOptions& opts) {
  const auto& backend = next();
  return backend->allreduce_coalesced(tensors, opts);
}
/// Runs a reduce on the process group that is next in the rotation.
///
/// Arguments are forwarded unchanged to the group selected by next(), and
/// that group's Work handle is returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::reduce(
    std::vector<at::Tensor>& tensors,
    const ReduceOptions& opts) {
  const auto& backend = next();
  return backend->reduce(tensors, opts);
}
/// Runs an allgather on the process group that is next in the rotation.
///
/// `outputs`, `inputs` and `opts` are forwarded unchanged to the group
/// selected by next(), and that group's Work handle is returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::allgather(
    std::vector<std::vector<at::Tensor>>& outputs,
    std::vector<at::Tensor>& inputs,
    const AllgatherOptions& opts) {
  const auto& backend = next();
  return backend->allgather(outputs, inputs, opts);
}
/// Runs a coalesced allgather on the process group that is next in the
/// rotation.
///
/// `outputTensorLists`, `inputTensors` and `opts` are forwarded unchanged to
/// the group selected by next(), and that group's Work handle is returned.
///
/// NOTE: this previously delegated to allgather(), which is a different
/// collective; it now calls the wrapped group's allgather_coalesced() so the
/// operation executed matches the one requested.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::allgather_coalesced(
    std::vector<std::vector<at::Tensor>>& outputTensorLists,
    std::vector<at::Tensor>& inputTensors,
    const AllgatherOptions& opts) {
  return next()->allgather_coalesced(outputTensorLists, inputTensors, opts);
}
/// Runs a gather on the process group that is next in the rotation.
///
/// `outputs`, `inputs` and `opts` are forwarded unchanged to the group
/// selected by next(), and that group's Work handle is returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::gather(
    std::vector<std::vector<at::Tensor>>& outputs,
    std::vector<at::Tensor>& inputs,
    const GatherOptions& opts) {
  const auto& backend = next();
  return backend->gather(outputs, inputs, opts);
}
/// Runs a scatter on the process group that is next in the rotation.
///
/// `outputs`, `inputs` and `opts` are forwarded unchanged to the group
/// selected by next(), and that group's Work handle is returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::scatter(
    std::vector<at::Tensor>& outputs,
    std::vector<std::vector<at::Tensor>>& inputs,
    const ScatterOptions& opts) {
  const auto& backend = next();
  return backend->scatter(outputs, inputs, opts);
}
/// Runs a reduce_scatter on the process group that is next in the rotation.
///
/// `outputs`, `inputs` and `opts` are forwarded unchanged to the group
/// selected by next(), and that group's Work handle is returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::reduce_scatter(
    std::vector<at::Tensor>& outputs,
    std::vector<std::vector<at::Tensor>>& inputs,
    const ReduceScatterOptions& opts) {
  const auto& backend = next();
  return backend->reduce_scatter(outputs, inputs, opts);
}
/// Runs an alltoall_base on the process group that is next in the rotation.
///
/// The tensors, split-size vectors and options are forwarded unchanged to the
/// group selected by next(), and that group's Work handle is returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::alltoall_base(
    at::Tensor& outputTensor,
    at::Tensor& inputTensor,
    std::vector<int64_t>& outputSplitSizes,
    std::vector<int64_t>& inputSplitSizes,
    const AllToAllOptions& opts) {
  const auto& backend = next();
  return backend->alltoall_base(
      outputTensor, inputTensor, outputSplitSizes, inputSplitSizes, opts);
}
/// send is not offered by the round-robin group.
///
/// All arguments are ignored; the TORCH_CHECK below fails unconditionally,
/// so no Work handle is ever returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::send(
    std::vector<at::Tensor>& /* unused */,
    int /* unused */,
    int /* unused */) {
  TORCH_CHECK(false, "ProcessGroupRoundRobin does not support send");
}
/// recv is not offered by the round-robin group.
///
/// All arguments are ignored; the TORCH_CHECK below fails unconditionally,
/// so no Work handle is ever returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::recv(
    std::vector<at::Tensor>& /* unused */,
    int /* unused */,
    int /* unused */) {
  TORCH_CHECK(false, "ProcessGroupRoundRobin does not support recv");
}
/// recvAnysource is not offered by the round-robin group.
///
/// All arguments are ignored; the TORCH_CHECK below fails unconditionally,
/// so no Work handle is ever returned. The message names recvAnysource
/// explicitly so the failure is not mistaken for one coming from recv().
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::recvAnysource(
    std::vector<at::Tensor>& /* unused */,
    int /* unused */) {
  TORCH_CHECK(
      false, "ProcessGroupRoundRobin does not support recvAnysource");
}
/// barrier is not offered by the round-robin group.
///
/// The options argument is ignored; the TORCH_CHECK below fails
/// unconditionally, so no Work handle is ever returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::barrier(
    const BarrierOptions& /* unused */) {
  TORCH_CHECK(false, "ProcessGroupRoundRobin does not support barrier");
}
/// Returns the process group the rotation currently points at and advances
/// the rotation by one slot, wrapping back to the first group after the last.
///
/// The returned reference refers to an element of processGroups_.
const c10::intrusive_ptr<ProcessGroup>& ProcessGroupRoundRobin::next() {
  // Remember the current slot before moving the cursor forward.
  const auto current = iterator_;
  if (++iterator_ == processGroups_.end()) {
    // Walked off the end: restart from the first group.
    iterator_ = processGroups_.begin();
  }
  return *current;
}
/// _allgather_base is not offered by the round-robin group.
///
/// All arguments are ignored; the TORCH_CHECK below fails unconditionally,
/// so no Work handle is ever returned.
c10::intrusive_ptr<Work> ProcessGroupRoundRobin::_allgather_base(
    at::Tensor& /* unused */,
    at::Tensor& /* unused */,
    const AllgatherOptions& /* unused */) {
  TORCH_CHECK(
      false, "no support for _allgather_base in RoundRobin process group");
}
} // namespace c10d
|