File: rebatching_queue_ops.h

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (85 lines) | stat: -rw-r--r-- 2,549 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#pragma once

#include "rebatching_queue.h"

#include <memory>

#include "c10/util/irange.h"

namespace caffe2 {

// Owning handle to a RebatchingQueue. The operators in this file store this
// type in (CreateRebatchingQueueOp) and read it from (the other ops) blobs.
using RebatchingQueuePtr = std::unique_ptr<RebatchingQueue>;

// Operator that constructs a RebatchingQueue and publishes it via output 0.
//
// Arguments read from the operator definition:
//   "capacity"  (int, default 1) - forwarded as the first constructor argument.
//   "num_blobs" (int, default 1) - forwarded as the second constructor
//                                  argument.
class CreateRebatchingQueueOp : public Operator<CPUContext> {
 public:
  CreateRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws) {}

  // Builds a new queue on every invocation and move-assigns the owning
  // pointer into output 0, releasing whatever that output owned before.
  // Always returns true.
  bool RunOnDevice() override {
    // std::make_unique keeps allocation and ownership in a single expression
    // instead of wrapping a naked `new` by hand.
    *OperatorBase::Output<RebatchingQueuePtr>(0) =
        std::make_unique<RebatchingQueue>(
            OperatorBase::GetSingleArgument<int>("capacity", 1),
            OperatorBase::GetSingleArgument<int>("num_blobs", 1));
    return true;
  }
};

// Operator that hands its tensor inputs to a RebatchingQueue.
//
// Inputs:
//   0      - blob holding the RebatchingQueuePtr to enqueue into.
//   1..N   - tensors to enqueue; N must equal queue->numBlobs().
// Arguments:
//   "enqueue_batch" (bool, default false) - when true the tensors are passed
//       to enqueueMany(), otherwise to enqueueOne().
class EnqueueRebatchingQueueOp : public Operator<CPUContext> {
 public:
  EnqueueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        enqueueBatch_(
            OperatorBase::GetSingleArgument<bool>("enqueue_batch", false)) {}

  // Collects pointers to inputs 1..N and forwards them to the queue.
  // Returns whatever the selected enqueue call returns.
  bool RunOnDevice() override {
    auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
    // Report a null queue as an enforce failure, like the arity check below,
    // rather than through CHECK.
    CAFFE_ENFORCE(queue);
    CAFFE_ENFORCE_EQ(InputSize(), queue->numBlobs() + 1);

    // Input 0 is the queue itself, so only InputSize() - 1 tensors follow.
    std::vector<const Tensor*> inputTensors;
    inputTensors.reserve(InputSize() - 1);
    for (const auto i : c10::irange(1, InputSize())) {
      inputTensors.push_back(&Input(i));
    }

    return enqueueBatch_ ? queue->enqueueMany(context_, inputTensors)
                         : queue->enqueueOne(context_, inputTensors);
  }

 private:
  // Cached value of the "enqueue_batch" argument.
  const bool enqueueBatch_;
};

// Operator that pulls data out of a RebatchingQueue into its outputs.
//
// Inputs:
//   0 - blob holding the RebatchingQueuePtr to dequeue from.
// Outputs:
//   every output tensor of the operator is passed to queue->dequeue().
// Arguments:
//   "num_elements" (int, default 1) - forwarded to queue->dequeue().
class DequeueRebatchingQueueOp : public Operator<CPUContext> {
 public:
  DequeueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        numElements_(OperatorBase::GetSingleArgument<int>("num_elements", 1)) {}

  // Gathers all output tensors and lets the queue fill them.
  // Returns whatever queue->dequeue() returns.
  bool RunOnDevice() override {
    auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
    // Report a null queue as an enforce failure rather than through CHECK,
    // matching how CloseRebatchingQueueOp validates the same handle.
    CAFFE_ENFORCE(queue);

    std::vector<Tensor*> outputTensors;
    outputTensors.reserve(OutputSize());
    for (const auto i : c10::irange(OutputSize())) {
      outputTensors.push_back(Output(i));
    }

    return queue->dequeue(context_, numElements_, outputTensors);
  }

 private:
  // Cached value of the "num_elements" argument; never modified after
  // construction.
  const int numElements_;
};

// Operator that closes the RebatchingQueue supplied as its single input.
class CloseRebatchingQueueOp : public Operator<CPUContext> {
 public:
  CloseRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator<CPUContext>(operator_def, ws) {}

  // Expects exactly one input whose blob holds a non-null RebatchingQueuePtr,
  // calls close() on that queue and returns true.
  bool RunOnDevice() override {
    CAFFE_ENFORCE_EQ(InputSize(), 1);

    // The local keeps the name `queue` because the enforce macro below embeds
    // the condition text in its failure message.
    const RebatchingQueuePtr& queue =
        Inputs().front()->Get<RebatchingQueuePtr>();
    CAFFE_ENFORCE(queue);

    queue->close();
    return true;
  }
};
} // namespace caffe2