File: AMQPConnectionFactory.php

package info (click to toggle)
php-amqplib 3.7.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,060 kB
  • sloc: php: 13,145; makefile: 77; sh: 27
file content (109 lines) | stat: -rw-r--r-- 3,599 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?php

namespace PhpAmqpLib\Connection;

use LogicException;

/**
 * @since 3.2.0
 */
class AMQPConnectionFactory
{
    public static function create(AMQPConnectionConfig $config): AbstractConnection
    {
        if ($config->getIoType() === AMQPConnectionConfig::IO_TYPE_STREAM) {
            $connection = new AMQPStreamConnection(
                $config->getHost(),
                $config->getPort(),
                $config->getUser(),
                $config->getPassword(),
                $config->getVhost(),
                $config->isInsist(),
                $config->getLoginMethod(),
                $config->getLoginResponse(),
                $config->getLocale(),
                $config->getConnectionTimeout(),
                self::getReadWriteTimeout($config),
                self::getStreamContext($config),
                $config->isKeepalive(),
                $config->getHeartbeat(),
                $config->getChannelRPCTimeout(),
                $config
            );
        } else {
            if ($config->isSecure()) {
                throw new LogicException('The socket connection implementation does not support secure connections.');
            }

            $connection = new AMQPSocketConnection(
                $config->getHost(),
                $config->getPort(),
                $config->getUser(),
                $config->getPassword(),
                $config->getVhost(),
                $config->isInsist(),
                $config->getLoginMethod(),
                $config->getLoginResponse(),
                $config->getLocale(),
                $config->getReadTimeout(),
                $config->isKeepalive(),
                $config->getWriteTimeout(),
                $config->getHeartbeat(),
                $config->getChannelRPCTimeout(),
                $config
            );
        }

        return $connection;
    }

    private static function getReadWriteTimeout(AMQPConnectionConfig $config): float
    {
        return min($config->getReadTimeout(), $config->getWriteTimeout());
    }

    /**
     * @param AMQPConnectionConfig $config
     * @return string[]
     */
    private static function getSslOptions(AMQPConnectionConfig $config): array
    {
        return array_filter([
            'cafile' => $config->getSslCaCert(),
            'capath' => $config->getSslCaPath(),
            'local_cert' => $config->getSslCert(),
            'local_pk' => $config->getSslKey(),
            'verify_peer' => $config->getSslVerify(),
            'verify_peer_name' => $config->getSslVerifyName(),
            'passphrase' => $config->getSslPassPhrase(),
            'ciphers' => $config->getSslCiphers(),
            'security_level' => $config->getSslSecurityLevel(),
            'crypto_method' => $config->getSslCryptoMethod(),
        ], static function ($value) {
            return null !== $value;
        });
    }

    /**
     * @param AMQPConnectionConfig $config
     * @return resource|null
     */
    private static function getStreamContext(AMQPConnectionConfig $config)
    {
        $context = $config->getStreamContext();

        if ($config->isSecure()) {
            if (!$context) {
                $context = stream_context_create();
            }
            $options = self::getSslOptions($config);
            foreach ($options as $k => $v) {
                // Note: 'ssl' applies to 'tls' as well
                // https://www.php.net/manual/en/context.ssl.php
                stream_context_set_option($context, 'ssl', $k, $v);
            }
        }

        return $context;
    }
}