1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
<?php
declare(strict_types=1);
namespace Pheanstalk;
use Pheanstalk\Contract\CommandInterface;
use Pheanstalk\Contract\JobIdInterface;
use Pheanstalk\Contract\PheanstalkManagerInterface;
use Pheanstalk\Values\Job;
use Pheanstalk\Values\JobState;
use Pheanstalk\Values\JobStats;
use Pheanstalk\Values\RawResponse;
use Pheanstalk\Values\ServerStats;
use Pheanstalk\Values\Success;
use Pheanstalk\Values\TubeList;
use Pheanstalk\Values\TubeName;
use Pheanstalk\Values\TubeStats;
/**
* This class implements "management" functions for the beanstalk protocol.
*
*/
final class PheanstalkManager implements PheanstalkManagerInterface
{
use StaticFactoryTrait;
public function kick(int $max): int
{
$command = new Command\KickCommand($max);
return $command->interpret($this->dispatch($command));
}
public function kickJob(JobIdInterface $job): void
{
$command = new Command\KickJobCommand($job);
$command->interpret($this->dispatch($command));
}
private function dispatch(CommandInterface $command): RawResponse
{
return $this->connection->dispatchCommand($command);
}
public function listTubes(): TubeList
{
$command = new Command\ListTubesCommand();
return $command->interpret($this->dispatch($command));
}
public function pauseTube(TubeName $tube, int $delay): void
{
$command = new Command\PauseTubeCommand($tube, $delay);
$command->interpret($this->dispatch($command));
}
public function resumeTube(TubeName $tube): void
{
// Pause a tube with zero delay will resume the tube
$this->pauseTube($tube, 0);
}
public function peek(JobIdInterface $job): Job
{
$command = new Command\PeekJobCommand($job);
return $command->interpret($this->dispatch($command));
}
public function peekReady(): null|Job
{
return $this->peekState(JobState::READY);
}
public function peekDelayed(): null|Job
{
return $this->peekState(JobState::DELAYED);
}
private function peekState(JobState $state): null|Job
{
$command = new Command\PeekCommand($state);
$response = $command->interpret($this->dispatch($command));
return $response instanceof Success ? null : $response;
}
public function peekBuried(): null|Job
{
return $this->peekState(JobState::BURIED);
}
public function statsJob(JobIdInterface $job): JobStats
{
$command = new Command\StatsJobCommand($job);
return $command->interpret($this->dispatch($command));
}
public function statsTube(TubeName $tube): TubeStats
{
$command = new Command\StatsTubeCommand($tube);
return $command->interpret($this->dispatch($command));
}
public function stats(): ServerStats
{
$command = new Command\StatsCommand();
return $command->interpret($this->dispatch($command));
}
public function disconnect(): void
{
$this->connection->disconnect();
}
}
|