File: grpc.h

package info (click to toggle)
gdal 3.11.3%2Bdfsg-1~exp1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 89,016 kB
  • sloc: cpp: 1,165,048; ansic: 208,864; python: 26,958; java: 5,972; xml: 4,611; sh: 3,776; cs: 2,508; yacc: 1,306; makefile: 213
file content (300 lines) | stat: -rw-r--r-- 9,969 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
/*
 * Copyright 2014 Google Inc. All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef FLATBUFFERS_GRPC_H_
#define FLATBUFFERS_GRPC_H_

// Helper functionality to glue FlatBuffers and GRPC.

#include "flatbuffers/flatbuffers.h"
#include "grpc/byte_buffer_reader.h"
#include "grpcpp/support/byte_buffer.h"
#include "grpcpp/support/slice.h"

namespace flatbuffers {
namespace grpc {

// Message is a typed wrapper around a buffer that manages the underlying
// `grpc_slice` and also provides flatbuffers-specific helpers such as `Verify`
// and `GetRoot`. Since it is backed by a `grpc_slice`, the underlying buffer
// is refcounted and ownership is be managed automatically.
template<class T> class Message {
 public:
  Message() {}

  Message(::grpc::Slice slice) : slice_(slice) {}

  Message &operator=(const Message &other) = delete;

  Message(Message &&other) = default;

  Message(const Message &other) = delete;

  Message &operator=(Message &&other) = default;

  const uint8_t *mutable_data() const { return slice_.begin(); }

  const uint8_t *data() const { return slice_.begin(); }

  size_t size() const { return slice_.size(); }

  bool Verify() const {
    Verifier verifier(data(), size());
    return verifier.VerifyBuffer<T>(nullptr);
  }

  T *GetMutableRoot() { return flatbuffers::GetMutableRoot<T>(mutable_data()); }

  const T *GetRoot() const { return flatbuffers::GetRoot<T>(data()); }

  // This is only intended for serializer use, or if you know what you're doing
  const ::grpc::Slice &BorrowSlice() const { return slice_; }

 private:
  ::grpc::Slice slice_;
};

class MessageBuilder;

// SliceAllocator is a gRPC-specific allocator that uses the `grpc_slice`
// refcounted slices to manage memory ownership. This makes it easy and
// efficient to transfer buffers to gRPC.
class SliceAllocator : public Allocator {
 public:
  SliceAllocator() {}

  SliceAllocator(const SliceAllocator &other) = delete;
  SliceAllocator &operator=(const SliceAllocator &other) = delete;

  SliceAllocator(SliceAllocator &&other) {
    // default-construct and swap idiom
    swap(other);
  }

  SliceAllocator &operator=(SliceAllocator &&other) {
    // move-construct and swap idiom
    SliceAllocator temp(std::move(other));
    swap(temp);
    return *this;
  }

  void swap(SliceAllocator &other) {
    using std::swap;
    swap(slice_, other.slice_);
  }

  virtual ~SliceAllocator() {}

  virtual uint8_t *allocate(size_t size) override {
    FLATBUFFERS_ASSERT(slice_.size() == 0);
    slice_ = ::grpc::Slice(size);
    return const_cast<uint8_t *>(slice_.begin());
  }

  virtual void deallocate(uint8_t *p, size_t size) override {
    FLATBUFFERS_ASSERT(p == slice_.begin());
    FLATBUFFERS_ASSERT(size == slice_.size());
    slice_ = ::grpc::Slice();
  }

  virtual uint8_t *reallocate_downward(uint8_t *old_p, size_t old_size,
                                       size_t new_size, size_t in_use_back,
                                       size_t in_use_front) override {
    FLATBUFFERS_ASSERT(old_p == slice_.begin());
    FLATBUFFERS_ASSERT(old_size == slice_.size());
    FLATBUFFERS_ASSERT(new_size > old_size);
    ::grpc::Slice old_slice = slice_;
    ::grpc::Slice new_slice = ::grpc::Slice(new_size);
    uint8_t *new_p = const_cast<uint8_t *>(new_slice.begin());
    memcpy_downward(old_p, old_size, new_p, new_size, in_use_back,
                    in_use_front);
    slice_ = new_slice;
    return new_p;
  }

 private:
  ::grpc::Slice &get_slice(uint8_t *p, size_t size) {
    FLATBUFFERS_ASSERT(p == slice_.begin());
    FLATBUFFERS_ASSERT(size == slice_.size());
    return slice_;
  }

  ::grpc::Slice slice_;

  friend class MessageBuilder;
};

// SliceAllocatorMember is a hack to ensure that the MessageBuilder's
// slice_allocator_ member is constructed before the FlatBufferBuilder, since
// the allocator is used in the FlatBufferBuilder ctor.
namespace detail {
struct SliceAllocatorMember {
  SliceAllocator slice_allocator_;
};
}  // namespace detail

// MessageBuilder is a gRPC-specific FlatBufferBuilder that uses SliceAllocator
// to allocate gRPC buffers.
class MessageBuilder : private detail::SliceAllocatorMember,
                       public FlatBufferBuilder {
 public:
  explicit MessageBuilder(uoffset_t initial_size = 1024)
      : FlatBufferBuilder(initial_size, &slice_allocator_, false) {}

  MessageBuilder(const MessageBuilder &other) = delete;
  MessageBuilder &operator=(const MessageBuilder &other) = delete;

  MessageBuilder(MessageBuilder &&other)
      : FlatBufferBuilder(1024, &slice_allocator_, false) {
    // Default construct and swap idiom.
    Swap(other);
  }

  /// Create a MessageBuilder from a FlatBufferBuilder.
  explicit MessageBuilder(FlatBufferBuilder &&src,
                          void (*dealloc)(void *,
                                          size_t) = &DefaultAllocator::dealloc)
      : FlatBufferBuilder(1024, &slice_allocator_, false) {
    src.Swap(*this);
    src.SwapBufAllocator(*this);
    if (buf_.capacity()) {
      uint8_t *buf = buf_.scratch_data();  // pointer to memory
      size_t capacity = buf_.capacity();   // size of memory
      slice_allocator_.slice_ = ::grpc::Slice(buf, capacity, dealloc);
    } else {
      slice_allocator_.slice_ = ::grpc::Slice();
    }
  }

  /// Move-assign a FlatBufferBuilder to a MessageBuilder.
  /// Only FlatBufferBuilder with default allocator (basically, nullptr) is
  /// supported.
  MessageBuilder &operator=(FlatBufferBuilder &&src) {
    // Move construct a temporary and swap
    MessageBuilder temp(std::move(src));
    Swap(temp);
    return *this;
  }

  MessageBuilder &operator=(MessageBuilder &&other) {
    // Move construct a temporary and swap
    MessageBuilder temp(std::move(other));
    Swap(temp);
    return *this;
  }

  void Swap(MessageBuilder &other) {
    slice_allocator_.swap(other.slice_allocator_);
    FlatBufferBuilder::Swap(other);
    // After swapping the FlatBufferBuilder, we swap back the allocator, which
    // restores the original allocator back in place. This is necessary because
    // MessageBuilder's allocator is its own member (SliceAllocatorMember). The
    // allocator passed to FlatBufferBuilder::vector_downward must point to this
    // member.
    buf_.swap_allocator(other.buf_);
  }

  // Releases the ownership of the buffer pointer.
  // Returns the size, offset, and the original grpc_slice that
  // allocated the buffer. Also see grpc_slice_unref().
  uint8_t *ReleaseRaw(size_t &size, size_t &offset, ::grpc::Slice &slice) {
    uint8_t *buf = FlatBufferBuilder::ReleaseRaw(size, offset);
    slice = slice_allocator_.slice_;
    slice_allocator_.slice_ = ::grpc::Slice();
    return buf;
  }

  ~MessageBuilder() {}

  // GetMessage extracts the subslice of the buffer corresponding to the
  // flatbuffers-encoded region and wraps it in a `Message<T>` to handle buffer
  // ownership.
  template<class T> Message<T> GetMessage() {
    auto buf_data = buf_.scratch_data();  // pointer to memory
    auto buf_size = buf_.capacity();      // size of memory
    auto msg_data = buf_.data();          // pointer to msg
    auto msg_size = buf_.size();          // size of msg
    // Do some sanity checks on data/size
    FLATBUFFERS_ASSERT(msg_data);
    FLATBUFFERS_ASSERT(msg_size);
    FLATBUFFERS_ASSERT(msg_data >= buf_data);
    FLATBUFFERS_ASSERT(msg_data + msg_size <= buf_data + buf_size);
    // Calculate offsets from the buffer start
    auto begin = msg_data - buf_data;
    auto end = begin + msg_size;
    // Get the slice we are working with (no refcount change)
    ::grpc::Slice slice = slice_allocator_.get_slice(buf_data, buf_size);
    // Extract a subslice of the existing slice (increment refcount)
    ::grpc::Slice subslice = slice.sub(begin, end);
    // Wrap the subslice in a `Message<T>`, but don't increment refcount
    Message<T> msg(subslice);
    return msg;
  }

  template<class T> Message<T> ReleaseMessage() {
    Message<T> msg = GetMessage<T>();
    Reset();
    return msg;
  }

 private:
  // SliceAllocator slice_allocator_;  // part of SliceAllocatorMember
};

}  // namespace grpc
}  // namespace flatbuffers

namespace grpc {

template<class T> class SerializationTraits<flatbuffers::grpc::Message<T>> {
 public:
  static grpc::Status Serialize(const flatbuffers::grpc::Message<T> &msg,
                                ByteBuffer *buffer, bool *own_buffer) {
    // Package the single slice into a `ByteBuffer`,
    // incrementing the refcount in the process.
    *buffer = ByteBuffer(&msg.BorrowSlice(), 1);
    *own_buffer = true;
    return grpc::Status::OK;
  }

  // Deserialize by pulling the
  static grpc::Status Deserialize(ByteBuffer *buf,
                                  flatbuffers::grpc::Message<T> *msg) {
    Slice slice;
    if (!buf->TrySingleSlice(&slice).ok()) {
      if (!buf->DumpToSingleSlice(&slice).ok()) {
        buf->Clear();
        return ::grpc::Status(::grpc::StatusCode::INTERNAL, "No payload");
      }
    }
    *msg = flatbuffers::grpc::Message<T>(slice);
    buf->Clear();
#if FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
    return ::grpc::Status::OK;
#else
    if (msg->Verify()) {
      return ::grpc::Status::OK;
    } else {
      return ::grpc::Status(::grpc::StatusCode::INTERNAL,
                            "Message verification failed");
    }
#endif
  }
};

}  // namespace grpc

#endif  // FLATBUFFERS_GRPC_H_