#include "caffe2/operators/rnn/recurrent_network_executor_gpu.h"

#include <memory>

#include "caffe2/core/context_gpu.h"
namespace caffe2 {
template <>
std::unique_ptr<RecurrentNetworkExecutorBase> createRNNExecutor<CUDAContext>(
    const NetDef& step_net_def,
    std::map<string, string>& recurrent_input_map,
    std::string timestep_blob,
    ArgumentHelper arg_helper) {
  // Factory for the CUDA-specialized RNN executor.
  //
  // Own the executor from the moment of construction: the original code used
  // a raw `new` and only wrapped it in a unique_ptr at the end, which would
  // leak if argument parsing or logging threw in between.
  auto exec = std::make_unique<CUDARecurrentNetworkExecutor>(
      step_net_def, recurrent_input_map, timestep_blob);
  // Optional cap on the number of CUDA streams used for timestep-level
  // parallelism; 0 (the default) means "use the executor's built-in limit".
  const int max_streams =
      arg_helper.GetSingleArgument<int>("rnn_executor.max_cuda_streams", 0);
  if (max_streams > 0) {
    exec->setMaxStreams(max_streams);
    LOG(INFO) << "Set max streams:" << max_streams;
  }
  // unique_ptr<Derived> converts implicitly to unique_ptr<Base>.
  return exec;
}
CUDARecurrentNetworkExecutor::~CUDARecurrentNetworkExecutor() {
  // Destroy every CUDA event that was lazily created during execution.
  // Slots that were never needed are still nullptr and must be skipped.
  for (auto& event : events_) {
    if (event == nullptr) {
      continue;
    }
    CUDA_CHECK(cudaEventDestroy(event));
  }
}
/**
 * Special execution for CUDA. It tries to run ops with as little overhead as
 * possible, but to identify opportunities to run ops with "frontier execution"
 * parallelism, i.e by starting kernel from next timestep in parallel with
 * the current timestep. This is done by assigning streams.
 *
 * Executes timesteps [from, to) when from < to (forward pass) or
 * (to, from] stepping down when from > to (backward pass). Each timestep's
 * ops are launched on one stream; consecutive timesteps rotate through up to
 * max_streams streams when has_timestep_parallelism_ is set. Cross-timestep
 * ordering is enforced with cudaEvent record/wait pairs.
 */
void CUDARecurrentNetworkExecutor::_ExecRange(int from, int to) {
  // +1 for forward execution, -1 for backward execution.
  int direction = to > from ? 1 : -1;
  // Cap the stream pool by max_parallel_timesteps_ when it is configured.
  int max_streams = max_parallel_timesteps_ > 0 ?
      std::min(max_parallel_timesteps_, max_cuda_streams_)
      : max_cuda_streams_;
  int stream_seq = 0;
  int num_ops = timestep_ops_[0].size();
  // One (lazily created) event slot per (timestep, op) pair; slots stay
  // nullptr until an op actually needs an event recorded.
  events_.resize(num_ops * timestep_ops_.size(), nullptr);
  // GPU id is inferred from the first CUDA op encountered below.
  int gpu_id = -1;
  // Loop over timesteps
  for (int t = from; t != to; t += direction) {
    bool first_timestep = t == from;
    // NOTE: backward runs always terminate at t == 0, regardless of `to`.
    bool last_timestep =
        (direction == -1 && t == 0) || (direction == 1 && t == to - 1);
    auto& ops = timestep_ops_[t];
    // All ops of this timestep share one stream; the stream rotates per
    // timestep (see stream_seq++ below) to enable frontier parallelism.
    int stream_id = stream_seq % max_streams;
    for (int i = 0; i < ops.size(); i++) {
      auto& rnn_op = ops[i];
      // Special handling for link ops -- we just run them directly
      // they do not execute any kernels.
      if (rnn_op.link_op) {
        rnn_op.op->RunAsync(stream_id);
        CAFFE_ENFORCE(
            rnn_op.dependencies.empty(),
            "GPU executor ignores link dependencies");
        continue;
      }
      // Latch the GPU id from the first CUDA op; afterwards, enforce that
      // every remaining op is either CPU (device_type 0) or on that same GPU.
      if (gpu_id == -1 &&
          rnn_op.op->device_option().device_type() ==
              DeviceTypeProto::PROTO_CUDA) {
        gpu_id = rnn_op.op->device_option().device_id();
      } else {
        CAFFE_ENFORCE(
            rnn_op.op->device_option().device_type() == 0 ||
                rnn_op.op->device_option().device_id() == gpu_id,
            "RNN Executor only supports ops on one GPU");
      }
      // If have recurrent parents, add for event waits so that those
      // parents complete their work.
      // Only parents with index > i are recurrent (they come from the
      // previous timestep, which may be running on a different stream).
      if (has_timestep_parallelism_ && !first_timestep) {
        for (int parent : rnn_op.parents) {
          if (parent > i) {
            int parent_ev_idx = (t - direction) * num_ops + parent;
            CHECK(events_.size() > parent_ev_idx);
            // The parent op must have recorded its event already (it ran in
            // the previous timestep iteration).
            CAFFE_ENFORCE(events_[parent_ev_idx] != nullptr);
            // Make this timestep's stream wait on the parent's event
            // without blocking the host.
            CUDA_CHECK(cudaStreamWaitEvent(
                CUDAContext::cuda_stream(gpu_id, stream_id),
                events_[parent_ev_idx],
                0));
          }
        }
      }
      // Run the op in the given stream
      rnn_op.op->RunAsync(stream_id);
      // Create and record event for this op, if it has at least one
      // recurrent dependency.
      if (has_timestep_parallelism_ && !last_timestep) {
        for (int dep : rnn_op.dependencies) {
          // dep < i marks a recurrent (next-timestep) dependent; one recorded
          // event per op is enough for any number of such dependents.
          if (dep < i) {
            int event_idx = t * num_ops + i;
            // Create event for recurrent connections
            if (events_[event_idx] == nullptr) {
              CUDA_CHECK(cudaEventCreate(&events_[event_idx]));
            }
            CUDA_CHECK(cudaEventRecord(
                events_[event_idx],
                CUDAContext::cuda_stream(gpu_id, stream_id)));
            break;
          }
        }
      }
    } // for over ops
    // Next timestep will run on different stream
    if (has_timestep_parallelism_) {
      stream_seq++;
    }
  } // for over timesteps
  /**
   * Wait for all the started streams to complete.
   * Without timestep parallelism stream_seq stays 0 and only stream 0 is
   * synchronized; otherwise every stream in the rotation is drained.
   */
  for (int stream_id = 0; stream_id <= std::min(stream_seq, max_streams - 1);
       stream_id++) {
    VLOG(1) << "Wait for stream:" << stream_id;
    CUDA_CHECK(
        cudaStreamSynchronize(CUDAContext::cuda_stream(gpu_id, stream_id)));
  }
}
bool CUDARecurrentNetworkExecutor::Run(int T) {
  // Forward pass over timesteps 0 .. T-1. A zero-length sequence is a no-op.
  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
  if (T > 0) {
    _ExecRange(0, T);
  }
  return true;
}
bool CUDARecurrentNetworkExecutor::RunBackwards(int T) {
  // Backward pass over timesteps T-1 down to 0. A zero-length sequence is a
  // no-op.
  CAFFE_ENFORCE_GE(T, 0, "Negative number of steps");
  if (T > 0) {
    _ExecRange(T - 1, -1);
  }
  return true;
}
}