File: rebatching_queue_ops.h

package info (click to toggle)
pytorch 1.7.1-7
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 80,340 kB
  • sloc: cpp: 670,830; python: 343,991; ansic: 67,845; asm: 5,503; sh: 2,924; java: 2,888; xml: 266; makefile: 244; ruby: 148; yacc: 144; objc: 51; lex: 44
file content (83 lines) | stat: -rw-r--r-- 2,490 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#pragma once

#include "rebatching_queue.h"

namespace caffe2 {

using RebatchingQueuePtr = std::unique_ptr<RebatchingQueue>;

class CreateRebatchingQueueOp : public Operator<CPUContext> {
 public:
  CreateRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws) {}

  bool RunOnDevice() override {
    *OperatorBase::Output<RebatchingQueuePtr>(0) =
        RebatchingQueuePtr(new RebatchingQueue(
            OperatorBase::GetSingleArgument<int>("capacity", 1),
            OperatorBase::GetSingleArgument<int>("num_blobs", 1)));
    return true;
  }
};

class EnqueueRebatchingQueueOp : public Operator<CPUContext> {
 public:
  EnqueueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        enqueueBatch_(
            OperatorBase::GetSingleArgument<bool>("enqueue_batch", false)) {}
  bool RunOnDevice() override {
    auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
    CHECK(queue);
    CAFFE_ENFORCE_EQ(InputSize(), queue->numBlobs() + 1);
    std::vector<const Tensor*> inputTensors;
    inputTensors.reserve(InputSize() - 1);
    for (int i = 1; i < InputSize(); ++i) {
      inputTensors.push_back(&Input(i));
    }

    return enqueueBatch_ ? queue->enqueueMany(context_, inputTensors)
                         : queue->enqueueOne(context_, inputTensors);
  }

 private:
  const bool enqueueBatch_;
};

class DequeueRebatchingQueueOp : public Operator<CPUContext> {
 public:
  DequeueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws),
        numElements_(OperatorBase::GetSingleArgument<int>("num_elements", 1)) {}

  bool RunOnDevice() override {
    auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
    CHECK(queue);

    std::vector<Tensor*> outputTensors;
    outputTensors.reserve(OutputSize());
    for (int i = 0; i < OutputSize(); ++i) {
      outputTensors.push_back(Output(i));
    }

    return queue->dequeue(context_, numElements_, outputTensors);
  }

 private:
  int numElements_;
};

class CloseRebatchingQueueOp : public Operator<CPUContext> {
 public:
  CloseRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
      : Operator(operator_def, ws) {}

  bool RunOnDevice() override {
    CAFFE_ENFORCE_EQ(InputSize(), 1);
    auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
    CAFFE_ENFORCE(queue);
    queue->close();
    return true;
  }
};
} // caffe2