1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
|
package sys.thread;
import eval.luv.Loop;
import eval.luv.Async;
import eval.luv.Timer as LuvTimer;
import haxe.MainLoop;
/**
When an event loop has an available event to execute.
**/
@:coreApi
enum NextEventTime {
/** There's already an event waiting to be executed */
Now;
/** No new events are expected. */
Never;
/**
An event is expected to arrive at any time.
If `time` is specified, then the event will be ready at that time for sure.
*/
AnyTime(time:Null<Float>);
/** An event is expected to be ready for execution at `time`. */
At(time:Float);
}
abstract EventHandler(RegularEvent) from RegularEvent to RegularEvent {}
private class RegularEvent {
public var timer:Null<LuvTimer>;
public var event:()->Void;
public function new(e:()->Void) {
event = e;
}
public function run() {
event();
}
}
/**
An event loop implementation used for `sys.thread.Thread`
**/
@:coreApi
class EventLoop {
@:allow(eval.luv.Loop)
final handle:Loop;
final mutex = new Mutex();
final wakeup:Async;
var promisedEventsCount = 0;
var pending:Array<()->Void> = [];
var started:Bool = false;
var isMainThread:Bool;
static var CREATED : Bool;
public function new():Void {
isMainThread = !CREATED;
CREATED = true;
handle = Loop.init().resolve();
wakeup = Async.init(handle, consumePending).resolve();
wakeup.unref();
}
/**
Schedule event for execution every `intervalMs` milliseconds in current loop.
**/
public function repeat(event:()->Void, intervalMs:Int):EventHandler {
var e = new RegularEvent(event);
mutex.acquire();
e.timer = LuvTimer.init(handle).resolve();
e.timer.start(e.run, intervalMs, intervalMs < 1 ? 1 : intervalMs).resolve();
mutex.release();
wakeup.send();
return e;
}
/**
Prevent execution of a previously scheduled event in current loop.
**/
public function cancel(eventHandler:EventHandler):Void {
mutex.acquire();
(eventHandler:RegularEvent).event = noop;
pending.push(() -> {
var timer = (eventHandler:RegularEvent).timer;
timer.stop().resolve();
timer.close(noop);
});
mutex.release();
wakeup.send();
}
static final noop = function() {}
/**
Notify this loop about an upcoming event.
This makes the thread stay alive and wait for as many events as the number of
times `.promise()` was called. These events should be added via `.runPromised()`.
**/
public function promise():Void {
mutex.acquire();
++promisedEventsCount;
pending.push(refUnref);
mutex.release();
wakeup.send();
}
/**
Execute `event` as soon as possible.
**/
public function run(event:()->Void):Void {
mutex.acquire();
pending.push(event);
mutex.release();
wakeup.send();
}
/**
Add previously promised `event` for execution.
**/
public function runPromised(event:()->Void):Void {
mutex.acquire();
--promisedEventsCount;
pending.push(refUnref);
pending.push(event);
mutex.release();
wakeup.send();
}
function refUnref():Void {
if (promisedEventsCount > 0 || (isMainThread && haxe.MainLoop.hasEvents())) {
wakeup.ref();
} else {
wakeup.unref();
}
}
public function progress():NextEventTime {
if (started) throw "Event loop already started";
if (handle.run(NOWAIT)) {
return AnyTime(null);
} else {
return Never;
}
}
/**
Blocks until a new event is added or `timeout` (in seconds) to expires.
Depending on a target platform this method may also automatically execute arriving
events while waiting. However if any event is executed it will stop waiting.
Returns `true` if more events are expected.
Returns `false` if no more events expected.
Depending on a target platform this method may be non-reentrant. It must
not be called from event callbacks.
**/
public function wait(?timeout:Float):Bool {
if (started) throw "Event loop already started";
if(timeout != null) {
var timer = LuvTimer.init(handle).resolve();
timer.start(() -> {
timer.stop().resolve();
timer.close(() -> {});
}, Std.int(timeout * 1000));
return handle.run(ONCE);
} else {
return handle.run(ONCE);
}
}
/**
Execute all pending events.
Wait and execute as many events as the number of times `promise()` was called.
Runs until all repeating events are cancelled and no more events are expected.
Depending on a target platform this method may be non-reentrant. It must
not be called from event callbacks.
**/
public function loop():Void {
if (started) throw "Event loop already started";
started = true;
consumePending();
handle.run(DEFAULT);
}
function consumePending(?_:Async):Void {
mutex.acquire();
var p = pending;
pending = [];
mutex.release();
for(fn in p) fn();
if (started && isMainThread) {
var next = @:privateAccess MainLoop.tick();
if (haxe.MainLoop.hasEvents()) wakeup.send();
refUnref();
}
}
}
|