// Declares caffe2::Event and the per-device-type function registries that
// Event dispatches to.
#ifndef CAFFE2_CORE_EVENT_H_
#define CAFFE2_CORE_EVENT_H_
#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <c10/core/DeviceType.h>
#include "caffe2/core/common.h"
#include "caffe2/core/logging.h"
#include "caffe2/proto/caffe2_pb.h"
namespace caffe2 {
// Size of each per-device-type function table held by Event; device-type
// indices used with those tables must be smaller than this value.
constexpr int MaxDeviceTypes =
DeviceTypeProto::PROTO_COMPILE_TIME_MAX_DEVICE_TYPES;
// Forward declaration: the event function-pointer types take Event pointers
// before the class itself is defined.
class Event;
// Status values reported by Event::Query().
enum EventStatus {
  EVENT_INITIALIZED = 0, // neither scheduled nor finished
  EVENT_SCHEDULED = 1, // reported by Event::IsScheduled()
  EVENT_SUCCESS = 2, // finished (see Event::IsFinished())
  EVENT_FAILED = 3, // finished (see Event::IsFinished())
};
// For the following functions, void* shall be interpreted as the corresponding
// context object corresponding to the device type associated with the
// functions.
// Initializes event
typedef void (*EventCreateFunction)(const DeviceOption& option, Event*);
// Called on event to signal that CPU part of operation is finished,
// Optionally accepts error message from CPU part.
// Should be called no more than once per event
typedef void (*EventRecordFunction)(Event*, const void*, const char*);
// Waits and returns as soon as possible in order schedule next operation,
// e.g. for CUDA->CUDA waits only for CPU part of CUDA op,
// for CUDA->CPU waits till the CUDA op is fully completed.
// Prepares context to synchronize device part of operation.
// Can be called concurrently from multiple threads
typedef void (*EventWaitFunction)(const Event*, void*);
// Waits till operation is fully finished,
// can be called concurrently from multiple threads
typedef void (*EventFinishFunction)(const Event*);
// Queries current status of operation,
// can be called concurrently from multiple threads
typedef EventStatus (*EventQueryFunction)(const Event*);
typedef const std::string& (*EventErrorMessageFunction)(const Event*);
typedef void (*EventSetFinishedFunction)(const Event*, const char*);
typedef void (*EventResetFunction)(Event*);
// Sets callback that is called when event is finished
typedef std::function<void()> EventCallbackFunction;
typedef void (*EventSetCallbackFunction)(Event*, EventCallbackFunction);
class TORCH_API Event {
public:
explicit Event(const DeviceOption& option)
: event_(), type_(option.device_type()), option_(option) {
CAFFE_ENFORCE_LT(type_, MaxDeviceTypes);
CAFFE_ENFORCE(event_creator_[type_]);
event_creator_[type_](option, this);
}
// Nothing needs to be done in the destructor, as the event creator should
// set the proper destruction process for the unique_ptr.
~Event() {}
void Record(
DeviceType recorder_type,
const void* context,
const char* err_msg = nullptr) {
auto recorder_index = TypeToProto(recorder_type);
CAFFE_ENFORCE_EQ(
recorder_index,
type_,
"You are trying to record with a wrong device type.");
CAFFE_ENFORCE(event_recorder_[recorder_index]);
event_recorder_[recorder_index](this, context, err_msg);
}
void Wait(DeviceType waiter_type, void* context) const {
auto waiter_index = TypeToProto(waiter_type);
CAFFE_ENFORCE(event_waiter_[waiter_index][type_]);
event_waiter_[waiter_index][type_](this, context);
}
void Finish() const {
CAFFE_ENFORCE(event_finisher_[type_]);
event_finisher_[type_](this);
}
EventStatus Query() const {
CAFFE_ENFORCE(event_querier_[type_]);
return event_querier_[type_](this);
}
const std::string& ErrorMessage() const {
CAFFE_ENFORCE(event_err_msg_getter_[type_]);
return event_err_msg_getter_[type_](this);
}
void Reset() {
CAFFE_ENFORCE(event_resetter_[type_]);
event_resetter_[type_](this);
#ifdef CAFFE2_USE_EXCEPTION_PTR
caught_exception_ = nullptr;
#endif // CAFFE2_USE_EXCEPTION_PTR
error_timestamp_ = 0;
}
const DeviceOption& GetDeviceOption() const {
return option_;
}
bool IsScheduled() const {
return Query() == EventStatus::EVENT_SCHEDULED;
}
bool IsFinished() const {
auto status = Query();
return status == EventStatus::EVENT_SUCCESS ||
status == EventStatus::EVENT_FAILED;
}
void SetFinished(const char* err_msg = nullptr) {
typedef std::chrono::high_resolution_clock clock;
error_timestamp_ = std::chrono::duration_cast<std::chrono::nanoseconds>(
clock::now().time_since_epoch())
.count();
CAFFE_ENFORCE(event_finished_setter_[type_]);
return event_finished_setter_[type_](this, err_msg);
}
bool SupportsCallback() const {
return event_callback_setter_[type_] != nullptr;
}
void SetCallback(EventCallbackFunction callback) {
CAFFE_ENFORCE(
event_callback_setter_[type_], "Event does not support callbacks");
event_callback_setter_[type_](this, callback);
}
// If parent op has succeeded, then we can run any child op;
// If parent op is in scheduled state, we need to check that:
// - child op supports async scheduling
// - there's a way to setup synchronization between async parent and
// child - both child and parent should use the same type of device,
// non-blocking synchronization between different device types is not
// supported
// If parent op is in another state (initialized or failed) then scheduling
// is not possible
bool CanSchedule(const Event& child_event, bool supports_async) const {
return CanSchedule(type_, Query(), child_event.GetType(), supports_async);
}
static bool CanSchedule(
int parent_type,
EventStatus parent_status,
int child_type,
bool child_supports_async) {
if (parent_status == EventStatus::EVENT_SUCCESS) {
return true;
}
if (parent_status == EventStatus::EVENT_SCHEDULED) {
return (parent_type == child_type) && child_supports_async;
}
return false;
}
int GetType() const {
return type_;
}
void SetFinishedWithException(const char* err_msg = nullptr) {
#ifdef CAFFE2_USE_EXCEPTION_PTR
if (!caught_exception_) {
caught_exception_ = std::current_exception();
}
CAFFE_ENFORCE(caught_exception_, "No exception found");
#else
VLOG(1) << "No support for exceptions in Event";
#endif // CAFFE2_USE_EXCEPTION_PTR
if (err_msg) {
SetFinished(err_msg);
} else {
SetFinished("Error happened during an operator run");
}
}
bool HasException() const {
#ifdef CAFFE2_USE_EXCEPTION_PTR
return (bool)caught_exception_;
#else
VLOG(1) << "No support for exceptions in Event";
return false;
#endif // CAFFE2_USE_EXCEPTION_PTR
}
int64_t ErrorTimestamp() const {
return error_timestamp_;
}
void RethrowException() const {
#ifdef CAFFE2_USE_EXCEPTION_PTR
if (caught_exception_) {
std::rethrow_exception(caught_exception_);
}
#else
VLOG(1) << "No support for exceptions in Event";
#endif // CAFFE2_USE_EXCEPTION_PTR
}
// event_ is going to be accessed by the EventCreate/Record/Wait/Finish
// functions, but one should not use it outside the own Event functionalities.
// In the future we may move it to a private member.
std::shared_ptr<void> event_;
private:
int type_;
DeviceOption option_;
#ifdef CAFFE2_USE_EXCEPTION_PTR
std::exception_ptr caught_exception_;
#endif // CAFFE2_USE_EXCEPTION_PTR
int64_t error_timestamp_{};
static EventCreateFunction event_creator_[MaxDeviceTypes];
static EventRecordFunction event_recorder_[MaxDeviceTypes];
static EventWaitFunction event_waiter_[MaxDeviceTypes][MaxDeviceTypes];
static EventFinishFunction event_finisher_[MaxDeviceTypes];
static EventQueryFunction event_querier_[MaxDeviceTypes];
static EventErrorMessageFunction event_err_msg_getter_[MaxDeviceTypes];
static EventSetFinishedFunction event_finished_setter_[MaxDeviceTypes];
static EventResetFunction event_resetter_[MaxDeviceTypes];
static EventSetCallbackFunction event_callback_setter_[MaxDeviceTypes];
template <DeviceType t>
friend struct EventCreateFunctionRegisterer;
template <DeviceType t>
friend struct EventRecordFunctionRegisterer;
template <DeviceType w, DeviceType d>
friend struct EventWaitFunctionRegisterer;
template <DeviceType t>
friend struct EventFinishFunctionRegisterer;
template <DeviceType t>
friend struct EventQueryFunctionRegisterer;
template <DeviceType t>
friend struct EventErrorMessageFunctionRegisterer;
template <DeviceType t>
friend struct EventSetFinishedFunctionRegisterer;
template <DeviceType t>
friend struct EventSetCallbackFunctionRegisterer;
template <DeviceType t>
friend struct EventResetFunctionRegisterer;
};
// Constructing an instance stores `f` as the event-creation function for
// device type `t`, i.e. in Event::event_creator_ at index TypeToProto(t).
template <DeviceType t>
struct EventCreateFunctionRegisterer {
  explicit EventCreateFunctionRegisterer(EventCreateFunction f) {
    Event::event_creator_[TypeToProto(t)] = f;
  }
};
// Registers `f` as the event-creation function for device type `t` by
// defining a file-local EventCreateFunctionRegisterer<t> object. The object's
// name embeds `t` so that registrations for different device types can coexist
// in one translation unit; `t` must therefore be a single identifier token.
#define REGISTER_EVENT_CREATE_FUNCTION(t, f)                     \
  namespace {                                                    \
  static EventCreateFunctionRegisterer<t> g_event_create_##t(f); \
  }
// Constructing an instance stores `f` as the record function for device type
// `t`, i.e. in Event::event_recorder_ at index TypeToProto(t).
template <DeviceType t>
struct EventRecordFunctionRegisterer {
  explicit EventRecordFunctionRegisterer(EventRecordFunction f) {
    Event::event_recorder_[TypeToProto(t)] = f;
  }
};
// Registers `f` as the record function for device type `t` by defining a
// file-local EventRecordFunctionRegisterer<t> object. The object's name embeds
// `t` so that registrations for different device types can coexist in one
// translation unit; `t` must therefore be a single identifier token.
#define REGISTER_EVENT_RECORD_FUNCTION(t, f)                     \
  namespace {                                                    \
  static EventRecordFunctionRegisterer<t> g_event_record_##t(f); \
  }
// Constructing an instance stores `f` as the wait function used when a
// waiter of type `waiter_type` waits on an event of type `event_type`.
// Event::event_waiter_ is indexed as [waiter][event].
template <DeviceType waiter_type, DeviceType event_type>
struct EventWaitFunctionRegisterer {
  explicit EventWaitFunctionRegisterer(EventWaitFunction f) {
    const auto waiter_slot = TypeToProto(waiter_type);
    const auto event_slot = TypeToProto(event_type);
    Event::event_waiter_[waiter_slot][event_slot] = f;
  }
};
// Registers `f` as the wait function for a waiter of device type `waiter`
// waiting on an event of device type `device`, by defining a file-local
// EventWaitFunctionRegisterer<waiter, device> object. Both type arguments are
// pasted into the object's name, so each must be a single identifier token.
#define REGISTER_EVENT_WAIT_FUNCTION(waiter, device, f) \
  namespace {                                           \
  static EventWaitFunctionRegisterer<waiter, device>    \
      g_event_wait_##waiter##_##device(f);              \
  }
// Constructing an instance stores `f` as the status-query function for device
// type `t`, i.e. in Event::event_querier_ at index TypeToProto(t).
template <DeviceType t>
struct EventQueryFunctionRegisterer {
  explicit EventQueryFunctionRegisterer(EventQueryFunction f) {
    Event::event_querier_[TypeToProto(t)] = f;
  }
};
// Registers `f` as the status-query function for device type `t` by defining
// a file-local EventQueryFunctionRegisterer<t> object. The object's name
// embeds `t` so that registrations for different device types can coexist in
// one translation unit; `t` must therefore be a single identifier token.
#define REGISTER_EVENT_QUERY_FUNCTION(t, f)                    \
  namespace {                                                  \
  static EventQueryFunctionRegisterer<t> g_event_query_##t(f); \
  }
// Constructing an instance stores `f` as the error-message getter for device
// type `t`, i.e. in Event::event_err_msg_getter_ at index TypeToProto(t).
template <DeviceType t>
struct EventErrorMessageFunctionRegisterer {
  explicit EventErrorMessageFunctionRegisterer(EventErrorMessageFunction f) {
    Event::event_err_msg_getter_[TypeToProto(t)] = f;
  }
};
// Registers `f` as the error-message getter for device type `t` by defining a
// file-local EventErrorMessageFunctionRegisterer<t> object. The object's name
// embeds `t` so that registrations for different device types can coexist in
// one translation unit; `t` must therefore be a single identifier token.
#define REGISTER_EVENT_ERROR_MESSAGE_FUNCTION(t, f)                     \
  namespace {                                                           \
  static EventErrorMessageFunctionRegisterer<t> g_event_err_msg_##t(f); \
  }
// Constructing an instance stores `f` as the set-finished function for device
// type `t`, i.e. in Event::event_finished_setter_ at index TypeToProto(t).
template <DeviceType t>
struct EventSetFinishedFunctionRegisterer {
  explicit EventSetFinishedFunctionRegisterer(EventSetFinishedFunction f) {
    Event::event_finished_setter_[TypeToProto(t)] = f;
  }
};
// Registers `f` as the set-finished function for device type `t` by defining
// a file-local EventSetFinishedFunctionRegisterer<t> object. The object's name
// embeds `t` so that registrations for different device types can coexist in
// one translation unit; `t` must therefore be a single identifier token.
#define REGISTER_EVENT_SET_FINISHED_FUNCTION(t, f)                          \
  namespace {                                                               \
  static EventSetFinishedFunctionRegisterer<t> g_event_set_finished_##t(f); \
  }
// Constructing an instance stores `f` as the callback setter for device type
// `t`, i.e. in Event::event_callback_setter_ at index TypeToProto(t).
template <DeviceType t>
struct EventSetCallbackFunctionRegisterer {
  explicit EventSetCallbackFunctionRegisterer(EventSetCallbackFunction f) {
    Event::event_callback_setter_[TypeToProto(t)] = f;
  }
};
// Registers `f` as the callback setter for device type `t` by defining a
// file-local EventSetCallbackFunctionRegisterer<t> object. The object's name
// embeds `t` so that registrations for different device types can coexist in
// one translation unit; `t` must therefore be a single identifier token.
#define REGISTER_EVENT_SET_CALLBACK_FUNCTION(t, f)                          \
  namespace {                                                               \
  static EventSetCallbackFunctionRegisterer<t> g_event_set_callback_##t(f); \
  }
// Constructing an instance stores `f` as the finish function for device type
// `t`, i.e. in Event::event_finisher_ at index TypeToProto(t).
template <DeviceType t>
struct EventFinishFunctionRegisterer {
  explicit EventFinishFunctionRegisterer(EventFinishFunction f) {
    Event::event_finisher_[TypeToProto(t)] = f;
  }
};
// Registers `f` as the finish function for device type `t` by defining a
// file-local EventFinishFunctionRegisterer<t> object. The object's name embeds
// `t` so that registrations for different device types can coexist in one
// translation unit; `t` must therefore be a single identifier token.
#define REGISTER_EVENT_FINISH_FUNCTION(t, f)                     \
  namespace {                                                    \
  static EventFinishFunctionRegisterer<t> g_event_finish_##t(f); \
  }
// Constructing an instance stores `f` as the reset function for device type
// `t`, i.e. in Event::event_resetter_ at index TypeToProto(t).
template <DeviceType t>
struct EventResetFunctionRegisterer {
  explicit EventResetFunctionRegisterer(EventResetFunction f) {
    Event::event_resetter_[TypeToProto(t)] = f;
  }
};
// Registers `f` as the reset function for device type `t` by defining a
// file-local EventResetFunctionRegisterer<t> object. The object's name embeds
// `t` so that registrations for different device types can coexist in one
// translation unit; `t` must therefore be a single identifier token.
#define REGISTER_EVENT_RESET_FUNCTION(t, f)                    \
  namespace {                                                  \
  static EventResetFunctionRegisterer<t> g_event_reset_##t(f); \
  }
} // namespace caffe2
#endif // CAFFE2_CORE_EVENT_H_
|