1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
|
# PromiseStream
[![Build Status](https://travis-ci.org/reactphp/promise-stream.svg?branch=master)](https://travis-ci.org/reactphp/promise-stream)
The missing link between Promise-land and Stream-land
for [ReactPHP](https://reactphp.org/).
**Table of Contents**
* [Usage](#usage)
* [buffer()](#buffer)
* [first()](#first)
* [all()](#all)
* [unwrapReadable()](#unwrapreadable)
* [unwrapWritable()](#unwrapwritable)
* [Install](#install)
* [License](#license)
## Usage
This lightweight library consists only of a few simple functions.
All functions reside under the `React\Promise\Stream` namespace.
The below examples assume you use an import statement similar to this:
```php
use React\Promise\Stream;
Stream\buffer(…);
```
Alternatively, you can also refer to them with their fully-qualified name:
```php
\React\Promise\Stream\buffer(…);
```
### buffer()
The `buffer(ReadableStreamInterface $stream, int $maxLength = null)` function can be used to create
a `Promise` which resolves with the stream data buffer. With an optional maximum length argument
which defaults to no limit. In case the maximum length is reached before the end the promise will
be rejected with a `\OverflowException`.
```php
$stream = accessSomeJsonStream();
Stream\buffer($stream)->then(function ($contents) {
var_dump(json_decode($contents));
});
```
The promise will resolve with all data chunks concatenated once the stream closes.
The promise will resolve with an empty string if the stream is already closed.
The promise will reject if the stream emits an error.
The promise will reject if it is canceled.
```php
$stream = accessSomeToLargeStream();
Stream\buffer($stream, 1024)->then(function ($contents) {
var_dump(json_decode($contents));
}, function ($error) {
// Reaching here when the stream buffer goes above the max size,
// in this example that is 1024 bytes,
// or when the stream emits an error.
});
```
### first()
The `first(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')`
function can be used to create a `Promise` which resolves once the given event triggers for the first time.
```php
$stream = accessSomeJsonStream();
Stream\first($stream)->then(function ($chunk) {
echo 'The first chunk arrived: ' . $chunk;
});
```
The promise will resolve with whatever the first event emitted or `null` if the
event does not pass any data.
If you do not pass a custom event name, then it will wait for the first "data"
event and resolve with a string containing the first data chunk.
The promise will reject if the stream emits an error – unless you're waiting for
the "error" event, in which case it will resolve.
The promise will reject once the stream closes – unless you're waiting for the
"close" event, in which case it will resolve.
The promise will reject if the stream is already closed.
The promise will reject if it is canceled.
### all()
The `all(ReadableStreamInterface|WritableStreamInterface $stream, $event = 'data')`
function can be used to create a `Promise` which resolves with an array of all the event data.
```php
$stream = accessSomeJsonStream();
Stream\all($stream)->then(function ($chunks) {
echo 'The stream consists of ' . count($chunks) . ' chunk(s)';
});
```
The promise will resolve with an array of whatever all events emitted or `null` if the
events do not pass any data.
If you do not pass a custom event name, then it will wait for all the "data"
events and resolve with an array containing all the data chunks.
The promise will resolve with an array once the stream closes.
The promise will resolve with an empty array if the stream is already closed.
The promise will reject if the stream emits an error.
The promise will reject if it is canceled.
### unwrapReadable()
The `unwrapReadable(PromiseInterface $promise)` function can be used to unwrap
a `Promise` which resolves with a `ReadableStreamInterface`.
This function returns a readable stream instance (implementing `ReadableStreamInterface`)
right away which acts as a proxy for the future promise resolution.
Once the given Promise resolves with a `ReadableStreamInterface`, its data will
be piped to the output stream.
```php
//$promise = someFunctionWhichResolvesWithAStream();
$promise = startDownloadStream($uri);
$stream = Stream\unwrapReadable($promise);
$stream->on('data', function ($data) {
echo $data;
});
$stream->on('end', function () {
echo 'DONE';
});
```
If the given promise is either rejected or fulfilled with anything but an
instance of `ReadableStreamInterface`, then the output stream will emit
an `error` event and close:
```php
$promise = startDownloadStream($invalidUri);
$stream = Stream\unwrapReadable($promise);
$stream->on('error', function (Exception $error) {
echo 'Error: ' . $error->getMessage();
});
```
The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected
at the time of invoking this function.
If the given promise is already settled and does not resolve with an
instance of `ReadableStreamInterface`, then you will not be able to receive
the `error` event.
You can `close()` the resulting stream at any time, which will either try to
`cancel()` the pending promise or try to `close()` the underlying stream.
```php
$promise = startDownloadStream($uri);
$stream = Stream\unwrapReadable($promise);
$loop->addTimer(2.0, function () use ($stream) {
$stream->close();
});
```
### unwrapWritable()
The `unwrapWritable(PromiseInterface $promise)` function can be used to unwrap
a `Promise` which resolves with a `WritableStreamInterface`.
This function returns a writable stream instance (implementing `WritableStreamInterface`)
right away which acts as a proxy for the future promise resolution.
Once the given Promise resolves with a `WritableStreamInterface`, any data you
wrote to the proxy will be piped to the inner stream.
```php
//$promise = someFunctionWhichResolvesWithAStream();
$promise = startUploadStream($uri);
$stream = Stream\unwrapWritable($promise);
$stream->write('hello');
$stream->end('world');
$stream->on('close', function () {
echo 'DONE';
});
```
If the given promise is either rejected or fulfilled with anything but an
instance of `WritableStreamInterface`, then the output stream will emit
an `error` event and close:
```php
$promise = startUploadStream($invalidUri);
$stream = Stream\unwrapWritable($promise);
$stream->on('error', function (Exception $error) {
echo 'Error: ' . $error->getMessage();
});
```
The given `$promise` SHOULD be pending, i.e. it SHOULD NOT be fulfilled or rejected
at the time of invoking this function.
If the given promise is already settled and does not resolve with an
instance of `WritableStreamInterface`, then you will not be able to receive
the `error` event.
You can `close()` the resulting stream at any time, which will either try to
`cancel()` the pending promise or try to `close()` the underlying stream.
```php
$promise = startUploadStream($uri);
$stream = Stream\unwrapWritable($promise);
$loop->addTimer(2.0, function () use ($stream) {
$stream->close();
});
```
## Install
The recommended way to install this library is [through Composer](https://getcomposer.org).
[New to Composer?](https://getcomposer.org/doc/00-intro.md)
This project follows [SemVer](http://semver.org/).
This will install the latest supported version:
```bash
$ composer require react/promise-stream:^1.1.1
```
See also the [CHANGELOG](CHANGELOG.md) for details about version upgrades.
This project aims to run on any platform and thus does not require any PHP
extensions and supports running on legacy PHP 5.3 through current PHP 7+ and
HHVM.
It's *highly recommended to use PHP 7+* for this project.
## License
MIT, see [LICENSE file](LICENSE).
|