File: pair.h

package info (click to toggle)
gloo-cuda 0.0~git20231202.5354032-4
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 2,140 kB
  • sloc: cpp: 21,546; python: 8,179; sh: 68; makefile: 67
file content (175 lines) | stat: -rw-r--r-- 5,103 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/**
 * Copyright (c) 2017-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#pragma once

#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "gloo/transport/ibverbs/address.h"
#include "gloo/transport/ibverbs/device.h"
#include "gloo/transport/ibverbs/memory_region.h"
#include "gloo/transport/pair.h"

namespace gloo {
namespace transport {
namespace ibverbs {

// Forward declaration of the ibverbs Buffer type. Pair only refers to it
// through pointers (see send() and the completion handler maps) and
// declares it a friend, so the full definition is not needed in this header.
class Buffer;

class Pair : public ::gloo::transport::Pair {
  static constexpr int kMaxBuffers = 8;
  static constexpr auto kRecvCompletionQueueCapacity = kMaxBuffers;
  static constexpr auto kSendCompletionQueueCapacity = kMaxBuffers;
  static constexpr auto kCompletionQueueCapacity =
    kRecvCompletionQueueCapacity + kSendCompletionQueueCapacity;

  // The ibv_req_notify(3) function takes an argument called
  // 'solicited_only' which makes it only trigger a notification for
  // work requests that are flagged as solicited. Every completion
  // should trigger a notification, so always pass 0.
  static constexpr auto kNotifyOnAnyCompletion = 0;

 public:
  explicit Pair(
      const std::shared_ptr<Device>& dev,
      std::chrono::milliseconds timeout);

  virtual ~Pair();

  Pair(const Pair& that) = delete;

  Pair& operator=(const Pair& that) = delete;

  virtual const Address& address() const override;

  virtual void connect(const std::vector<char>& bytes) override;

  virtual void setSync(bool enable, bool busyPoll) override;

  virtual std::unique_ptr<::gloo::transport::Buffer>
  createSendBuffer(int slot, void* ptr, size_t size) override;

  virtual std::unique_ptr<::gloo::transport::Buffer>
  createRecvBuffer(int slot, void* ptr, size_t size) override;

  // Send from the specified buffer to remote side of pair.
  virtual void send(
      transport::UnboundBuffer* tbuf,
      uint64_t tag,
      size_t offset,
      size_t nbytes) override;

  // Receive into the specified buffer from the remote side of pair.
  virtual void recv(
      transport::UnboundBuffer* tbuf,
      uint64_t tag,
      size_t offset,
      size_t nbytes) override;

  void handleCompletionEvent();

  void pollCompletions();

  void handleCompletion(struct ibv_wc* wc);

  void send(Buffer* buf, size_t offset, size_t length, size_t roffset);

  void close() override;

 protected:
  std::shared_ptr<Device> dev_;

  // Whether or not this pair is running in sync mode.
  std::atomic<bool> sync_;

  // Whether or not this pair is busy polling in sync mode.
  std::atomic<bool> busyPoll_;

  const std::chrono::milliseconds timeout_;

  // Number of completion events handled by this pair's completion
  // queue (also see ibv_get_cq_event(3)). This many events need to be
  // acknowledged prior to destructing the completion queue.
  // Otherwise, destruction will hang (see ibv_get_cq_event(3)).
  int completionEventsHandled_;

  Address self_;
  Address peer_;

  struct ibv_cq* cq_;
  struct ibv_qp* qp_;

  std::mutex m_;
  std::condition_variable cv_;

  // For us to copy the remote peer's ibv_mr into.
  std::map<int, struct ibv_mr> peerMemoryRegions_;

  // These fields store memory regions that the remote side of the pair
  // can send to and that the local side of the pair can send from.
  //
  // When registering a receive buffer, the local ibv_mr is sent
  // to the remote side of the pair, and the corresponding MemoryRegion
  // instance is kept around in the mappedSendRegions_ list until
  // the send operation complete.
  //
  // To allow the remote side of the pair to send its memory regions,
  // we keep a fixed number of MemoryRegion instances in
  // mappedRecvRegions_. These regions are referenced round-robin for
  // every posted receive work request.
  //
  std::map<int, std::unique_ptr<MemoryRegion> > mappedSendRegions_;
  std::array<std::unique_ptr<MemoryRegion>, kMaxBuffers> mappedRecvRegions_;

  // Keep track of number of request work requests posted and completed.
  // This is needed to index into the mappedRecvRegions_ array both
  // when posting the WR and when completing the WR.
  uint64_t recvPosted_;

  // Completions on behalf of buffers need to be forwarded to those buffers.
  std::map<int, Buffer*> sendCompletionHandlers_;
  std::map<int, Buffer*> recvCompletionHandlers_;

  void sendMemoryRegion(struct ibv_mr* mr, int slot);
  const struct ibv_mr* getMemoryRegion(int slot);

  void postReceive();

  std::chrono::milliseconds getTimeout() const {
    return timeout_;
  }

  const Address& peer() const {
    return peer_;
  }

 private:
  std::exception_ptr ex_;
  bool closed_ = false;

  // Used to signal IO exceptions from one thread and propagate onto others.
  void signalIoFailure(const std::string& msg);
  void checkErrorState();

  friend class Buffer;
};

} // namespace ibverbs
} // namespace transport
} // namespace gloo