Create New Item
Item Type
File
Folder
Item Name
Search file in folder and subfolders...
Are you sure want to rename?
File Manager
/
vendor
/
symfony
/
clock
:
RelayConnection.php
Advanced Search
Upload
New Item
Settings
Back
Back Up
Advanced Editor
Save
<?php /* * This file is part of the Predis package. * * (c) 2009-2020 Daniele Alessandri * (c) 2021-2023 Till Krüss * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Predis\Connection; use InvalidArgumentException; use Predis\ClientException; use Predis\Command\CommandInterface; use Predis\NotSupportedException; use Predis\Response\ServerException; use Relay\Exception as RelayException; use Relay\Relay; /** * This class provides the implementation of a Predis connection that * uses Relay for network communication and in-memory caching. * * Using Relay allows for: * 1) significantly faster reads thanks to in-memory caching * 2) fast data serialization using igbinary * 3) fast data compression using lzf, lz4 or zstd * * Usage of igbinary serialization and zstd compresses reduces * network traffic and Redis memory usage by ~75%. * * For instructions on how to install the Relay extension, please consult * the repository of the project: https://relay.so/docs/installation * * The connection parameters supported by this class are: * * - scheme: it can be either 'tcp', 'tls' or 'unix'. * - host: hostname or IP address of the server. * - port: TCP port of the server. * - path: path of a UNIX domain socket when scheme is 'unix'. * - timeout: timeout to perform the connection. * - read_write_timeout: timeout of read / write operations. * - cache: whether to use in-memory caching * - serializer: data serializer * - compression: data compression algorithm * * @see https://github.com/cachewerk/relay */ class RelayConnection extends StreamConnection { use RelayMethods; /** * The Relay instance. * * @var \Relay\Relay */ protected $client; /** * These commands must be called on the client, not using `Relay::rawCommand()`. * * @var string[] */ public $atypicalCommands = [ 'AUTH', 'SELECT', 'TYPE', 'MULTI', 'EXEC', 'DISCARD', 'WATCH', 'UNWATCH', 'SUBSCRIBE', 'UNSUBSCRIBE', 'PSUBSCRIBE', 'PUNSUBSCRIBE', 'SSUBSCRIBE', 'SUNSUBSCRIBE', ]; /** * {@inheritdoc} */ public function __construct(ParametersInterface $parameters) { $this->assertExtensions(); $this->parameters = $this->assertParameters($parameters); $this->client = $this->createClient(); } /** * {@inheritdoc} */ public function isConnected() { return $this->client->isConnected(); } /** * {@inheritdoc} */ public function disconnect() { if ($this->client->isConnected()) { $this->client->close(); } } /** * Checks if the Relay extension is loaded in PHP. */ private function assertExtensions() { if (!extension_loaded('relay')) { throw new NotSupportedException( 'The "relay" extension is required by this connection backend.' ); } } /** * {@inheritdoc} */ protected function assertParameters(ParametersInterface $parameters) { if (!in_array($parameters->scheme, ['tcp', 'tls', 'unix', 'redis', 'rediss'])) { throw new InvalidArgumentException("Invalid scheme: '{$parameters->scheme}'."); } if (!in_array($parameters->serializer, [null, 'php', 'igbinary', 'msgpack', 'json'])) { throw new InvalidArgumentException("Invalid serializer: '{$parameters->serializer}'."); } if (!in_array($parameters->compression, [null, 'lzf', 'lz4', 'zstd'])) { throw new InvalidArgumentException("Invalid compression algorithm: '{$parameters->compression}'."); } return $parameters; } /** * Creates a new instance of the client. * * @return \Relay\Relay */ private function createClient() { $client = new Relay(); // throw when errors occur and return `null` for non-existent keys $client->setOption(Relay::OPT_PHPREDIS_COMPATIBILITY, false); // use reply literals $client->setOption(Relay::OPT_REPLY_LITERAL, true); // disable Relay's command/connection retry $client->setOption(Relay::OPT_MAX_RETRIES, 0); // whether to use in-memory caching $client->setOption(Relay::OPT_USE_CACHE, $this->parameters->cache ?? true); // set data serializer $client->setOption(Relay::OPT_SERIALIZER, constant(sprintf( '%s::SERIALIZER_%s', Relay::class, strtoupper($this->parameters->serializer ?? 'none') ))); // set data compression algorithm $client->setOption(Relay::OPT_COMPRESSION, constant(sprintf( '%s::COMPRESSION_%s', Relay::class, strtoupper($this->parameters->compression ?? 'none') ))); return $client; } /** * Returns the underlying client. * * @return \Relay\Relay */ public function getClient() { return $this->client; } /** * {@inheritdoc} */ protected function getIdentifier() { return $this->client->endpointId(); } /** * {@inheritdoc} */ protected function createStreamSocket(ParametersInterface $parameters, $address, $flags) { $timeout = isset($parameters->timeout) ? (float) $parameters->timeout : 5.0; $retry_interval = 0; $read_timeout = 5.0; if (isset($parameters->read_write_timeout)) { $read_timeout = (float) $parameters->read_write_timeout; $read_timeout = $read_timeout > 0 ? $read_timeout : 0; } try { $this->client->connect( $parameters->path ?? $parameters->host, isset($parameters->path) ? 0 : $parameters->port, $timeout, null, $retry_interval, $read_timeout ); } catch (RelayException $ex) { $this->onConnectionError($ex->getMessage(), $ex->getCode()); } return $this->client; } /** * {@inheritdoc} */ public function executeCommand(CommandInterface $command) { if (!$this->client->isConnected()) { $this->getResource(); } try { $name = $command->getId(); // When using compression or a serializer, we'll need a dedicated // handler for `Predis\Command\RawCommand` calls, currently both // parameters are unsupported until a future Relay release return in_array($name, $this->atypicalCommands) ? $this->client->{$name}(...$command->getArguments()) : $this->client->rawCommand($name, ...$command->getArguments()); } catch (RelayException $ex) { throw $this->onCommandError($ex, $command); } } /** * {@inheritdoc} */ public function onCommandError(RelayException $exception, CommandInterface $command) { $code = $exception->getCode(); $message = $exception->getMessage(); if (strpos($message, 'RELAY_ERR_IO')) { return new ConnectionException($this, $message, $code, $exception); } if (strpos($message, 'RELAY_ERR_REDIS')) { return new ServerException($message, $code, $exception); } if (strpos($message, 'RELAY_ERR_WRONGTYPE') && strpos($message, "Got reply-type 'status'")) { $message = 'Operation against a key holding the wrong kind of value'; } return new ClientException($message, $code, $exception); } /** * Applies the configured serializer and compression to given value. * * @param mixed $value * @return string */ public function pack($value) { return $this->client->_pack($value); } /** * Deserializes and decompresses to given value. * * @param mixed $value * @return string */ public function unpack($value) { return $this->client->_unpack($value); } /** * {@inheritdoc} */ public function writeRequest(CommandInterface $command) { throw new NotSupportedException('The "relay" extension does not support writing requests.'); } /** * {@inheritdoc} */ public function readResponse(CommandInterface $command) { throw new NotSupportedException('The "relay" extension does not support reading responses.'); } /** * {@inheritdoc} */ public function __destruct() { $this->disconnect(); } /** * {@inheritdoc} */ public function __wakeup() { $this->assertExtensions(); $this->client = $this->createClient(); } }