1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
|
Pheanstalk
==========
[](https://packagist.org/packages/pda/pheanstalk)
[](https://packagist.org/pda/pheanstalk)
[](https://scrutinizer-ci.com/g/pheanstalk/pheanstalk/?branch=master)
[](https://scrutinizer-ci.com/g/pheanstalk/pheanstalk/?branch=master)
[](https://travis-ci.org/pheanstalk/pheanstalk)
Pheanstalk is a pure PHP 7.1+ client for the [beanstalkd workqueue][1]. It has
been actively developed, and used in production by many, since late 2008.
Created by [Paul Annesley][2], Pheanstalk is rigorously unit tested and written
using encapsulated, maintainable object oriented design. Community feedback,
bug reports and patches has led to a stable 1.0 release in 2010, a 2.0 release
in 2013, and a 3.0 release in 2014.
Pheanstalk 3.0 introduces PHP namespaces, PSR-1 and PSR-2 coding standards,
and PSR-4 autoloader standard.
beanstalkd up to the latest version 1.10 is supported. All commands and
responses specified in the [protocol documentation][4] for beanstalkd 1.3 are
implemented.
[1]: https://beanstalkd.github.io/
[2]: https://paul.annesley.cc/
[3]: https://github.com/sammousa
[4]: https://github.com/kr/beanstalkd/tree/v1.3/doc/protocol.txt?raw=true
Pheanstalk 4
-------------
In 2018 [Sam Mousa][3] took on the responsibility of maintaining Pheanstalk.
Pheanstalk 4.0 drops support for older PHP versions. It contains the following changes (among other things):
- Strict PHP type hinting
- Value objects for Job IDs
- Functions without side effects
- Dropped support for persistent connections
- Add support for multiple socket implementations (streams extension, socket extension, fsockopen)
### Dropping support persistent connections
Persistent connections are a feature where a TCP connection is kept alive between different requests to reduce overhead
from TCP connection set up. When reusing TCP connections we must always guarantee that the application protocol, in this
case beanstalks' protocol is in a proper state. This is hard, and in some cases impossible; at the very least this means
we must do some tests which cause roundtrips.
Consider for example a connection that has just sent the command `PUT 0 4000`. The beanstalk server is now going to read
4000 bytes, but if the PHP script crashes during this write the next request get assigned this TCP socket.
Now to reset the connection to a known state it used to subscribe to the default tube: `use default`.
Since the beanstalk server is expecting 4000 bytes, it will just write this command to the job and wait for more bytes..
To prevent these kinds of issues the simplest solution is to not use persistent connections.
### Dropped connection handling
Depending on the socket implementation used we might not be able to enable TCP keepalive. If we do not have TCP keepalive
there is no way for us to detect dropped connections, the underlying OS may wait up to 15 minutes to decide that a TCP
connection where no packets are being sent is disconnected.
When using a socket implementation that supports read timeouts, like `SocketSocket` which uses the socket extension we
use read and write timeouts to detect broken connections; the issue with the beanstalk protocol is that it allows for
no packets to be sent for extended periods of time. Solutions are to either catch these connection exceptions and reconnect
or use `reserveWithTimeout()` with a timeout that is less than the read / write timeouts.
Example code for a job runner could look like this (this is real production code):
```php
while(true) {
$job = $beanstalk->reserveWithTimeout(50);
$this->stdout('.', Console::FG_CYAN);
if (isset($job)) {
$this->ensureDatabase($db);
try {
/** @var HookTask $task */
$task = $taskFactory->createFromJson($job->getData());
$commandBus->handle($task);
$this->stdout("Deleting job: {$job->getId()}\n", Console::FG_GREEN);
$beanstalk->delete($job);
} catch (\Throwable $t) {
\Yii::error($t);
$this->stderr("\n{$t->getMessage()}\n", Console::FG_RED);
$this->stderr("{$t->getTraceAsString()}\n", Console::FG_RED);
$this->stdout("Burying job: {$job->getId()}\n", Console::FG_YELLOW);
$beanstalk->bury($job);
}
}
}
```
Here connection errors will cause the process to exit (and be restarted by a task manager).
### Functions with side effects
In version 4 functions with side effects have been removed, functions like `putInTube` internally did several things:
1. Switch to the tube
2. Put the job in the new tube
In this example, the tube changes meaning that the connection is now in a different state. This is not intuitive and forces
any user of the connection to always switch / check the current tube.
Another issue with this approach is that it is harder to deal with errors. If an exception occurs it is unclear whether
we did or did not switch tube.
Migration to v4
-------------
A migration should in most cases be relatively simple:
- Change the constructor, either use the static constructor, use a DI container to construct the dependencies, or manually
instantiate them.
- Change instances of `reserve()` with a timeout to `reserveWithTimeout(int $timeout)` since `reserve()` no longer accepts a `timeout` parameter.
- Run your tests, or use a static analyzer to test for calls to functions that no longer exist.
- Make sure that you handle connection exceptions (this is not new to V4, only in V4 you will get more of them due to the
default usage of a socket implementation that has read / write timeouts).
Installation with Composer
-------------
Install pheanstalk as a dependency with composer:
```bash
composer require pda/pheanstalk
```
Usage Example
-------------
#### Producer
```php
<?php
require __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = Pheanstalk::create('127.0.0.1');
// Queue a Job
$pheanstalk
->useTube('testtube')
->put("job payload goes here\n");
$pheanstalk
->useTube('testtube')
->put(
json_encode(['test' => 'data']), // encode data in payload
Pheanstalk::DEFAULT_PRIORITY, // default priority
30, // delay by 30s
60 // beanstalk will retry job after 60s
);
```
#### Consumer / Worker
```php
<?php
require __DIR__ . '/vendor/autoload.php';
use Pheanstalk\Pheanstalk;
$pheanstalk = Pheanstalk::create('127.0.0.1');
// we want jobs from 'testtube' only.
$pheanstalk->watch('testtube');
// this hangs until a Job is produced.
$job = $pheanstalk->reserve();
try {
$jobPayload = $job->getData();
// do work.
sleep(2);
// If it's going to take a long time, periodically
// tell beanstalk we're alive to stop it rescheduling the job.
$pheanstalk->touch($job);
sleep(2);
// eventually we're done, delete job.
$pheanstalk->delete($job);
}
catch(\Exception $e) {
// handle exception.
// and let some other worker retry.
$pheanstalk->release($job);
}
```
Running the tests
-----------------
If you have docker-compose installed running tests is as simple as:
```sh
> composer test
```
If you don't then you manually need to set up a beanstalk server and run:
```sh
> vendor/bin/phpunit
```
Contributors
------------
* [Paul Annesley](https://github.com/pda)
* [Lachlan Donald](https://github.com/lox)
* [Joakim Bick](https://github.com/minimoe)
* [Vyacheslav](https://github.com/SlNPacifist)
* [leprechaun](https://github.com/leprechaun)
* [Peter McArthur](https://github.com/ptrmcrthr)
* [robbiehudson](https://github.com/robbiehudson)
* [Geoff Catlin](https://github.com/gcatlin)
* [Steven Lewis](https://github.com/srjlewis)
* [Lars Yencken](https://github.com/larsyencken)
* [Josh Butts](https://github.com/jimbojsb)
* [Henry Smith](https://github.com/h2s)
* [Javier Spagnoletti](https://github.com/phansys)
* [Graham Campbell](https://github.com/GrahamCampbell)
* [Thomas Tourlourat](https://github.com/armetiz)
* [Matthieu Napoli](https://github.com/mnapoli)
* [Christoph](https://github.com/xrstf)
* [James Hamilton](https://github.com/mrjameshamilton)
* [Hannes Van De Vreken](https://github.com/hannesvdvreken)
* [Yaniv Davidovitch](https://github.com/YanivD)
* [Sam Mousa](https://github.com/sammousa)
* .. [more?](https://github.com/pda/pheanstalk/contributors) Let me know if you're missing.
License
-------
© Paul Annesley
Released under the [The MIT License](http://www.opensource.org/licenses/mit-license.php)
|