<?php

namespace PhpAmqpLib\Connection;

use LogicException;

/**
 * @since 3.2.0
 */
class AMQPConnectionFactory
{
    /**
     * Creates a connection matching the IO type set in the given config.
     *
     * @param AMQPConnectionConfig $config
     * @return AbstractConnection
     * @throws LogicException if a secure connection is requested with a non-stream (socket) IO type
     */
    public static function create(AMQPConnectionConfig $config): AbstractConnection
    {
        if ($config->getIoType() === AMQPConnectionConfig::IO_TYPE_STREAM) {
            $connection = new AMQPStreamConnection(
                $config->getHost(),
                $config->getPort(),
                $config->getUser(),
                $config->getPassword(),
                $config->getVhost(),
                $config->isInsist(),
                $config->getLoginMethod(),
                $config->getLoginResponse(),
                $config->getLocale(),
                $config->getConnectionTimeout(),
                self::getReadWriteTimeout($config),
                self::getStreamContext($config),
                $config->isKeepalive(),
                $config->getHeartbeat(),
                $config->getChannelRPCTimeout(),
                $config
            );
        } else {
            if ($config->isSecure()) {
                throw new LogicException('The socket connection implementation does not support secure connections.');
            }

            $connection = new AMQPSocketConnection(
                $config->getHost(),
                $config->getPort(),
                $config->getUser(),
                $config->getPassword(),
                $config->getVhost(),
                $config->isInsist(),
                $config->getLoginMethod(),
                $config->getLoginResponse(),
                $config->getLocale(),
                $config->getReadTimeout(),
                $config->isKeepalive(),
                $config->getWriteTimeout(),
                $config->getHeartbeat(),
                $config->getChannelRPCTimeout(),
                $config
            );
        }

        return $connection;
    }

    private static function getReadWriteTimeout(AMQPConnectionConfig $config): float
    {
        return min($config->getReadTimeout(), $config->getWriteTimeout());
    }

    /**
     * Collects the SSL context options that are set (non-null) in the config.
     *
     * @param AMQPConnectionConfig $config
     * @return array<string, mixed>
     */
    private static function getSslOptions(AMQPConnectionConfig $config): array
    {
        return array_filter([
            'cafile' => $config->getSslCaCert(),
            'capath' => $config->getSslCaPath(),
            'local_cert' => $config->getSslCert(),
            'local_pk' => $config->getSslKey(),
            'verify_peer' => $config->getSslVerify(),
            'verify_peer_name' => $config->getSslVerifyName(),
            'passphrase' => $config->getSslPassPhrase(),
            'ciphers' => $config->getSslCiphers(),
            'security_level' => $config->getSslSecurityLevel(),
            'crypto_method' => $config->getSslCryptoMethod(),
        ], static function ($value) {
            return null !== $value;
        });
    }

    /**
     * @param AMQPConnectionConfig $config
     * @return resource|null
     */
    private static function getStreamContext(AMQPConnectionConfig $config)
    {
        $context = $config->getStreamContext();

        if ($config->isSecure()) {
            if (!$context) {
                $context = stream_context_create();
            }

            $options = self::getSslOptions($config);
            foreach ($options as $k => $v) {
                // Note: 'ssl' applies to 'tls' as well
                // https://www.php.net/manual/en/context.ssl.php
                stream_context_set_option($context, 'ssl', $k, $v);
            }
        }

        return $context;
    }
}