File: AMQPIOReader.php

package info (click to toggle)
php-amqplib 3.7.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,060 kB
  • sloc: php: 13,145; makefile: 77; sh: 27
file content (122 lines) | stat: -rw-r--r-- 3,258 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
<?php

namespace PhpAmqpLib\Wire;

use PhpAmqpLib\Exception\AMQPDataReadException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Wire\IO\AbstractIO;
use RuntimeException;

class AMQPIOReader extends AMQPReader
{
    /**
     * IO object that incoming data is read from.
     *
     * @var AbstractIO
     */
    private $io;

    /**
     * Read timeout in seconds; wait() describes how null, non-positive
     * and positive values are treated.
     *
     * @var int|float|null
     */
    protected $timeout;

    /**
     * @param AbstractIO $io IO object to read from
     * @param int|float|null $timeout read timeout in seconds
     */
    public function __construct(AbstractIO $io, $timeout = 0)
    {
        $this->timeout = $timeout;
        $this->io = $io;
    }

    /**
     * Closes the underlying IO object.
     */
    public function close(): void
    {
        $this->io->close();
    }

    /**
     * Returns the currently configured timeout.
     *
     * @return int|float|null
     */
    public function getTimeout()
    {
        return $this->timeout;
    }

    /**
     * Replaces the configured timeout (in seconds).
     *
     * @param int|float|null $timeout
     */
    public function setTimeout($timeout)
    {
        $this->timeout = $timeout;
    }

    /**
     * Asks the IO object for $n bytes and advances the read offset by $n.
     *
     * Every attempt is preceded by a call to wait(); exceptions raised by
     * wait() are not caught here. A timeout raised by the read itself is
     * rethrown only when getTimeout() is greater than zero, otherwise the
     * wait/read cycle is started again.
     *
     * @param int $n
     * @return string
     * @throws RuntimeException
     * @throws AMQPDataReadException|AMQPNoDataException|AMQPIOException
     */
    protected function rawread(int $n): string
    {
        while (true) {
            $this->wait();

            try {
                $chunk = $this->io->read($n);
            } catch (AMQPTimeoutException $timedOut) {
                if ($this->getTimeout() > 0) {
                    throw $timedOut;
                }

                // no positive timeout configured: go back to waiting
                continue;
            }

            $this->offset += $n;

            return $chunk;
        }
    }

    /**
     * Blocks according to the configured timeout until the IO object
     * reports data.
     *
     * - timeout is null: a single select(0) call, i.e. poll and return;
     * - timeout is not greater than zero: a single select(null) call;
     * - timeout is positive: select is repeated with the remaining time
     *   until it reports a positive result or the time is used up.
     *
     * @throws AMQPTimeoutException when a positive timeout elapses without data
     * @throws AMQPNoDataException when a single select call reports 0
     */
    protected function wait(): void
    {
        $limit = $this->timeout;
        $pollOnly = $limit === null;

        if ($pollOnly || !($limit > 0)) {
            // timeout=null just polls the state and returns instantly,
            // timeout=0 waits indefinitely for data
            $ready = $this->io->select($pollOnly ? 0 : null);
            if ($ready === 0) {
                throw new AMQPNoDataException('No data is ready to read');
            }

            return;
        }

        $startedAt = microtime(true);
        $remaining = $limit;
        do {
            [$seconds, $microseconds] = MiscHelper::splitSecondsMicroseconds($remaining);
            if ($this->io->select($seconds, $microseconds) > 0) {
                return;
            }

            // select may have been interrupted by a signal: recompute what is left and retry
            $remaining = $limit - (microtime(true) - $startedAt);
        } while ($remaining > 0);

        $message = sprintf(
            'The connection timed out after %s sec while awaiting incoming data',
            $limit
        );

        throw new AMQPTimeoutException($message);
    }
}