1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
#ifndef _CALLBACK_REGISTRY_H_
#define _CALLBACK_REGISTRY_H_
#include <Rcpp.h>
#include <queue>
#include <boost/operators.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include "timestamp.h"
#include "optional.h"
#include "threadutils.h"
// Callback is an abstract class with two subclasses. The reason that there
// are two subclasses is because one of them is for C++ (boost::function)
// callbacks, and the other is for R (Rcpp::Function) callbacks. Because
// Callbacks can be created from either the main thread or a background
// thread, the top-level Callback class cannot contain any Rcpp objects --
// otherwise R objects could be allocated on a background thread, which will
// cause memory corruption.
class Callback {
public:
virtual ~Callback() {};
Callback(Timestamp when) : when(when) {};
bool operator<(const Callback& other) const {
return this->when < other.when ||
(!(this->when > other.when) && this->callbackId < other.callbackId);
}
bool operator>(const Callback& other) const {
return other < *this;
}
uint64_t getCallbackId() const {
return callbackId;
};
virtual void invoke() const = 0;
void invoke_wrapped() const;
virtual Rcpp::RObject rRepresentation() const = 0;
Timestamp when;
protected:
// Used to break ties when comparing to a callback that has precisely the same
// timestamp
uint64_t callbackId;
};
class BoostFunctionCallback : public Callback {
public:
BoostFunctionCallback(Timestamp when, boost::function<void (void)> func);
void invoke() const {
func();
}
Rcpp::RObject rRepresentation() const;
private:
boost::function<void (void)> func;
};
class RcppFunctionCallback : public Callback {
public:
RcppFunctionCallback(Timestamp when, Rcpp::Function func);
void invoke() const {
func();
}
Rcpp::RObject rRepresentation() const;
private:
Rcpp::Function func;
};
typedef boost::shared_ptr<Callback> Callback_sp;
template <typename T>
struct pointer_less_than {
const bool operator()(const T a, const T b) const {
return *a < *b;
}
};
// Stores R function callbacks, ordered by timestamp.
class CallbackRegistry {
private:
int id;
// Most of the behavior of the registry is like a priority queue. However, a
// std::priority_queue only allows access to the top element, and when we
// cancel a callback or get an Rcpp::List representation, we need random
// access, so we'll use a std::set.
typedef std::set<Callback_sp, pointer_less_than<Callback_sp> > cbSet;
// This is a priority queue of shared pointers to Callback objects. The
// reason it is not a priority_queue<Callback> is because that can cause
// objects to be copied on the wrong thread, and even trigger an R GC event
// on the wrong thread. https://github.com/r-lib/later/issues/39
cbSet queue;
Mutex* mutex;
ConditionVariable* condvar;
public:
// The CallbackRegistry must be given a Mutex and ConditionVariable when
// initialized, because they are shared among the CallbackRegistry objects
// and the CallbackRegistryTable; they serve as a global lock. Note that the
// lifetime of these objects must be longer than the CallbackRegistry.
CallbackRegistry(int id, Mutex* mutex, ConditionVariable* condvar);
~CallbackRegistry();
int getId() const;
// Add a function to the registry, to be executed at `secs` seconds in
// the future (i.e. relative to the current time).
uint64_t add(Rcpp::Function func, double secs);
// Add a C function to the registry, to be executed at `secs` seconds in
// the future (i.e. relative to the current time).
uint64_t add(void (*func)(void*), void* data, double secs);
bool cancel(uint64_t id);
// The smallest timestamp present in the registry, if any.
// Use this to determine the next time we need to pump events.
Optional<Timestamp> nextTimestamp(bool recursive = true) const;
// Is the registry completely empty?
bool empty() const;
// Is anything ready to execute?
bool due(const Timestamp& time = Timestamp(), bool recursive = true) const;
// Pop and return an ordered list of functions to execute now.
std::vector<Callback_sp> take(size_t max = -1, const Timestamp& time = Timestamp());
// Wait until the next available callback is ready to execute.
bool wait(double timeoutSecs, bool recursive) const;
// Return a List of items in the queue.
Rcpp::List list() const;
// References to parent and children registries. These are used for
// automatically running child loops. They should only be accessed and
// modified from the main thread.
boost::shared_ptr<CallbackRegistry> parent;
std::vector<boost::shared_ptr<CallbackRegistry> > children;
};
#endif // _CALLBACK_REGISTRY_H_
|