vendor/elasticsearch/elasticsearch/src/Elasticsearch/Connections/Connection.php line 243

Open in your IDE?
  1. <?php
  2. declare(strict_types 1);
  3. namespace Elasticsearch\Connections;
  4. use Elasticsearch\Client;
  5. use Elasticsearch\Common\Exceptions\AlreadyExpiredException;
  6. use Elasticsearch\Common\Exceptions\BadRequest400Exception;
  7. use Elasticsearch\Common\Exceptions\Conflict409Exception;
  8. use Elasticsearch\Common\Exceptions\Curl\CouldNotConnectToHost;
  9. use Elasticsearch\Common\Exceptions\Curl\CouldNotResolveHostException;
  10. use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
  11. use Elasticsearch\Common\Exceptions\ElasticsearchException;
  12. use Elasticsearch\Common\Exceptions\Forbidden403Exception;
  13. use Elasticsearch\Common\Exceptions\MaxRetriesException;
  14. use Elasticsearch\Common\Exceptions\Missing404Exception;
  15. use Elasticsearch\Common\Exceptions\NoDocumentsToGetException;
  16. use Elasticsearch\Common\Exceptions\NoShardAvailableException;
  17. use Elasticsearch\Common\Exceptions\RequestTimeout408Exception;
  18. use Elasticsearch\Common\Exceptions\RoutingMissingException;
  19. use Elasticsearch\Common\Exceptions\ScriptLangNotSupportedException;
  20. use Elasticsearch\Common\Exceptions\ServerErrorResponseException;
  21. use Elasticsearch\Common\Exceptions\TransportException;
  22. use Elasticsearch\Serializers\SerializerInterface;
  23. use Elasticsearch\Transport;
  24. use GuzzleHttp\Ring\Core;
  25. use GuzzleHttp\Ring\Exception\ConnectException;
  26. use GuzzleHttp\Ring\Exception\RingException;
  27. use Psr\Log\LoggerInterface;
  28. /**
  29.  * Class AbstractConnection
  30.  *
  31.  * @category Elasticsearch
  32.  * @package  Elasticsearch\Connections
  33.  * @author   Zachary Tong <zach@elastic.co>
  34.  * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache2
  35.  * @link     http://elastic.co
  36.  */
  37. class Connection implements ConnectionInterface
  38. {
  39.     /**
  40.      * @var callable
  41.      */
  42.     protected $handler;
  43.     /**
  44.      * @var SerializerInterface
  45.      */
  46.     protected $serializer;
  47.     /**
  48.      * @var string
  49.      */
  50.     protected $transportSchema 'http';    // TODO depreciate this default
  51.     /**
  52.      * @var string
  53.      */
  54.     protected $host;
  55.     /**
  56.      * @var string|null
  57.      */
  58.     protected $path;
  59.     /**
  60.     * @var int
  61.     */
  62.     protected $port;
  63.     /**
  64.      * @var LoggerInterface
  65.      */
  66.     protected $log;
  67.     /**
  68.      * @var LoggerInterface
  69.      */
  70.     protected $trace;
  71.     /**
  72.      * @var array
  73.      */
  74.     protected $connectionParams;
  75.     /**
  76.      * @var array
  77.      */
  78.     protected $headers = [];
  79.     /**
  80.      * @var bool
  81.      */
  82.     protected $isAlive false;
  83.     /**
  84.      * @var float
  85.      */
  86.     private $pingTimeout 1;    //TODO expose this
  87.     /**
  88.      * @var int
  89.      */
  90.     private $lastPing 0;
  91.     /**
  92.      * @var int
  93.      */
  94.     private $failedPings 0;
  95.     private $lastRequest = array();
  96.     /**
  97.      * @var string
  98.      */
  99.     private $OSVersion null;
  100.     public function __construct(
  101.         callable $handler,
  102.         array $hostDetails,
  103.         array $connectionParams,
  104.         SerializerInterface $serializer,
  105.         LoggerInterface $log,
  106.         LoggerInterface $trace
  107.     ) {
  108.         if (isset($hostDetails['port']) !== true) {
  109.             $hostDetails['port'] = 9200;
  110.         }
  111.         if (isset($hostDetails['scheme'])) {
  112.             $this->transportSchema $hostDetails['scheme'];
  113.         }
  114.         // Only Set the Basic if API Key is not set and setBasicAuthentication was not called prior
  115.         if (isset($connectionParams['client']['headers']['Authorization']) === false
  116.                 && isset($connectionParams['client']['curl'][CURLOPT_HTTPAUTH]) === false
  117.                 && isset($hostDetails['user'])
  118.                 && isset($hostDetails['pass'])) {
  119.             $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC;
  120.             $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass'];
  121.         }
  122.         $connectionParams['client']['curl'][CURLOPT_PORT] = $hostDetails['port'];
  123.         if (isset($connectionParams['client']['headers'])) {
  124.             $this->headers $connectionParams['client']['headers'];
  125.             unset($connectionParams['client']['headers']);
  126.         }
  127.         // Add the User-Agent using the format: <client-repo-name>/<client-version> (metadata-values)
  128.         $this->headers['User-Agent'] = [sprintf(
  129.             "elasticsearch-php/%s (%s %s; PHP %s)",
  130.             Client::VERSION,
  131.             PHP_OS,
  132.             $this->getOSVersion(),
  133.             phpversion()
  134.         )];
  135.         $host $hostDetails['host'];
  136.         $path null;
  137.         if (isset($hostDetails['path']) === true) {
  138.             $path $hostDetails['path'];
  139.         }
  140.         $port $hostDetails['port'];
  141.         $this->host             $host;
  142.         $this->path             $path;
  143.         $this->port             $port;
  144.         $this->log              $log;
  145.         $this->trace            $trace;
  146.         $this->connectionParams $connectionParams;
  147.         $this->serializer       $serializer;
  148.         $this->handler $this->wrapHandler($handler);
  149.     }
  150.     /**
  151.      * @param  string    $method
  152.      * @param  string    $uri
  153.      * @param  array     $params
  154.      * @param  null      $body
  155.      * @param  array     $options
  156.      * @param  Transport $transport
  157.      * @return mixed
  158.      */
  159.     public function performRequest(string $methodstring $uri, ?array $params = [], $body null, array $options = [], Transport $transport null)
  160.     {
  161.         if ($body !== null) {
  162.             $body $this->serializer->serialize($body);
  163.         }
  164.         if (isset($options['client']['headers']) && is_array($options['client']['headers'])) {
  165.             $this->headers array_merge($this->headers$options['client']['headers']);
  166.         }
  167.         $request = [
  168.             'http_method' => $method,
  169.             'scheme'      => $this->transportSchema,
  170.             'uri'         => $this->getURI($uri$params),
  171.             'body'        => $body,
  172.             'headers'     => array_merge(
  173.                 [
  174.                 'Host'  => [$this->host]
  175.                 ],
  176.                 $this->headers
  177.             )
  178.         ];
  179.         $request array_replace_recursive($request$this->connectionParams$options);
  180.         // RingPHP does not like if client is empty
  181.         if (empty($request['client'])) {
  182.             unset($request['client']);
  183.         }
  184.         $handler $this->handler;
  185.         $future $handler($request$this$transport$options);
  186.         return $future;
  187.     }
  188.     public function getTransportSchema(): string
  189.     {
  190.         return $this->transportSchema;
  191.     }
  192.     public function getLastRequestInfo(): array
  193.     {
  194.         return $this->lastRequest;
  195.     }
  196.     private function wrapHandler(callable $handler): callable
  197.     {
  198.         return function (array $requestConnection $connectionTransport $transport null$options) use ($handler) {
  199.             $this->lastRequest = [];
  200.             $this->lastRequest['request'] = $request;
  201.             // Send the request using the wrapped handler.
  202.             $response =  Core::proxy($handler($request), function ($response) use ($connection$transport$request$options) {
  203.                 $this->lastRequest['response'] = $response;
  204.                 if (isset($response['error']) === true) {
  205.                     if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) {
  206.                         $this->log->warning("Curl exception encountered.");
  207.                         $exception $this->getCurlRetryException($request$response);
  208.                         $this->logRequestFail($request$response$exception);
  209.                         $node $connection->getHost();
  210.                         $this->log->warning("Marking node $node dead.");
  211.                         $connection->markDead();
  212.                         // If the transport has not been set, we are inside a Ping or Sniff,
  213.                         // so we don't want to retrigger retries anyway.
  214.                         //
  215.                         // TODO this could be handled better, but we are limited because connectionpools do not
  216.                         // have access to Transport.  Architecturally, all of this needs to be refactored
  217.                         if (isset($transport) === true) {
  218.                             $transport->connectionPool->scheduleCheck();
  219.                             $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false;
  220.                             $shouldRetry $transport->shouldRetry($request);
  221.                             $shouldRetryText = ($shouldRetry) ? 'true' 'false';
  222.                             $this->log->warning("Retries left? $shouldRetryText");
  223.                             if ($shouldRetry && !$neverRetry) {
  224.                                 return $transport->performRequest(
  225.                                     $request['http_method'],
  226.                                     $request['uri'],
  227.                                     [],
  228.                                     $request['body'],
  229.                                     $options
  230.                                 );
  231.                             }
  232.                         }
  233.                         $this->log->warning("Out of retries, throwing exception from $node");
  234.                         // Only throw if we run out of retries
  235.                         throw $exception;
  236.                     } else {
  237.                         // Something went seriously wrong, bail
  238.                         $exception = new TransportException($response['error']->getMessage());
  239.                         $this->logRequestFail($request$response$exception);
  240.                         throw $exception;
  241.                     }
  242.                 } else {
  243.                     $connection->markAlive();
  244.                     if (isset($response['headers']['Warning'])) {
  245.                         $this->logWarning($request$response);
  246.                     }
  247.                     if (isset($response['body']) === true) {
  248.                         $response['body'] = stream_get_contents($response['body']);
  249.                         $this->lastRequest['response']['body'] = $response['body'];
  250.                     }
  251.                     if ($response['status'] >= 400 && $response['status'] < 500) {
  252.                         $ignore $request['client']['ignore'] ?? [];
  253.                         // Skip 404 if succeeded true in the body (e.g. clear_scroll)
  254.                         $body $response['body'] ?? '';
  255.                         if (strpos($body'"succeeded":true') !== false) {
  256.                              $ignore[] = 404;
  257.                         }
  258.                         $this->process4xxError($request$response$ignore);
  259.                     } elseif ($response['status'] >= 500) {
  260.                         $ignore $request['client']['ignore'] ?? [];
  261.                         $this->process5xxError($request$response$ignore);
  262.                     }
  263.                     // No error, deserialize
  264.                     $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']);
  265.                 }
  266.                 $this->logRequestSuccess($request$response);
  267.                 return isset($request['client']['verbose']) && $request['client']['verbose'] === true $response $response['body'];
  268.             });
  269.             return $response;
  270.         };
  271.     }
  272.     private function getURI(string $uri, ?array $params): string
  273.     {
  274.         if (isset($params) === true && !empty($params)) {
  275.             array_walk(
  276.                 $params,
  277.                 function (&$value, &$key) {
  278.                     if ($value === true) {
  279.                         $value 'true';
  280.                     } elseif ($value === false) {
  281.                         $value 'false';
  282.                     }
  283.                 }
  284.             );
  285.             $uri .= '?' http_build_query($params);
  286.         }
  287.         if ($this->path !== null) {
  288.             $uri $this->path $uri;
  289.         }
  290.         return $uri ?? '';
  291.     }
  292.     public function getHeaders(): array
  293.     {
  294.         return $this->headers;
  295.     }
  296.     public function logWarning(array $request, array $response): void
  297.     {
  298.         $this->log->warning('Deprecation'$response['headers']['Warning']);
  299.     }
  300.     /**
  301.      * Log a successful request
  302.      *
  303.      * @param array $request
  304.      * @param array $response
  305.      * @return void
  306.      */
  307.     public function logRequestSuccess(array $request, array $response): void
  308.     {
  309.         $this->log->debug('Request Body', array($request['body']));
  310.         $this->log->info(
  311.             'Request Success:',
  312.             array(
  313.                 'method'    => $request['http_method'],
  314.                 'uri'       => $response['effective_url'],
  315.                 'headers'   => $request['headers'],
  316.                 'HTTP code' => $response['status'],
  317.                 'duration'  => $response['transfer_stats']['total_time'],
  318.             )
  319.         );
  320.         $this->log->debug('Response', array($response['body']));
  321.         // Build the curl command for Trace.
  322.         $curlCommand $this->buildCurlCommand($request['http_method'], $response['effective_url'], $request['body']);
  323.         $this->trace->info($curlCommand);
  324.         $this->trace->debug(
  325.             'Response:',
  326.             array(
  327.                 'response'  => $response['body'],
  328.                 'method'    => $request['http_method'],
  329.                 'uri'       => $response['effective_url'],
  330.                 'HTTP code' => $response['status'],
  331.                 'duration'  => $response['transfer_stats']['total_time'],
  332.             )
  333.         );
  334.     }
  335.     /**
  336.      * Log a failed request
  337.      *
  338.      * @param array $request
  339.      * @param array $response
  340.      * @param \Exception $exception
  341.      *
  342.      * @return void
  343.      */
  344.     public function logRequestFail(array $request, array $response, \Exception $exception): void
  345.     {
  346.         $this->log->debug('Request Body', array($request['body']));
  347.         $this->log->warning(
  348.             'Request Failure:',
  349.             array(
  350.                 'method'    => $request['http_method'],
  351.                 'uri'       => $response['effective_url'],
  352.                 'headers'   => $request['headers'],
  353.                 'HTTP code' => $response['status'],
  354.                 'duration'  => $response['transfer_stats']['total_time'],
  355.                 'error'     => $exception->getMessage(),
  356.             )
  357.         );
  358.         $this->log->warning('Response', array($response['body']));
  359.         // Build the curl command for Trace.
  360.         $curlCommand $this->buildCurlCommand($request['http_method'], $response['effective_url'], $request['body']);
  361.         $this->trace->info($curlCommand);
  362.         $this->trace->debug(
  363.             'Response:',
  364.             array(
  365.                 'response'  => $response,
  366.                 'method'    => $request['http_method'],
  367.                 'uri'       => $response['effective_url'],
  368.                 'HTTP code' => $response['status'],
  369.                 'duration'  => $response['transfer_stats']['total_time'],
  370.             )
  371.         );
  372.     }
  373.     public function ping(): bool
  374.     {
  375.         $options = [
  376.             'client' => [
  377.                 'timeout' => $this->pingTimeout,
  378.                 'never_retry' => true,
  379.                 'verbose' => true
  380.             ]
  381.         ];
  382.         try {
  383.             $response $this->performRequest('HEAD''/'nullnull$options);
  384.             $response $response->wait();
  385.         } catch (TransportException $exception) {
  386.             $this->markDead();
  387.             return false;
  388.         }
  389.         if ($response['status'] === 200) {
  390.             $this->markAlive();
  391.             return true;
  392.         } else {
  393.             $this->markDead();
  394.             return false;
  395.         }
  396.     }
  397.     /**
  398.      * @return array|\GuzzleHttp\Ring\Future\FutureArray
  399.      */
  400.     public function sniff()
  401.     {
  402.         $options = [
  403.             'client' => [
  404.                 'timeout' => $this->pingTimeout,
  405.                 'never_retry' => true
  406.             ]
  407.         ];
  408.         return $this->performRequest('GET''/_nodes/'nullnull$options);
  409.     }
  410.     public function isAlive(): bool
  411.     {
  412.         return $this->isAlive;
  413.     }
  414.     public function markAlive(): void
  415.     {
  416.         $this->failedPings 0;
  417.         $this->isAlive true;
  418.         $this->lastPing time();
  419.     }
  420.     public function markDead(): void
  421.     {
  422.         $this->isAlive false;
  423.         $this->failedPings += 1;
  424.         $this->lastPing time();
  425.     }
  426.     public function getLastPing(): int
  427.     {
  428.         return $this->lastPing;
  429.     }
  430.     public function getPingFailures(): int
  431.     {
  432.         return $this->failedPings;
  433.     }
  434.     public function getHost(): string
  435.     {
  436.         return $this->host;
  437.     }
  438.     public function getUserPass(): ?string
  439.     {
  440.         return $this->connectionParams['client']['curl'][CURLOPT_USERPWD] ?? null;
  441.     }
  442.     public function getPath(): ?string
  443.     {
  444.         return $this->path;
  445.     }
  446.     /**
  447.      * @return int
  448.      */
  449.     public function getPort()
  450.     {
  451.         return $this->port;
  452.     }
  453.     protected function getCurlRetryException(array $request, array $response): ElasticsearchException
  454.     {
  455.         $exception null;
  456.         $message $response['error']->getMessage();
  457.         $exception = new MaxRetriesException($message);
  458.         switch ($response['curl']['errno']) {
  459.             case 6:
  460.                 $exception = new CouldNotResolveHostException($message0$exception);
  461.                 break;
  462.             case 7:
  463.                 $exception = new CouldNotConnectToHost($message0$exception);
  464.                 break;
  465.             case 28:
  466.                 $exception = new OperationTimeoutException($message0$exception);
  467.                 break;
  468.         }
  469.         return $exception;
  470.     }
  471.     /**
  472.      * Get the OS version using php_uname if available
  473.      * otherwise it returns an empty string
  474.      *
  475.      * @see  https://github.com/elastic/elasticsearch-php/issues/922
  476.      */
  477.     private function getOSVersion(): string
  478.     {
  479.         if ($this->OSVersion === null) {
  480.             $this->OSVersion strpos(strtolower(ini_get('disable_functions')), 'php_uname') !== false
  481.                 ''
  482.                 php_uname("r");
  483.         }
  484.         return $this->OSVersion;
  485.     }
  486.     /**
  487.      * Construct a string cURL command
  488.      */
  489.     private function buildCurlCommand(string $methodstring $uri, ?string $body): string
  490.     {
  491.         if (strpos($uri'?') === false) {
  492.             $uri .= '?pretty=true';
  493.         } else {
  494.             str_replace('?''?pretty=true'$uri);
  495.         }
  496.         $curlCommand 'curl -X' strtoupper($method);
  497.         $curlCommand .= " '" $uri "'";
  498.         if (isset($body) === true && $body !== '') {
  499.             $curlCommand .= " -d '" $body "'";
  500.         }
  501.         return $curlCommand;
  502.     }
  503.     private function process4xxError(array $request, array $response, array $ignore): ?ElasticsearchException
  504.     {
  505.         $statusCode $response['status'];
  506.         $responseBody $response['body'];
  507.         /**
  508.  * @var \Exception $exception
  509. */
  510.         $exception $this->tryDeserialize400Error($response);
  511.         if (array_search($response['status'], $ignore) !== false) {
  512.             return null;
  513.         }
  514.         // if responseBody is not string, we convert it so it can be used as Exception message
  515.         if (!is_string($responseBody)) {
  516.             $responseBody json_encode($responseBody);
  517.         }
  518.         if ($statusCode === 400 && strpos($responseBody"AlreadyExpiredException") !== false) {
  519.             $exception = new AlreadyExpiredException($responseBody$statusCode);
  520.         } elseif ($statusCode === 403) {
  521.             $exception = new Forbidden403Exception($responseBody$statusCode);
  522.         } elseif ($statusCode === 404) {
  523.             $exception = new Missing404Exception($responseBody$statusCode);
  524.         } elseif ($statusCode === 409) {
  525.             $exception = new Conflict409Exception($responseBody$statusCode);
  526.         } elseif ($statusCode === 400 && strpos($responseBody'script_lang not supported') !== false) {
  527.             $exception = new ScriptLangNotSupportedException($responseBody$statusCode);
  528.         } elseif ($statusCode === 408) {
  529.             $exception = new RequestTimeout408Exception($responseBody$statusCode);
  530.         } else {
  531.             $exception = new BadRequest400Exception($responseBody$statusCode);
  532.         }
  533. //file_put_contents('/data/web/shopware/var/log/elastic.log', PHP_EOL.date("Y-m-d H:i:s").chr(9).'$responseBody: '.$responseBody.chr(9).'$statusCode: '.$statusCode.PHP_EOL, FILE_APPEND);
  534.         $this->logRequestFail($request$response$exception);
  535.         throw $exception;
  536.     }
  537.     private function process5xxError(array $request, array $response, array $ignore): ?ElasticsearchException
  538.     {
  539.         $statusCode = (int) $response['status'];
  540.         $responseBody $response['body'];
  541.         /**
  542.  * @var \Exception $exception
  543. */
  544.         $exception $this->tryDeserialize500Error($response);
  545.         $exceptionText "[$statusCode Server Exception] ".$exception->getMessage();
  546.         $this->log->error($exceptionText);
  547.         $this->log->error($exception->getTraceAsString());
  548.         if (array_search($statusCode$ignore) !== false) {
  549.             return null;
  550.         }
  551.         if ($statusCode === 500 && strpos($responseBody"RoutingMissingException") !== false) {
  552.             $exception = new RoutingMissingException($exception->getMessage(), $statusCode$exception);
  553.         } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/'$responseBody) === 1) {
  554.             $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode$exception);
  555.         } elseif ($statusCode === 500 && strpos($responseBody'NoShardAvailableActionException') !== false) {
  556.             $exception = new NoShardAvailableException($exception->getMessage(), $statusCode$exception);
  557.         } else {
  558.             $exception = new ServerErrorResponseException($responseBody$statusCode);
  559.         }
  560.         $this->logRequestFail($request$response$exception);
  561.         throw $exception;
  562.     }
  563.     private function tryDeserialize400Error(array $response): ElasticsearchException
  564.     {
  565.         return $this->tryDeserializeError($responseBadRequest400Exception::class);
  566.     }
  567.     private function tryDeserialize500Error(array $response): ElasticsearchException
  568.     {
  569.         return $this->tryDeserializeError($responseServerErrorResponseException::class);
  570.     }
  571.     private function tryDeserializeError(array $responsestring $errorClass): ElasticsearchException
  572.     {
  573.         $error $this->serializer->deserialize($response['body'], $response['transfer_stats']);
  574.         if (is_array($error) === true) {
  575.             // 2.0 structured exceptions
  576.             if (isset($error['error']['reason']) === true) {
  577.                 // Try to use root cause first (only grabs the first root cause)
  578.                 $root $error['error']['root_cause'];
  579.                 if (isset($root) && isset($root[0])) {
  580.                     $cause $root[0]['reason'];
  581.                     $type $root[0]['type'];
  582.                 } else {
  583.                     $cause $error['error']['reason'];
  584.                     $type $error['error']['type'];
  585.                 }
  586.                 // added json_encode to convert into a string
  587.                 $original = new $errorClass(json_encode($response['body']), $response['status']);
  588.                 return new $errorClass("$type$cause", (int) $response['status'], $original);
  589.             } elseif (isset($error['error']) === true) {
  590.                 // <2.0 semi-structured exceptions
  591.                 // added json_encode to convert into a string
  592.                 $original = new $errorClass(json_encode($response['body']), $response['status']);
  593.                 return new $errorClass($error['error'], (int) $response['status'], $original);
  594.             }
  595.             // <2.0 "i just blew up" nonstructured exception
  596.             // $error is an array but we don't know the format, reuse the response body instead
  597.             // added json_encode to convert into a string
  598.             return new $errorClass(json_encode($response['body']), (int) $response['status']);
  599.         }
  600.         // if responseBody is not string, we convert it so it can be used as Exception message
  601.         $responseBody $response['body'];
  602.         if (!is_string($responseBody)) {
  603.             $responseBody json_encode($responseBody);
  604.         }
  605.         // <2.0 "i just blew up" nonstructured exception
  606.         return new $errorClass($responseBody);
  607.     }
  608. }