1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
|
#pragma once
#ifdef USE_C10D_UCC
#include <torch/csrc/distributed/c10d/UCCUtils.hpp>

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <exception>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/Types.hpp>
#include <torch/csrc/distributed/c10d/Utils.hpp>
#ifdef USE_CUDA
#include <ATen/cuda/CUDAEvent.h>
#include <c10/cuda/CUDAStream.h>
#endif
namespace c10d {
// Sentinel constant; judging by its name it marks a device that has not been
// set yet. Parenthesized so the negative literal cannot combine with operators
// that happen to sit next to the expansion site.
#define TORCH_UCC_DEVICE_NOT_SET (-2)

// SAVE_TENSORS(_TENSORS, _DATA)
//
// CUDA builds: if the first element of _TENSORS lives on a CUDA device, every
// element's storage pointer is passed to
// c10::cuda::CUDACachingAllocator::recordStream together with `*stream`, and
// _DATA is left untouched. Otherwise _TENSORS is assigned to _DATA.
// Non-CUDA builds: _TENSORS is always assigned to _DATA.
//
// Requirements at the expansion site:
//  - CUDA builds dereference a variable named `stream`, which must be in
//    scope, and read (_TENSORS)[0], so _TENSORS must not be empty.
//  - Both variants expand to exactly one statement that needs a trailing
//    semicolon, so `if (c) SAVE_TENSORS(a, b); else ...` is well-formed.
#ifdef USE_CUDA
#define SAVE_TENSORS(_TENSORS, _DATA)                       \
  do {                                                      \
    if ((_TENSORS)[0].device().is_cuda()) {                 \
      for (const auto i : c10::irange((_TENSORS).size())) { \
        c10::cuda::CUDACachingAllocator::recordStream(      \
            (_TENSORS)[i].storage().data_ptr(), (*stream)); \
      }                                                     \
    } else {                                                \
      (_DATA) = (_TENSORS);                                 \
    }                                                       \
  } while (0)
#else
#define SAVE_TENSORS(_TENSORS, _DATA) \
  do {                                \
    (_DATA) = (_TENSORS);             \
  } while (0)
#endif
// Backend identifier string; ProcessGroupUCC::getBackendName() returns a
// std::string built from it.
constexpr const char* UCC_BACKEND_NAME{"ucc"};
// Bundles a queue of CUDA events (CUDA builds only) with a mutex.
// ProcessGroupUCC holds one instance by value and WorkUCC keeps a raw pointer
// to one.
// NOTE(review): event_pool_mutex presumably guards event_pool; the locking
// itself happens in code outside this header -- confirm there.
struct event_pool_t {
#ifdef USE_CUDA
// Uniquely-owned CUDA events; this member only exists when USE_CUDA is set.
std::queue<std::unique_ptr<at::cuda::CUDAEvent>> event_pool;
#endif
// Declared unconditionally, so the struct has this member in every build.
std::mutex event_pool_mutex;
};
class Comm;

// ProcessGroup implementation whose backend name is "ucc" (see
// getBackendName()). It overrides the collective and point-to-point entry
// points of the ProcessGroup interface; apart from getBackendName(), every
// member function is defined outside this header.
//
// UCC does not support multiple CUDA devices per process.
class TORCH_API ProcessGroupUCC : public ProcessGroup {
 private:
  // Defined out of line.
  // NOTE(review): presumably transfers timeout_ into `args` -- confirm in the
  // implementation file.
  void set_timeout(ucc_coll_args_t& args);

 public:
  // Tensor lists that travel with a posted operation (ProgressEntry::data owns
  // one of these). The virtual destructor lets the per-collective subclasses
  // below be destroyed through a WorkData pointer.
  class WorkData {
   public:
    std::vector<at::Tensor> src;
    std::vector<at::Tensor> dst;
    std::vector<at::Tensor> flat;
    WorkData() = default;
    virtual ~WorkData() = default;
  };

  // WorkData plus four arrays of `size` zero-initialized entries holding send
  // and receive lengths and offsets.
  // NOTE(review): `size` is presumably the number of ranks -- confirm at the
  // call sites.
  class AlltoallWorkData : public WorkData {
   public:
    AlltoallWorkData(int size)
        : send_lengths(size),
          send_offsets(size),
          recv_lengths(size),
          recv_offsets(size) {}
    std::vector<uint64_t> send_lengths;
    std::vector<uint64_t> send_offsets;
    std::vector<uint64_t> recv_lengths;
    std::vector<uint64_t> recv_offsets;
  };

  // WorkData plus receive lengths/offsets, each holding `size`
  // zero-initialized entries.
  class AllgathervWorkData : public WorkData {
   public:
    AllgathervWorkData(int size) : recv_lengths(size), recv_offsets(size) {}
    std::vector<uint64_t> recv_lengths;
    std::vector<uint64_t> recv_offsets;
  };

  // WorkData plus send lengths/offsets, each holding `size` zero-initialized
  // entries.
  class ScattervWorkData : public WorkData {
   public:
    ScattervWorkData(int size) : send_lengths(size), send_offsets(size) {}
    std::vector<uint64_t> send_lengths;
    std::vector<uint64_t> send_offsets;
  };

  // Bookkeeping for one UCC request: the request handle, its status, the
  // communicator pointer it was created with and, optionally, the WorkData,
  // future and exception attached to it.
  class ProgressEntry {
    friend class ProcessGroupUCC;
    friend class Comm;

   public:
    // A freshly created entry reports UCC_INPROGRESS.
    ProgressEntry(CommBase* comm, ucc_coll_req_h request)
        : status_(UCC_INPROGRESS), comm_(comm), request_(request) {}
    // Finalizes UCC status or exception of collective request.
    void finalize(std::exception_ptr eptr = nullptr);
    ucc_status_t status_;
    // Raw pointer; this class declares no destructor and never deletes it.
    CommBase* comm_;
    ucc_coll_req_h request_;
    std::unique_ptr<WorkData> data;
    c10::intrusive_ptr<c10::ivalue::Future> future_;
    std::exception_ptr eptr_;
  };

  // Work handle handed back by this process group's operations.
  class WorkUCC : public Work {
    friend class ProcessGroupUCC;
    friend class Comm;

   public:
    // Passes -1 as the first argument of the Work constructor and stores the
    // logger.
    WorkUCC(
        OpType opType,
        const char* prof_title,
        const c10::optional<std::vector<at::Tensor>>& inputs,
        const c10::intrusive_ptr<ProcessGroupUCCLogger>& logger)
        : Work(-1, opType, prof_title, inputs), logger_(logger) {}
    ~WorkUCC();
    void setException();
    void setAndThrowException();
    bool isCompleted() override;
    bool isSuccess() const override;
    bool wait(std::chrono::milliseconds timeout = kUnsetTimeout) override;
    c10::intrusive_ptr<c10::ivalue::Future> getFuture() override;
    std::vector<at::Tensor> result() override;
    int sourceRank() const override;
#ifdef USE_CUDA
    std::unique_ptr<at::cuda::CUDAEvent> fence = nullptr;
    // Raw pointer to an event pool; not owned through any smart pointer here.
    event_pool_t* ep = nullptr;
#endif
    // Initialized so that a read before assignment is well-defined; -1 mirrors
    // the "unset" default used for rank/size in the ProcessGroupUCC
    // constructor.
    int sourceRank_{-1};

   protected:
    std::shared_ptr<ProgressEntry> entry_;
    c10::intrusive_ptr<ProcessGroupUCCLogger> logger_;

   private:
    // The future returned by getFuture.
    c10::intrusive_ptr<at::ivalue::Future> future_;
    // Store a reference to collective's outputs, used by result
    std::shared_ptr<std::vector<at::Tensor>> outputs_;
  };

  // `rank` and `size` default to -1 and `timeout` to
  // kProcessGroupDefaultTimeout.
  explicit ProcessGroupUCC(
      const c10::intrusive_ptr<Store>& store,
      int rank = -1,
      int size = -1,
      std::chrono::duration<float> timeout = kProcessGroupDefaultTimeout);

  // NOTE(review): presumably sets up `comm` (and related handles) for `dev`
  // -- confirm in the implementation file.
  void initComm(c10::Device dev);

  ~ProcessGroupUCC() override;

  // Returns UCC_BACKEND_NAME as a std::string.
  const std::string getBackendName() const override {
    return std::string(UCC_BACKEND_NAME);
  }

#ifdef USE_CUDA
  // NOTE(review): presumably hands out an event from `ep` -- confirm in the
  // implementation file.
  std::unique_ptr<at::cuda::CUDAEvent> getPooledEvent();
#endif

  // Performs a health check by initializing dummy UCC & UCX communicators and
  // then destroying them. This will help indicate and signal any
  // UCC/UCX-related issues prior to the first collective. The actual
  // initialization and subsequent destruction is ran on a separate thread and
  // the main thread is signalled about timeouts/errors to report to the
  // application.
  void runHealthCheck();

  // Common posting helper; only declared here.
  // NOTE(review): `preproc` / `postproc` look like callables run around the
  // posting of `coll` -- confirm against the definition.
  template <typename PreProcess, typename PostProcess>
  c10::intrusive_ptr<Work> collective_post(
      OpType opType,
      PreProcess preproc,
      PostProcess postproc,
      ucc_coll_args_t& coll,
      std::unique_ptr<ProcessGroupUCC::WorkData> data,
      c10::Device dev,
      std::vector<at::Tensor>& inputTensors,
      std::vector<at::Tensor>& outputTensors,
      const char* prof_title);

  // Overrides of the ProcessGroup operations. Each one returns a Work handle.

  c10::intrusive_ptr<Work> broadcast(
      std::vector<at::Tensor>& data,
      const BroadcastOptions& opts = BroadcastOptions()) override;

  c10::intrusive_ptr<Work> allreduce(
      std::vector<at::Tensor>& tensors,
      const AllreduceOptions& opts = AllreduceOptions()) override;

  c10::intrusive_ptr<Work> allreduce_coalesced(
      std::vector<at::Tensor>& tensors,
      const AllreduceCoalescedOptions& opts =
          AllreduceCoalescedOptions()) override;

  c10::intrusive_ptr<Work> reduce(
      std::vector<at::Tensor>& tensors,
      const ReduceOptions& opts = ReduceOptions()) override;

  c10::intrusive_ptr<Work> allgather(
      std::vector<std::vector<at::Tensor>>& outputTensors,
      std::vector<at::Tensor>& inputTensors,
      const AllgatherOptions& opts = AllgatherOptions()) override;

  c10::intrusive_ptr<Work> _allgather_base(
      at::Tensor& outputBuffer,
      at::Tensor& inputBuffer,
      const AllgatherOptions& opts = AllgatherOptions()) override;

  c10::intrusive_ptr<Work> barrier(
      const BarrierOptions& opts = BarrierOptions()) override;

  c10::intrusive_ptr<Work> gather(
      std::vector<std::vector<at::Tensor>>& outputTensors,
      std::vector<at::Tensor>& inputTensors,
      const GatherOptions& opts = GatherOptions()) override;

  c10::intrusive_ptr<Work> scatter(
      std::vector<at::Tensor>& outputTensors,
      std::vector<std::vector<at::Tensor>>& inputTensors,
      const ScatterOptions& opts = ScatterOptions()) override;

  c10::intrusive_ptr<Work> reduce_scatter(
      std::vector<at::Tensor>& outputTensors,
      std::vector<std::vector<at::Tensor>>& inputTensors,
      const ReduceScatterOptions& opts = ReduceScatterOptions()) override;

  c10::intrusive_ptr<Work> alltoall_base(
      at::Tensor& outputTensor,
      at::Tensor& inputTensor,
      std::vector<int64_t>& outputSplitSizes,
      std::vector<int64_t>& inputSplitSizes,
      const AllToAllOptions& opts = AllToAllOptions()) override;

  c10::intrusive_ptr<Work> alltoall(
      std::vector<at::Tensor>& outputTensors,
      std::vector<at::Tensor>& inputTensors,
      const AllToAllOptions& opts = AllToAllOptions()) override;

  c10::intrusive_ptr<Work> send(
      std::vector<at::Tensor>& tensors,
      int dstRank,
      int tag) override;

  c10::intrusive_ptr<Work> recv(
      std::vector<at::Tensor>& tensors,
      int srcRank,
      int tag) override;

  // Static factory that returns the new group as a ProcessGroup pointer.
  static c10::intrusive_ptr<ProcessGroup> createProcessGroupUCC(
      const c10::intrusive_ptr<::c10d::Store>& store,
      int rank,
      int size,
      const std::chrono::duration<float>& timeout);

 protected:
  const std::chrono::duration<float> timeout_;
  std::shared_ptr<torch_ucc_oob_coll_info_t> oob;
  std::shared_ptr<Comm> comm = {nullptr};
  // Initialized so the value is never indeterminate; Comm::get_comm takes an
  // id by non-const reference and is presumably what assigns the real one.
  uint32_t comm_id{0};
  ucc_team_h team{nullptr};
  ucc_ee_h cuda_ee{nullptr};
#ifdef USE_CUDA
  // The CUDA variant of SAVE_TENSORS dereferences a variable named `stream`;
  // presumably it is meant to pick up this member when expanded inside member
  // functions.
  std::unique_ptr<at::cuda::CUDAStream> stream = nullptr;
  event_pool_t ep;
#endif
  c10::intrusive_ptr<ProcessGroupUCCLogger> logger;
};
// Communicator object handed out as std::shared_ptr<Comm> by get_comm(). It
// holds a CommUCC, a std::thread, and a deque of ProgressEntry pointers
// declared alongside one mutex and two condition variables.
// NOTE(review): progress_loop() is presumably what progress_thread runs, and
// the mutex/condition variables presumably coordinate access to
// progress_queue -- confirm in the implementation file.
class Comm {
  c10::intrusive_ptr<ProcessGroupUCCLogger> logger;
  std::shared_ptr<torch_ucc_oob_coll_info_t> oob;
  CommUCC ucc_comm;
  std::mutex mutex;
  std::thread progress_thread;
  std::condition_variable queue_produce_cv;
  std::condition_variable queue_consume_cv;
  std::deque<std::shared_ptr<ProcessGroupUCC::ProgressEntry>> progress_queue;
  // Both flags get a defined value before any constructor code runs, so they
  // can never be read while indeterminate. The constructor may still
  // overwrite them.
  bool stop_progress_loop{false};
  bool collective_inprogress{false};
  // Left to the constructor: the proper initial value is a project-defined
  // enumerator that is not visible in this header.
  torch_ucc_phase_t finalize_phase;

 public:
  // Left to the constructor to initialize.
  c10::DeviceIndex cuda_device_index;

  Comm(
      const c10::intrusive_ptr<ProcessGroupUCCLogger>& logger,
      std::shared_ptr<torch_ucc_oob_coll_info_t> oob,
      c10::Device dev,
      bool is_health_check);

  ~Comm();

  // Team handles are passed by reference to both calls below.
  void ucc_create_team(
      ucc_team_h& team,
      std::shared_ptr<torch_ucc_oob_coll_info_t> oob);

  void ucc_destroy_team(ucc_team_h& team);

  // Takes an already-created UCC request and returns a Work handle for it.
  c10::intrusive_ptr<Work> enqueue_p2p(
      OpType opType,
      ucc_coll_req_h request,
      const char* prof_title);

#ifdef USE_CUDA
  // Same parameters as enqueue_collective plus an execution-engine handle
  // `ee`.
  void enqueue_cuda_collective(
      std::unique_ptr<ProcessGroupUCC::WorkData> data,
      c10::intrusive_ptr<ProcessGroupUCC::WorkUCC> work,
      ucc_coll_args_t& coll,
      ucc_team_h team,
      ucc_ee_h ee);
#endif

  // Takes ownership of `data` (passed as std::unique_ptr by value).
  void enqueue_collective(
      std::unique_ptr<ProcessGroupUCC::WorkData> data,
      c10::intrusive_ptr<ProcessGroupUCC::WorkUCC> work,
      ucc_coll_args_t& coll,
      ucc_team_h team);

  // Static accessor returning a shared Comm. `id` is taken by non-const
  // reference, so the call may write to it; `is_health_check` defaults to
  // false.
  static std::shared_ptr<Comm> get_comm(
      uint32_t& id,
      c10::Device dev,
      std::shared_ptr<torch_ucc_oob_coll_info_t> oob,
      const c10::intrusive_ptr<ProcessGroupUCCLogger>& logger,
      bool is_health_check = false);

  void progress_loop();
};
} // namespace c10d
#endif // USE_C10D_UCC
|