File: SymmetricMemory.hpp

package info (click to toggle)
pytorch-cuda 2.6.0%2Bdfsg-7
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 161,620 kB
  • sloc: python: 1,278,832; cpp: 900,322; ansic: 82,710; asm: 7,754; java: 3,363; sh: 2,811; javascript: 2,443; makefile: 597; ruby: 195; xml: 84; objc: 68
file content (164 lines) | stat: -rw-r--r-- 6,937 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#pragma once

#include <ATen/ATen.h>
#include <torch/csrc/distributed/c10d/Store.hpp>

namespace c10d::symmetric_memory {

// SymmetricMemory represents symmetric allocations across a group of devices.
// The allocations represented by a SymmetricMemory object are accessible by
// all devices in the group. The class can be used for op-level custom
// communication patterns (via the get_buffer APIs and the synchronization
// primitives), as well as custom communication kernels (via the buffer and
// signal_pad device pointers).
//
// To acquire a SymmetricMemory object, each rank first allocates
// identical-sized memory via SymmetricMemoryAllocator::alloc(), then invokes
// SymmetricMemoryAllocator::rendezvous() on the memory to establish the
// association across peer buffers. The rendezvous is a one-time process, and
// the mapping between a local memory region and the associated SymmetricMemory
// object is unique.
//
// NOTE [symmetric memory signal pad]
// Signal pads are P2P-accessible memory regions designated for
// synchronization. SymmetricMemory offers built-in synchronization primitives
// such as barriers, put_signal, and wait_signal, which are all based on signal
// pads. Users may utilize signal pads for their own synchronization logic,
// provided that the signal pads remain zero-filled following successful
// synchronization.
//
// NOTE [symmetric memory synchronization channel]
// Synchronization channels allow users to use a single SymmetricMemory object
// to perform isolated synchronizations on different streams. For example,
// consider the case in which two barriers are issued on two streams for
// different purposes. Without the concept of channels, we cannot guarantee the
// correctness of the barriers, since signals issued by the barrier on stream A
// can be received by the barrier on stream B. By specifying different channels
// for these two barriers, they can operate correctly in parallel.
//
// This is an abstract interface: every member below is pure virtual and is
// supplied by a backend implementation. Objects are handed out through
// c10::intrusive_ptr (the class derives from c10::intrusive_ptr_target).
class TORCH_API SymmetricMemory : public c10::intrusive_ptr_target {
 public:
  ~SymmetricMemory() override = default;

  // Host-side lists of the buffer pointers and signal pad pointers of the
  // group's peers. NOTE(review): presumably one entry per rank (size
  // world_size), mirroring the *_dev() variants below; confirm in backends.
  virtual std::vector<void*> get_buffer_ptrs() = 0;
  virtual std::vector<void*> get_signal_pad_ptrs() = 0;

  // get_buffer_ptrs_dev() and get_signal_pad_ptrs_dev() each return a pointer
  // to a device array of size world_size, containing buffer pointers and
  // signal pad pointers, respectively.
  virtual void** get_buffer_ptrs_dev() = 0;
  virtual void** get_signal_pad_ptrs_dev() = 0;
  // Sizes of the symmetric buffer and of the signal pad.
  // NOTE(review): presumably in bytes; confirm against the backend.
  virtual size_t get_buffer_size() = 0;
  virtual size_t get_signal_pad_size() = 0;

  // Whether a multicast address is available for this allocation, and that
  // address. NOTE(review): what get_multicast_ptr() returns when
  // has_multicast_support() is false is backend-defined; check first.
  virtual bool has_multicast_support() = 0;
  virtual void* get_multicast_ptr() = 0;

  // Returns a tensor over `rank`'s buffer with the given sizes, dtype and
  // storage offset (no defaults; all arguments must be supplied).
  // NOTE(review): bounds checking against get_buffer_size() is up to the
  // implementation.
  virtual at::Tensor get_buffer(
      int rank,
      c10::IntArrayRef sizes,
      c10::ScalarType dtype,
      int64_t storage_offset) = 0;

  // Like get_buffer(), but over `rank`'s signal pad. `dtype` is optional
  // (the implementation picks the element type when omitted) and
  // `storage_offset` defaults to 0.
  virtual at::Tensor get_signal_pad(
      int rank,
      c10::IntArrayRef sizes,
      std::optional<c10::ScalarType> dtype = std::nullopt,
      int64_t storage_offset = 0) = 0;

  // Built-in synchronization primitives based on signal pads (see NOTE
  // [symmetric memory signal pad]). `channel` selects an isolated
  // synchronization channel (see NOTE [symmetric memory synchronization
  // channel]). NOTE(review): `timeout_ms` is presumably a timeout in
  // milliseconds; the behavior on timeout is backend-defined.
  virtual void barrier(int channel, size_t timeout_ms) = 0;
  virtual void put_signal(int dst_rank, int channel, size_t timeout_ms) = 0;
  virtual void wait_signal(int src_rank, int channel, size_t timeout_ms) = 0;

  // The local rank within the group, and the number of ranks in the group.
  virtual int get_rank() = 0;
  virtual int get_world_size() = 0;
};

// Backend interface for allocating memory that can later be shared
// symmetrically across a group of devices, and for establishing that sharing.
// Implementations are registered per device type via register_allocator().
// All members are pure virtual.
class SymmetricMemoryAllocator : public c10::intrusive_ptr_target {
 public:
  ~SymmetricMemoryAllocator() override = default;

  // Allocates `size` units of memory on device `device_idx`, intended for
  // later symmetric access within the group named `group_name`.
  // NOTE(review): `size` is presumably in bytes, and the handling of a
  // std::nullopt `group_name` is backend-defined; confirm.
  virtual void* alloc(
      size_t size,
      int device_idx,
      const std::optional<std::string>& group_name) = 0;

  // Releases memory previously returned by alloc().
  virtual void free(void* ptr) = 0;
  // Returns the allocation size associated with `ptr`.
  // NOTE(review): presumably `ptr` must have come from this allocator's
  // alloc().
  virtual size_t get_alloc_size(void* ptr) = 0;
  // Establishes symmetric access to the allocation at `ptr` across the group
  // and returns the resulting SymmetricMemory object. Each rank invokes this
  // on its own allocation (see the SymmetricMemory class comment).
  virtual c10::intrusive_ptr<SymmetricMemory> rendezvous(
      void* ptr,
      const std::optional<std::string>& group_name) = 0;
  // Whether this allocator supports multicast on device `device_idx`.
  virtual bool has_multicast_support(int device_idx) = 0;
};

// NOTE(review): presumably reports whether the symmetric memory subsystem is
// being torn down (e.g. at process exit), so that callers can skip cleanup
// work; confirm in the implementation.
C10_EXPORT bool is_finalizing();

// Registers `allocator` as the SymmetricMemoryAllocator for `device_type`.
// NOTE(review): behavior when an allocator is already registered for the
// same device type is defined by the implementation.
C10_EXPORT void register_allocator(
    c10::DeviceType device_type,
    c10::intrusive_ptr<SymmetricMemoryAllocator> allocator);

// Returns whether an allocator has been registered for `device_type`
// (see register_allocator()).
C10_EXPORT bool has_allocator(c10::DeviceType device_type);

// Returns the allocator registered for `device_type`.
// NOTE(review): behavior when no allocator is registered is
// implementation-defined; use has_allocator() to check first.
C10_EXPORT c10::intrusive_ptr<SymmetricMemoryAllocator> get_allocator(
    c10::DeviceType device_type);

// Sets a store for rendezvousing symmetric allocations on a group of devices
// identified by `group_name`, together with the caller's `rank` within the
// group and the group's `world_size`. The concept of groups is logical; users
// can utilize predefined groups (e.g., a group of devices identified by a
// ProcessGroup) or create custom ones. Note that SymmetricMemoryAllocator
// backends may employ a more efficient communication channel for the actual
// rendezvous process and only use the store for bootstrapping purposes.
TORCH_API void set_group_info(
    const std::string& group_name,
    int rank,
    int world_size,
    c10::intrusive_ptr<Store> store);

// Per-group information. The fields mirror the parameters of
// set_group_info(); instances are returned by get_group_info().
//
// The integer fields carry default member initializers so that a
// default-initialized GroupInfo (e.g. `GroupInfo info;`) never holds
// indeterminate values. The struct remains an aggregate, so
// `GroupInfo{rank, world_size, store}` continues to work.
struct GroupInfo {
  // Rank of the local participant within the group; -1 means "not set".
  int rank{-1};
  // Number of participants in the group; -1 means "not set".
  int world_size{-1};
  // Store used for rendezvous bootstrapping within this group.
  c10::intrusive_ptr<c10d::Store> store;
};

// Returns the GroupInfo associated with `group_name` (see set_group_info()).
// The returned reference refers to storage owned by the implementation, not
// to a copy.
// NOTE(review): looking up an unknown group name is presumably an error;
// confirm.
C10_EXPORT const GroupInfo& get_group_info(const std::string& group_name);

// Identical to empty_strided, but allows symmetric memory access to be
// established for the allocated tensor via rendezvous(). This function itself
// is not a collective operation. It invokes SymmetricMemoryAllocator::alloc()
// for the requested device under the hood.
//
// NOTE [symmetric memory persistent allocation]
// If an `alloc_id` is supplied, empty_strided_p2p will perform persistent
// allocation. This makes the function cache allocated memory and ensures that
// invocations with the same `alloc_id` receive tensors backed by the same
// memory address. For safety, if a previous persistent allocation is still
// active (i.e., the storage of the returned tensor is still alive), persistent
// allocations with the same `alloc_id` will fail. This determinism, coupled
// with memory planning of communication buffers (e.g., by Inductor), allows
// communication algorithms to reliably reuse previously established remote
// memory access.
TORCH_API at::Tensor empty_strided_p2p(
    c10::IntArrayRef size,
    c10::IntArrayRef stride,
    c10::ScalarType dtype,
    c10::Device device,
    const std::optional<std::string>& group_name,
    std::optional<uint64_t> alloc_id);

// Establishes symmetric memory access on tensors allocated via
// empty_strided_p2p(), including persistent allocations made by passing an
// `alloc_id`. rendezvous() is a one-time process, and the mapping between a
// local memory region and the associated SymmetricMemory object is unique.
// Subsequent calls to rendezvous() with the same tensor, or with tensors
// allocated by empty_strided_p2p() using the same `alloc_id`, will receive
// the cached SymmetricMemory object.
//
// The function has a collective semantic and must be invoked simultaneously
// from all rendezvous participants.
//
// `group_name` defaults to std::nullopt.
// NOTE(review): how an omitted group name is resolved is
// implementation-defined; confirm.
TORCH_API c10::intrusive_ptr<SymmetricMemory> rendezvous(
    const at::Tensor& tensor,
    const std::optional<std::string>& group_name = std::nullopt);

// Returns whether multicast is supported for device `device_idx` of type
// `device_type`.
// NOTE(review): presumably forwards to
// SymmetricMemoryAllocator::has_multicast_support(device_idx) on the
// allocator registered for `device_type`; confirm in the implementation.
TORCH_API bool has_multicast_support(
    c10::DeviceType device_type,
    int device_idx);
} // namespace c10d::symmetric_memory