1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
|
<?php
declare(strict_types=1);
namespace Pheanstalk;
use Pheanstalk\Command\BuryCommand;
use Pheanstalk\Command\ReserveJobCommand;
use Pheanstalk\Contract\CommandInterface;
use Pheanstalk\Contract\JobIdInterface;
use Pheanstalk\Contract\PheanstalkPublisherInterface;
use Pheanstalk\Contract\PheanstalkSubscriberInterface;
use Pheanstalk\Values\Job;
use Pheanstalk\Values\RawResponse;
use Pheanstalk\Values\Success;
use Pheanstalk\Values\Timeout;
use Pheanstalk\Values\TubeList;
use Pheanstalk\Values\TubeName;
/**
 * Subscriber-side client for consuming jobs.
 *
 * Every public command method follows the same shape: build a command object,
 * send it over the connection through {@see self::dispatch()}, and let the
 * command interpret the raw response into the method's return value.
 */
final class PheanstalkSubscriber implements PheanstalkSubscriberInterface
{
    // NOTE(review): `$this->connection` is not declared in this class; it is
    // presumably provided (together with the constructor/factories) by this
    // trait - confirm against StaticFactoryTrait.
    use StaticFactoryTrait;

    /**
     * Sends a command over the connection and returns the raw response.
     *
     * Single choke point for all commands issued by this class.
     *
     * @param Timeout|null $timeout Optional timeout forwarded to the connection;
     *                              null when the caller does not specify one.
     */
    private function dispatch(CommandInterface $command, ?Timeout $timeout = null): RawResponse
    {
        return $this->connection->dispatchCommand($command, $timeout);
    }

    /**
     * Issues a delete command for the given job.
     *
     * The interpreted response is discarded; failures are presumably reported
     * via exceptions thrown while interpreting - confirm against DeleteCommand.
     */
    public function delete(JobIdInterface $job): void
    {
        $command = new Command\DeleteCommand($job);
        $command->interpret($this->dispatch($command));
    }

    /**
     * Issues an ignore command for the given tube.
     *
     * @return int The integer produced by interpreting the response; per the
     *             beanstalkd protocol this should be the number of tubes still
     *             watched - confirm against IgnoreCommand.
     */
    public function ignore(TubeName $tube): int
    {
        $command = new Command\IgnoreCommand($tube);

        return $command->interpret($this->dispatch($command));
    }

    /**
     * Asks the server which tubes are being watched and returns them as a list.
     */
    public function listTubesWatched(): TubeList
    {
        $command = new Command\ListTubesWatchedCommand();

        return $command->interpret($this->dispatch($command));
    }

    /**
     * Issues a release command for the given job.
     *
     * @param int $priority Priority passed to the release command; defaults to
     *                      the publisher's default priority.
     * @param int $delay    Delay passed to the release command; defaults to the
     *                      publisher's default delay.
     */
    public function release(
        JobIdInterface $job,
        int $priority = PheanstalkPublisherInterface::DEFAULT_PRIORITY,
        int $delay = PheanstalkPublisherInterface::DEFAULT_DELAY
    ): void {
        $command = new Command\ReleaseCommand($job, $priority, $delay);
        $command->interpret($this->dispatch($command));
    }

    /**
     * Issues a reserve command without a timeout and returns the resulting job.
     */
    public function reserve(): Job
    {
        $command = new Command\ReserveCommand();

        return $command->interpret($this->dispatch($command));
    }

    /**
     * Issues a reserve command bounded by a timeout.
     *
     * The same value is used both as the command's timeout argument and as the
     * timeout handed to the connection for this dispatch.
     *
     * @param int<0, max> $timeout
     * @return Job|null The reserved job, or null when the command interprets
     *                  the response as a plain Success (presumably meaning the
     *                  timeout elapsed without a job - confirm against
     *                  ReserveWithTimeoutCommand).
     * @throws Exception\UnsupportedResponseException|Exception\MalformedResponseException|Exception\DeadlineSoonException
     */
    public function reserveWithTimeout(int $timeout): null|Job
    {
        $command = new Command\ReserveWithTimeoutCommand($timeout);
        $result = $command->interpret($this->dispatch($command, new Timeout($timeout)));

        // A Success value carries no job, so it is surfaced to callers as null.
        return $result instanceof Success ? null : $result;
    }

    /**
     * Issues a touch command for the given job; the interpreted response is
     * discarded.
     */
    public function touch(JobIdInterface $job): void
    {
        $command = new Command\TouchCommand($job);
        $command->interpret($this->dispatch($command));
    }

    /**
     * Issues a watch command for the given tube.
     *
     * @return int The integer produced by interpreting the response; per the
     *             beanstalkd protocol this should be the number of tubes now
     *             watched - confirm against WatchCommand.
     */
    public function watch(TubeName $tube): int
    {
        $command = new Command\WatchCommand($tube);

        return $command->interpret($this->dispatch($command));
    }

    /**
     * Issues a bury command for the given job.
     *
     * @param int $priority Priority passed to the bury command; defaults to the
     *                      publisher's default priority.
     */
    public function bury(JobIdInterface $job, int $priority = PheanstalkPublisherInterface::DEFAULT_PRIORITY): void
    {
        $command = new BuryCommand($job, $priority);
        // Goes through dispatch() like every other command in this class
        // instead of talking to the connection directly.
        $command->interpret($this->dispatch($command));
    }

    /**
     * Issues a reserve-job command for a specific job id and returns the job.
     */
    public function reserveJob(JobIdInterface $job): Job
    {
        $command = new ReserveJobCommand($job);

        return $command->interpret($this->dispatch($command));
    }

    /**
     * Disconnects the underlying connection.
     */
    public function disconnect(): void
    {
        $this->connection->disconnect();
    }
}
|