File "MasterSlaveReplication.php"
Full Path: /home/pulsehostuk9/public_html/invoicer.pulsehost.co.uk/vendor/predis/predis/src/Connection/Replication/MasterSlaveReplication.php
File size: 14.4 KB
MIME-type: text/x-php
Charset: utf-8
<?php
/*
* This file is part of the Predis package.
*
* (c) 2009-2020 Daniele Alessandri
* (c) 2021-2023 Till Krüss
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\Connection\Replication;
use InvalidArgumentException;
use Predis\ClientException;
use Predis\Command\CommandInterface;
use Predis\Command\RawCommand;
use Predis\Connection\ConnectionException;
use Predis\Connection\FactoryInterface;
use Predis\Connection\NodeConnectionInterface;
use Predis\Replication\MissingMasterException;
use Predis\Replication\ReplicationStrategy;
use Predis\Response\ErrorInterface as ResponseErrorInterface;
/**
* Aggregate connection handling replication of Redis nodes configured in a
* single master / multiple slaves setup.
*/
class MasterSlaveReplication implements ReplicationInterface
{
/**
* Strategy consulted to decide whether a command is a read operation.
*
* @var ReplicationStrategy
*/
protected $strategy;
/**
* Connection registered with the `master` role, or null when none is set
* (it is cleared when the master connection is removed).
*
* @var NodeConnectionInterface|null
*/
protected $master;
/**
* Connections registered with any role other than `master`.
*
* @var NodeConnectionInterface[]
*/
protected $slaves = [];
/**
* Every added connection, indexed by the string cast of the connection.
*
* @var NodeConnectionInterface[]
*/
protected $pool = [];
/**
* Connections indexed by their `alias` parameter, when one is set.
*
* @var NodeConnectionInterface[]
*/
protected $aliases = [];
/**
* Connection currently selected for running commands, or null when no
* selection has been made yet (adding or removing a connection resets it).
*
* @var NodeConnectionInterface|null
*/
protected $current;
/**
* Whether the replication configuration is re-discovered upon failures.
*
* @var bool
*/
protected $autoDiscovery = false;
/**
* Factory used by the discovery procedure to create connections.
*
* @var FactoryInterface|null
*/
protected $connectionFactory;
/**
 * Creates the aggregate connection.
 *
 * @param ReplicationStrategy|null $strategy Optional replication strategy. A
 *                                           new ReplicationStrategy instance
 *                                           is used when omitted or null.
 */
public function __construct(?ReplicationStrategy $strategy = null)
{
    // The nullable type is spelled out explicitly: relying on the "= null"
    // default to make the parameter nullable is deprecated as of PHP 8.4.
    $this->strategy = $strategy ?: new ReplicationStrategy();
}
/**
 * Turns the automatic discovery of the replication configuration on or off.
 *
 * A connection factory must already be set; without one the call fails
 * whatever value is requested.
 *
 * @param bool $value Enable or disable auto discovery.
 *
 * @throws ClientException When no connection factory has been set.
 */
public function setAutoDiscovery($value)
{
    if ($this->connectionFactory) {
        $this->autoDiscovery = (bool) $value;

        return;
    }

    throw new ClientException('Automatic discovery requires a connection factory');
}
/**
 * Stores the factory that the discovery procedure uses to create
 * connections to the nodes it finds.
 *
 * @param FactoryInterface $connectionFactory Connection factory instance.
 */
public function setConnectionFactory(FactoryInterface $connectionFactory)
{
    $this->connectionFactory = $connectionFactory;
}
/**
 * Clears the currently selected connection so that the next operation has
 * to pick one again.
 */
protected function reset()
{
    $this->current = null;
}
/**
 * {@inheritdoc}
 *
 * The connection becomes the master when its `role` parameter is exactly
 * `master`; otherwise it is appended to the slaves. It is also indexed by
 * alias (when one is set) and by its string representation, and the current
 * connection selection is reset.
 */
public function add(NodeConnectionInterface $connection)
{
    $parameters = $connection->getParameters();
    $isMaster = 'master' === $parameters->role;

    if ($isMaster) {
        $this->master = $connection;
    } else {
        // Any role other than "master" is treated as a slave.
        $this->slaves[] = $connection;
    }

    if (isset($parameters->alias)) {
        $alias = $parameters->alias;
        $this->aliases[$alias] = $connection;
    }

    $id = (string) $connection;
    $this->pool[$id] = $connection;

    $this->reset();
}
/**
 * {@inheritdoc}
 *
 * Returns false without touching anything when the connection is neither
 * the master nor one of the slaves. Otherwise the connection is dropped
 * from the master/slaves, the pool and the aliases, the current selection
 * is reset and true is returned.
 */
public function remove(NodeConnectionInterface $connection)
{
    if ($connection === $this->master) {
        $this->master = null;
    } else {
        $index = array_search($connection, $this->slaves, true);

        if (false === $index) {
            return false;
        }

        unset($this->slaves[$index]);
    }

    unset($this->pool[(string) $connection]);

    // The alias parameter is only looked up when at least one alias is known.
    if ($this->aliases) {
        $alias = $connection->getParameters()->alias;

        if ($alias) {
            unset($this->aliases[$alias]);
        }
    }

    $this->reset();

    return true;
}
/**
 * {@inheritdoc}
 *
 * With no connection selected yet, read operations go to a random slave
 * (when one exists) and everything else goes to the master. Once the master
 * is selected it stays selected. A selected slave is kept only for read
 * operations while slaves are still registered; otherwise the selection
 * moves to the master.
 *
 * @throws MissingMasterException When the master is needed but not set.
 */
public function getConnectionByCommand(CommandInterface $command)
{
    if ($this->current) {
        $master = $this->getMasterOrDie();

        if ($this->current !== $master) {
            // The strategy is consulted only when a slave is currently selected.
            $keepSlave = $this->strategy->isReadOperation($command) && $this->slaves;

            if (!$keepSlave) {
                $this->current = $master;
            }
        }

        return $this->current;
    }

    // First selection: try a slave for reads, fall back to the master.
    $slave = $this->strategy->isReadOperation($command) ? $this->pickSlave() : null;
    $this->current = $slave ?: $this->getMasterOrDie();

    return $this->current;
}
/**
 * {@inheritdoc}
 *
 * Looks the identifier up in the pool, which is keyed by the string
 * representation of each added connection; yields null when not found.
 */
public function getConnectionById($id)
{
    $connection = $this->pool[$id] ?? null;

    return $connection;
}
/**
 * Looks up a connection by the alias it was registered with.
 *
 * @param string $alias Connection alias.
 *
 * @return NodeConnectionInterface|null Matching connection, or null when the
 *                                      alias is unknown.
 */
public function getConnectionByAlias($alias)
{
    $connection = $this->aliases[$alias] ?? null;

    return $connection;
}
/**
 * Resolves a connection from a role name.
 *
 * `master` yields the master connection, `slave` yields a randomly picked
 * slave; any other value yields null.
 *
 * @param string $role Connection role (`master` or `slave`)
 *
 * @return NodeConnectionInterface|null
 */
public function getConnectionByRole($role)
{
    if ('master' === $role) {
        return $this->getMaster();
    }

    if ('slave' === $role) {
        return $this->pickSlave();
    }

    return null;
}
/**
 * Makes the given connection the one in use by the backend.
 *
 * Nothing happens when the connection is already the current one. The
 * connection must be the registered master or one of the registered slaves.
 *
 * @param NodeConnectionInterface $connection Connection instance in the pool.
 *
 * @throws InvalidArgumentException When the connection is neither the master
 *                                  nor a known slave.
 */
public function switchTo(NodeConnectionInterface $connection)
{
    $alreadyCurrent = $connection && $connection === $this->current;

    if ($alreadyCurrent) {
        return;
    }

    $isKnown = $connection === $this->master || in_array($connection, $this->slaves, true);

    if (!$isKnown) {
        throw new InvalidArgumentException('Invalid connection or connection not found.');
    }

    $this->current = $connection;
}
/**
 * {@inheritdoc}
 *
 * @throws InvalidArgumentException When no master connection is registered.
 */
public function switchToMaster()
{
    $master = $this->getConnectionByRole('master');

    if (!$master) {
        throw new InvalidArgumentException('Invalid connection or connection not found.');
    }

    $this->switchTo($master);
}
/**
 * {@inheritdoc}
 *
 * The slave is picked through the `slave` role lookup, i.e. at random.
 *
 * @throws InvalidArgumentException When no slave connection is registered.
 */
public function switchToSlave()
{
    $slave = $this->getConnectionByRole('slave');

    if (!$slave) {
        throw new InvalidArgumentException('Invalid connection or connection not found.');
    }

    $this->switchTo($slave);
}
/**
 * {@inheritdoc}
 *
 * Yields the connection currently selected, or null when none is selected.
 */
public function getCurrent()
{
    return $this->current;
}
/**
 * {@inheritdoc}
 *
 * Yields the registered master connection, or null when none is set.
 */
public function getMaster()
{
    return $this->master;
}
/**
 * Returns the master connection, failing loudly when there is none.
 *
 * @return NodeConnectionInterface
 *
 * @throws MissingMasterException When no master connection is available.
 */
private function getMasterOrDie()
{
    $master = $this->getMaster();

    if ($master) {
        return $master;
    }

    throw new MissingMasterException('No master server available for replication');
}
/**
 * {@inheritdoc}
 *
 * Yields the array of registered slave connections.
 */
public function getSlaves()
{
    return $this->slaves;
}
/**
 * Gives access to the replication strategy in use by this connection.
 *
 * @return ReplicationStrategy
 */
public function getReplicationStrategy()
{
    return $this->strategy;
}
/**
 * Picks one of the registered slaves at random.
 *
 * @return NodeConnectionInterface|null Random slave, or null when no slave
 *                                      is registered.
 */
protected function pickSlave()
{
    if ($this->slaves) {
        $key = array_rand($this->slaves);

        return $this->slaves[$key];
    }

    return null;
}
/**
 * {@inheritdoc}
 *
 * Reports the state of the currently selected connection; false when no
 * connection is selected.
 */
public function isConnected()
{
    if (!$this->current) {
        return false;
    }

    return $this->current->isConnected();
}
/**
 * {@inheritdoc}
 *
 * When no connection is selected yet, a random slave is selected first and
 * the master is used only when there is no slave. The selected connection
 * is then connected.
 *
 * @throws ClientException When neither a slave nor the master is available.
 */
public function connect()
{
    if (!$this->current) {
        // The master is consulted only when no slave could be picked.
        $this->current = $this->pickSlave() ?: $this->getMaster();

        if (!$this->current) {
            throw new ClientException('No available connection for replication');
        }
    }

    $this->current->connect();
}
/**
 * {@inheritdoc}
 *
 * Disconnects every connection held in the pool.
 */
public function disconnect()
{
    foreach ($this->pool as $node) {
        $node->disconnect();
    }
}
/**
 * Turns the text payload returned by INFO into a key/value array.
 *
 * The payload is split into lines; lines without a colon are ignored and
 * the remaining ones are split at their first colon into key and value.
 *
 * @param string $response
 *
 * @return array
 */
private function handleInfoResponse($response)
{
    $fields = [];
    $lines = preg_split('/\r?\n/', $response);

    foreach ($lines as $line) {
        if (false !== strpos($line, ':')) {
            // Only the first colon separates key from value.
            $pair = explode(':', $line, 2);
            $fields[$pair[0]] = $pair[1];
        }
    }

    return $fields;
}
/**
 * Fetches the replication configuration from one of the servers.
 *
 * The master is asked first; when there is no master, a random slave is
 * asked instead. A connection that fails with a ConnectionException during
 * the attempt is removed and the procedure starts over with what is left.
 *
 * @throws ClientException When no connection factory is set or when no
 *                         connection is left to run the discovery on.
 */
public function discover()
{
    if (!$this->connectionFactory) {
        throw new ClientException('Discovery requires a connection factory');
    }

    for (;;) {
        try {
            $connection = $this->getMaster();

            if ($connection) {
                $this->discoverFromMaster($connection, $this->connectionFactory);

                return;
            }

            $connection = $this->pickSlave();

            if ($connection) {
                $this->discoverFromSlave($connection, $this->connectionFactory);

                return;
            }

            throw new ClientException('No connection available for discovery');
        } catch (ConnectionException $exception) {
            // Drop the node the attempt started from, then try again.
            $this->remove($connection);
        }
    }
}
/**
 * Discovers the replication configuration by contacting the master node.
 *
 * Sends `INFO REPLICATION` to the given connection, checks that the node
 * reports the `master` role and then rebuilds the list of slaves from the
 * `slave*` entries of the response that carry an `ip=...,port=...` pair.
 *
 * @param NodeConnectionInterface $connection        Connection to the master node.
 * @param FactoryInterface        $connectionFactory Connection factory instance.
 *
 * @throws ClientException When the node does not report the `master` role.
 */
protected function discoverFromMaster(NodeConnectionInterface $connection, FactoryInterface $connectionFactory)
{
    $response = $connection->executeCommand(RawCommand::create('INFO', 'REPLICATION'));
    $replication = $this->handleInfoResponse($response);

    // A response lacking the `role` field must end in the same exception as
    // a wrong role instead of first raising an "undefined index" notice or
    // warning on the array access.
    if (($replication['role'] ?? null) !== 'master') {
        throw new ClientException("Role mismatch (expected master, got slave) [$connection]");
    }

    // NOTE: only the list of slaves is cleared here; entries previously
    // stored in the pool and in the aliases are not touched.
    $this->slaves = [];

    foreach ($replication as $k => $v) {
        $parameters = null;

        if (strpos($k, 'slave') === 0 && preg_match('/ip=(?P<host>.*),port=(?P<port>\d+)/', $v, $parameters)) {
            $slaveConnection = $connectionFactory->create([
                'host' => $parameters['host'],
                'port' => $parameters['port'],
                'role' => 'slave',
            ]);

            $this->add($slaveConnection);
        }
    }
}
/**
 * Discovers the replication configuration by contacting one of the slaves.
 *
 * Sends `INFO REPLICATION` to the given connection, checks that the node
 * reports the `slave` role, registers a master connection built from the
 * `master_host` / `master_port` fields of the response and then continues
 * the discovery from that master.
 *
 * @param NodeConnectionInterface $connection        Connection to one of the slaves.
 * @param FactoryInterface        $connectionFactory Connection factory instance.
 *
 * @throws ClientException When the node does not report the `slave` role.
 */
protected function discoverFromSlave(NodeConnectionInterface $connection, FactoryInterface $connectionFactory)
{
    $response = $connection->executeCommand(RawCommand::create('INFO', 'REPLICATION'));
    $replication = $this->handleInfoResponse($response);

    // A response lacking the `role` field must end in the same exception as
    // a wrong role instead of first raising an "undefined index" notice or
    // warning on the array access.
    if (($replication['role'] ?? null) !== 'slave') {
        throw new ClientException("Role mismatch (expected slave, got master) [$connection]");
    }

    $masterConnection = $connectionFactory->create([
        'host' => $replication['master_host'],
        'port' => $replication['master_port'],
        'role' => 'master',
    ]);

    $this->add($masterConnection);

    $this->discoverFromMaster($masterConnection, $connectionFactory);
}
/**
* Runs the given method for a command, retrying on connection failures.
*
* The loop selects a connection for the command, invokes $method on it and
* leaves as soon as a response is obtained. Failing connections are
* disconnected and removed (or the exception is rethrown, see the inline
* comments) before the next attempt is made.
*
* @param CommandInterface $command Command instance.
* @param string $method Name of the connection method to invoke with the command.
*
* @return mixed Whatever the invoked connection method returned.
*
* @throws ConnectionException When the master fails with auto discovery disabled, or no connection is left.
* @throws MissingMasterException When the master is missing and auto discovery is disabled.
*/
private function retryCommandOnFailure(CommandInterface $command, $method)
{
while (true) {
try {
$connection = $this->getConnectionByCommand($command);
$response = $connection->$method($command);
// A LOADING error response is turned into a connection failure so
// that it goes through the same retry handling below.
if ($response instanceof ResponseErrorInterface && $response->getErrorType() === 'LOADING') {
throw new ConnectionException($connection, "Redis is loading the dataset in memory [$connection]");
}
break;
} catch (ConnectionException $exception) {
// Work on the connection reported by the exception itself.
$connection = $exception->getConnection();
$connection->disconnect();
if ($connection === $this->master && !$this->autoDiscovery) {
// Throw immediately when master connection is failing, even
// when the command represents a read-only operation, unless
// automatic discovery has been enabled.
throw $exception;
} else {
// Otherwise remove the failing connection (a slave, or the
// master when automatic discovery is enabled) and attempt to
// execute the command again on the remaining connections...
$this->remove($connection);
}
// ... that is, unless we have no more connections to use.
if (!$this->slaves && !$this->master) {
throw $exception;
} elseif ($this->autoDiscovery) {
// Refresh the replication configuration before retrying.
$this->discover();
}
} catch (MissingMasterException $exception) {
// Without a master the only way forward is to discover one.
if ($this->autoDiscovery) {
$this->discover();
} else {
throw $exception;
}
}
}
return $response;
}
/**
 * {@inheritdoc}
 *
 * Delegates to the retry helper, which invokes `writeRequest` on the
 * connection selected for the command; nothing is returned.
 */
public function writeRequest(CommandInterface $command)
{
    $this->retryCommandOnFailure($command, 'writeRequest');
}
/**
 * {@inheritdoc}
 *
 * Delegates to the retry helper, which invokes `readResponse` on the
 * connection selected for the command, and hands back its result.
 */
public function readResponse(CommandInterface $command)
{
    $response = $this->retryCommandOnFailure($command, 'readResponse');

    return $response;
}
/**
 * {@inheritdoc}
 *
 * Delegates to the retry helper, which invokes `executeCommand` on the
 * connection selected for the command, and hands back its result.
 */
public function executeCommand(CommandInterface $command)
{
    $response = $this->retryCommandOnFailure($command, 'executeCommand');

    return $response;
}
/**
 * Lists the properties kept when the instance is serialized.
 *
 * The current connection, the auto discovery flag and the connection
 * factory are not part of the list.
 *
 * @return string[]
 */
public function __sleep()
{
    return [
        'master',
        'slaves',
        'pool',
        'aliases',
        'strategy',
    ];
}
}