File: callback_registry.h

package info (click to toggle)
r-cran-later 1.1.0.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 412 kB
  • sloc: cpp: 1,473; ansic: 1,096; sh: 29; makefile: 2
file content (160 lines) | stat: -rw-r--r-- 4,761 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#ifndef _CALLBACK_REGISTRY_H_
#define _CALLBACK_REGISTRY_H_

#include <Rcpp.h>
#include <queue>
#include <boost/operators.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include "timestamp.h"
#include "optional.h"
#include "threadutils.h"

// Callback is an abstract class with two subclasses. The reason that there
// are two subclasses is because one of them is for C++ (boost::function)
// callbacks, and the other is for R (Rcpp::Function) callbacks. Because
// Callbacks can be created from either the main thread or a background
// thread, the top-level Callback class cannot contain any Rcpp objects --
// otherwise R objects could be allocated on a background thread, which will
// cause memory corruption.

class Callback {

public:
  virtual ~Callback() {};
  Callback(Timestamp when) : when(when) {};

  bool operator<(const Callback& other) const {
    return this->when < other.when ||
      (!(this->when > other.when) && this->callbackId < other.callbackId);
  }

  bool operator>(const Callback& other) const {
    return other < *this;
  }

  uint64_t getCallbackId() const {
    return callbackId;
  };

  virtual void invoke() const = 0;

  void invoke_wrapped() const;

  virtual Rcpp::RObject rRepresentation() const = 0;

  Timestamp when;

protected:
  // Used to break ties when comparing to a callback that has precisely the same
  // timestamp
  uint64_t callbackId;
};


class BoostFunctionCallback : public Callback {
public:
  BoostFunctionCallback(Timestamp when, boost::function<void (void)> func);

  void invoke() const {
    func();
  }

  Rcpp::RObject rRepresentation() const;

private:
  boost::function<void (void)> func;
};


class RcppFunctionCallback : public Callback {
public:
  RcppFunctionCallback(Timestamp when, Rcpp::Function func);

  void invoke() const {
    func();
  }

  Rcpp::RObject rRepresentation() const;

private:
  Rcpp::Function func;
};



typedef boost::shared_ptr<Callback> Callback_sp;

template <typename T>
struct pointer_less_than {
  const bool operator()(const T a, const T b) const {
    return *a < *b;
  }
};


// Stores R function callbacks, ordered by timestamp.
class CallbackRegistry {
private:
  int id;

  // Most of the behavior of the registry is like a priority queue. However, a
  // std::priority_queue only allows access to the top element, and when we
  // cancel a callback or get an Rcpp::List representation, we need random
  // access, so we'll use a std::set.
  typedef std::set<Callback_sp, pointer_less_than<Callback_sp> > cbSet;
  // This is a priority queue of shared pointers to Callback objects. The
  // reason it is not a priority_queue<Callback> is because that can cause
  // objects to be copied on the wrong thread, and even trigger an R GC event
  // on the wrong thread. https://github.com/r-lib/later/issues/39
  cbSet queue;
  Mutex* mutex;
  ConditionVariable* condvar;

public:
  // The CallbackRegistry must be given a Mutex and ConditionVariable when
  // initialized, because they are shared among the CallbackRegistry objects
  // and the CallbackRegistryTable; they serve as a global lock. Note that the
  // lifetime of these objects must be longer than the CallbackRegistry.
  CallbackRegistry(int id, Mutex* mutex, ConditionVariable* condvar);
  ~CallbackRegistry();

  int getId() const;

  // Add a function to the registry, to be executed at `secs` seconds in
  // the future (i.e. relative to the current time).
  uint64_t add(Rcpp::Function func, double secs);

  // Add a C function to the registry, to be executed at `secs` seconds in
  // the future (i.e. relative to the current time).
  uint64_t add(void (*func)(void*), void* data, double secs);

  bool cancel(uint64_t id);

  // The smallest timestamp present in the registry, if any.
  // Use this to determine the next time we need to pump events.
  Optional<Timestamp> nextTimestamp(bool recursive = true) const;

  // Is the registry completely empty?
  bool empty() const;

  // Is anything ready to execute?
  bool due(const Timestamp& time = Timestamp(), bool recursive = true) const;

  // Pop and return an ordered list of functions to execute now.
  std::vector<Callback_sp> take(size_t max = -1, const Timestamp& time = Timestamp());

  // Wait until the next available callback is ready to execute.
  bool wait(double timeoutSecs, bool recursive) const;

  // Return a List of items in the queue.
  Rcpp::List list() const;

  // References to parent and children registries. These are used for
  // automatically running child loops. They should only be accessed and
  // modified from the main thread.
  boost::shared_ptr<CallbackRegistry> parent;
  std::vector<boost::shared_ptr<CallbackRegistry> > children;
};

#endif // _CALLBACK_REGISTRY_H_