1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
#
#
# Nim's Runtime Library
# (c) Copyright 2015 Dominik Picheta
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## Unstable API.
import std/asyncfutures
when defined(nimPreviewSlimSystem):
import std/assertions
import std/deques
type
FutureStream*[T] = ref object ## Special future that acts as
## a queue. Its API is still
## experimental and so is
## subject to change.
queue: Deque[T]
finished: bool
cb: proc () {.closure, gcsafe.}
error*: ref Exception
proc newFutureStream*[T](fromProc = "unspecified"): FutureStream[T] =
## Create a new `FutureStream`. This future's callback is activated when
## two events occur:
##
## * New data is written into the future stream.
## * The future stream is completed (this means that no more data will be
## written).
##
## Specifying `fromProc`, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
##
## **Note:** The API of FutureStream is still new and so has a higher
## likelihood of changing in the future.
result = FutureStream[T](finished: false, cb: nil)
result.queue = initDeque[T]()
proc complete*[T](future: FutureStream[T]) =
## Completes a `FutureStream` signalling the end of data.
assert(future.error == nil, "Trying to complete failed stream")
future.finished = true
if not future.cb.isNil:
future.cb()
proc fail*[T](future: FutureStream[T], error: ref Exception) =
## Completes `future` with `error`.
assert(not future.finished)
future.finished = true
future.error = error
if not future.cb.isNil:
future.cb()
proc `callback=`*[T](future: FutureStream[T],
cb: proc (future: FutureStream[T]) {.closure, gcsafe.}) =
## Sets the callback proc to be called when data was placed inside the
## future stream.
##
## The callback is also called when the future is completed. So you should
## use `finished` to check whether data is available.
##
## If the future stream already has data or is finished then `cb` will be
## called immediately.
proc named() = cb(future)
future.cb = named
if future.queue.len > 0 or future.finished:
callSoon(future.cb)
proc finished*[T](future: FutureStream[T]): bool =
## Check if a `FutureStream` is finished. `true` value means that
## no more data will be placed inside the stream *and* that there is
## no data waiting to be retrieved.
result = future.finished and future.queue.len == 0
proc failed*[T](future: FutureStream[T]): bool =
## Determines whether `future` completed with an error.
return future.error != nil
proc write*[T](future: FutureStream[T], value: T): Future[void] =
## Writes the specified value inside the specified future stream.
##
## This will raise `ValueError` if `future` is finished.
result = newFuture[void]("FutureStream.put")
if future.finished:
let msg = "FutureStream is finished and so no longer accepts new data."
result.fail(newException(ValueError, msg))
return
# TODO: Implement limiting of the streams storage to prevent it growing
# infinitely when no reads are occurring.
future.queue.addLast(value)
if not future.cb.isNil: future.cb()
result.complete()
proc read*[T](future: FutureStream[T]): owned(Future[(bool, T)]) =
## Returns a future that will complete when the `FutureStream` has data
## placed into it. The future will be completed with the oldest
## value stored inside the stream. The return value will also determine
## whether data was retrieved, `false` means that the future stream was
## completed and no data was retrieved.
##
## This function will remove the data that was returned from the underlying
## `FutureStream`.
var resFut = newFuture[(bool, T)]("FutureStream.take")
let savedCb = future.cb
proc newCb(fs: FutureStream[T]) =
# Exit early if `resFut` is already complete. (See #8994).
if resFut.finished: return
# We don't want this callback called again.
#future.cb = nil
# The return value depends on whether the FutureStream has finished.
var res: (bool, T)
if finished(fs):
# Remember, this callback is called when the FutureStream is completed.
res[0] = false
else:
res[0] = true
res[1] = fs.queue.popFirst()
if fs.failed:
resFut.fail(fs.error)
else:
resFut.complete(res)
# If the saved callback isn't nil then let's call it.
if not savedCb.isNil:
if fs.queue.len > 0:
savedCb()
else:
future.cb = savedCb
if future.queue.len > 0 or future.finished:
newCb(future)
else:
future.callback = newCb
return resFut
proc len*[T](future: FutureStream[T]): int =
## Returns the amount of data pieces inside the stream.
future.queue.len
|