vendor/elasticsearch/elasticsearch/src/Elasticsearch/Transport.php line 91

Open in your IDE?
  1. <?php
  2. declare(strict_types 1);
  3. namespace Elasticsearch;
  4. use Elasticsearch\Common\Exceptions;
  5. use Elasticsearch\ConnectionPool\AbstractConnectionPool;
  6. use Elasticsearch\Connections\Connection;
  7. use Elasticsearch\Connections\ConnectionInterface;
  8. use GuzzleHttp\Ring\Future\FutureArrayInterface;
  9. use Psr\Log\LoggerInterface;
  10. /**
  11.  * Class Transport
  12.  *
  13.  * @category Elasticsearch
  14.  * @package  Elasticsearch
  15.  * @author   Zachary Tong <zach@elastic.co>
  16.  * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache2
  17.  * @link     http://elastic.co
  18.  */
  19. class Transport
  20. {
  21.     /**
  22.      * @var AbstractConnectionPool
  23.      */
  24.     public $connectionPool;
  25.     /**
  26.      * @var LoggerInterface
  27.      */
  28.     private $log;
  29.     /**
  30.      * @var int
  31.      */
  32.     public $retryAttempts 0;
  33.     /**
  34.      * @var Connection
  35.      */
  36.     public $lastConnection;
  37.     /**
  38.      * @var int
  39.      */
  40.     public $retries;
  41.     /**
  42.      * Transport class is responsible for dispatching requests to the
  43.      * underlying cluster connections
  44.      *
  45.      * @param int                                   $retries
  46.      * @param bool                                  $sniffOnStart
  47.      * @param ConnectionPool\AbstractConnectionPool $connectionPool
  48.      * @param \Psr\Log\LoggerInterface              $log            Monolog logger object
  49.      */
  50.     public function __construct(int $retriesAbstractConnectionPool $connectionPoolLoggerInterface $logbool $sniffOnStart false)
  51.     {
  52.         $this->log            $log;
  53.         $this->connectionPool $connectionPool;
  54.         $this->retries        $retries;
  55.         if ($sniffOnStart === true) {
  56.             $this->log->notice('Sniff on Start.');
  57.             $this->connectionPool->scheduleCheck();
  58.         }
  59.     }
  60.     /**
  61.      * Returns a single connection from the connection pool
  62.      * Potentially performs a sniffing step before returning
  63.      */
  64.     public function getConnection(): ConnectionInterface
  65.     {
  66.         return $this->connectionPool->nextConnection();
  67.     }
  68.     /**
  69.      * Perform a request to the Cluster
  70.      *
  71.      * @param string $method  HTTP method to use
  72.      * @param string $uri     HTTP URI to send request to
  73.      * @param array  $params  Optional query parameters
  74.      * @param null   $body    Optional query body
  75.      * @param array  $options
  76.      *
  77.      * @throws Common\Exceptions\NoNodesAvailableException|\Exception
  78.      */
  79.     public function performRequest(string $methodstring $uri, array $params null$body null, array $options = []): FutureArrayInterface
  80.     {
  81.         try {
  82.             $connection  $this->getConnection();
  83.         } catch (Exceptions\NoNodesAvailableException $exception) {
  84.             $this->log->critical('No alive nodes found in cluster');
  85.             throw $exception;
  86.         }
  87.         $response             = [];
  88.         $caughtException      null;
  89.         $this->lastConnection $connection;
  90.         $future $connection->performRequest(
  91.             $method,
  92.             $uri,
  93.             $params,
  94.             $body,
  95.             $options,
  96.             $this
  97.         );
  98.         $future->promise()->then(
  99.             //onSuccess
  100.             function ($response) {
  101.                 $this->retryAttempts 0;
  102.                 // Note, this could be a 4xx or 5xx error
  103.             },
  104.             //onFailure
  105.             function ($response) {
  106.                 // Ignore 400 level errors, as that means the server responded just fine
  107.                 if (!(isset($response['code']) && $response['code'] >=400 && $response['code'] < 500)) {
  108.                     // Otherwise schedule a check
  109.                     $this->connectionPool->scheduleCheck();
  110.                 }
  111.             }
  112.         );
  113.         return $future;
  114.     }
  115.     /**
  116.      * @param FutureArrayInterface $result  Response of a request (promise)
  117.      * @param array                $options Options for transport
  118.      *
  119.      * @return callable|array
  120.      */
  121.     public function resultOrFuture(FutureArrayInterface $result, array $options = [])
  122.     {
  123.         $response null;
  124.         $async = isset($options['client']['future']) ? $options['client']['future'] : null;
  125.         if (is_null($async) || $async === false) {
  126.             do {
  127.                 $result $result->wait();
  128.             } while ($result instanceof FutureArrayInterface);
  129.             return $result;
  130.         } elseif ($async === true || $async === 'lazy') {
  131.             return $result;
  132.         }
  133.     }
  134.     public function shouldRetry(array $request): bool
  135.     {
  136.         if ($this->retryAttempts $this->retries) {
  137.             $this->retryAttempts += 1;
  138.             return true;
  139.         }
  140.         return false;
  141.     }
  142.     /**
  143.      * Returns the last used connection so that it may be inspected.  Mainly
  144.      * for debugging/testing purposes.
  145.      */
  146.     public function getLastConnection(): ConnectionInterface
  147.     {
  148.         return $this->lastConnection;
  149.     }
  150. }