You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							349 lines
						
					
					
						
							9.6 KiB
						
					
					
				
			
		
		
	
	
							349 lines
						
					
					
						
							9.6 KiB
						
					
					
				<?php | 
						|
/** | 
						|
 * @link http://www.yiiframework.com/ | 
						|
 * @copyright Copyright (c) 2008 Yii Software LLC | 
						|
 * @license http://www.yiiframework.com/license/ | 
						|
 */ | 
						|
 | 
						|
namespace yii\elasticsearch; | 
						|
 | 
						|
use Yii; | 
						|
use yii\base\Component; | 
						|
use yii\base\InvalidConfigException; | 
						|
use yii\helpers\Json; | 
						|
 | 
						|
/** | 
						|
 * elasticsearch Connection is used to connect to an elasticsearch cluster version 0.20 or higher | 
						|
 * | 
						|
 * @property string $driverName Name of the DB driver. This property is read-only. | 
						|
 * @property boolean $isActive Whether the DB connection is established. This property is read-only. | 
						|
 * | 
						|
 * @author Carsten Brandt <mail@cebe.cc> | 
						|
 * @since 2.0 | 
						|
 */ | 
						|
class Connection extends Component | 
						|
{ | 
						|
	/** | 
						|
	 * @event Event an event that is triggered after a DB connection is established | 
						|
	 */ | 
						|
	const EVENT_AFTER_OPEN = 'afterOpen'; | 
						|
 | 
						|
	/** | 
						|
	 * @var bool whether to autodetect available cluster nodes on [[open()]] | 
						|
	 */ | 
						|
	public $autodetectCluster = true; | 
						|
	/** | 
						|
	 * @var array cluster nodes | 
						|
	 * This is populated with the result of a cluster nodes request when [[autodetectCluster]] is true. | 
						|
	 * @see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/cluster-nodes-info.html#cluster-nodes-info | 
						|
	 */ | 
						|
	public $nodes = [ | 
						|
		['http_address' => 'inet[/127.0.0.1:9200]'], | 
						|
	]; | 
						|
	/** | 
						|
	 * @var array the active node. key of [[nodes]]. Will be randomly selected on [[open()]]. | 
						|
	 */ | 
						|
	public $activeNode; | 
						|
 | 
						|
	// TODO http://www.elasticsearch.org/guide/en/elasticsearch/client/php-api/current/_configuration.html#_example_configuring_http_basic_auth | 
						|
	public $auth = []; | 
						|
	/** | 
						|
	 * @var float timeout to use for connecting to an elasticsearch node. | 
						|
	 * This value will be used to configure the curl `CURLOPT_CONNECTTIMEOUT` option. | 
						|
	 * If not set, no explicit timeout will be set for curl. | 
						|
	 */ | 
						|
	public $connectionTimeout = null; | 
						|
	/** | 
						|
	 * @var float timeout to use when reading the response from an elasticsearch node. | 
						|
	 * This value will be used to configure the curl `CURLOPT_TIMEOUT` option. | 
						|
	 * If not set, no explicit timeout will be set for curl. | 
						|
	 */ | 
						|
	public $dataTimeout = null; | 
						|
 | 
						|
 | 
						|
	public function init() | 
						|
	{ | 
						|
		foreach($this->nodes as $node) { | 
						|
			if (!isset($node['http_address'])) { | 
						|
				throw new InvalidConfigException('Elasticsearch node needs at least a http_address configured.'); | 
						|
			} | 
						|
		} | 
						|
	} | 
						|
 | 
						|
	/** | 
						|
	 * Closes the connection when this component is being serialized. | 
						|
	 * @return array | 
						|
	 */ | 
						|
	public function __sleep() | 
						|
	{ | 
						|
		$this->close(); | 
						|
		return array_keys(get_object_vars($this)); | 
						|
	} | 
						|
 | 
						|
	/** | 
						|
	 * Returns a value indicating whether the DB connection is established. | 
						|
	 * @return boolean whether the DB connection is established | 
						|
	 */ | 
						|
	public function getIsActive() | 
						|
	{ | 
						|
		return $this->activeNode !== null; | 
						|
	} | 
						|
 | 
						|
	/** | 
						|
	 * Establishes a DB connection. | 
						|
	 * It does nothing if a DB connection has already been established. | 
						|
	 * @throws Exception if connection fails | 
						|
	 */ | 
						|
	public function open() | 
						|
	{ | 
						|
		if ($this->activeNode !== null) { | 
						|
			return; | 
						|
		} | 
						|
		if (empty($this->nodes)) { | 
						|
			throw new InvalidConfigException('elasticsearch needs at least one node to operate.'); | 
						|
		} | 
						|
		if ($this->autodetectCluster) { | 
						|
			$node = reset($this->nodes); | 
						|
			$host = $node['http_address']; | 
						|
			if (strncmp($host, 'inet[/', 6) == 0) { | 
						|
				$host = substr($host, 6, -1); | 
						|
			} | 
						|
			$response = $this->httpRequest('GET', 'http://' . $host . '/_cluster/nodes'); | 
						|
			$this->nodes = $response['nodes']; | 
						|
			if (empty($this->nodes)) { | 
						|
				throw new Exception('cluster autodetection did not find any active node.'); | 
						|
			} | 
						|
		} | 
						|
		$this->selectActiveNode(); | 
						|
		Yii::trace('Opening connection to elasticsearch. Nodes in cluster: ' . count($this->nodes) | 
						|
			. ', active node: ' . $this->nodes[$this->activeNode]['http_address'], __CLASS__); | 
						|
		$this->initConnection(); | 
						|
	} | 
						|
 | 
						|
	/** | 
						|
	 * select active node randomly | 
						|
	 */ | 
						|
	protected function selectActiveNode() | 
						|
	{ | 
						|
		$keys = array_keys($this->nodes); | 
						|
		$this->activeNode = $keys[rand(0, count($keys) - 1)]; | 
						|
	} | 
						|
 | 
						|
	/** | 
						|
	 * Closes the currently active DB connection. | 
						|
	 * It does nothing if the connection is already closed. | 
						|
	 */ | 
						|
	public function close() | 
						|
	{ | 
						|
		Yii::trace('Closing connection to elasticsearch. Active node was: ' | 
						|
			. $this->nodes[$this->activeNode]['http_address'], __CLASS__); | 
						|
		$this->activeNode = null; | 
						|
	} | 
						|
 | 
						|
	/** | 
						|
	 * Initializes the DB connection. | 
						|
	 * This method is invoked right after the DB connection is established. | 
						|
	 * The default implementation triggers an [[EVENT_AFTER_OPEN]] event. | 
						|
	 */ | 
						|
	protected function initConnection() | 
						|
	{ | 
						|
		$this->trigger(self::EVENT_AFTER_OPEN); | 
						|
	} | 
						|
 | 
						|
	/** | 
						|
	 * Returns the name of the DB driver for the current [[dsn]]. | 
						|
	 * @return string name of the DB driver | 
						|
	 */ | 
						|
	public function getDriverName() | 
						|
	{ | 
						|
		return 'elasticsearch'; | 
						|
	} | 
						|
 | 
						|
	/** | 
						|
	 * Creates a command for execution. | 
						|
	 * @param array $config the configuration for the Command class | 
						|
	 * @return Command the DB command | 
						|
	 */ | 
						|
	public function createCommand($config = []) | 
						|
	{ | 
						|
		$this->open(); | 
						|
		$config['db'] = $this; | 
						|
		$command = new Command($config); | 
						|
		return $command; | 
						|
	} | 
						|
 | 
						|
	public function getQueryBuilder() | 
						|
	{ | 
						|
		return new QueryBuilder($this); | 
						|
	} | 
						|
 | 
						|
	public function get($url, $options = [], $body = null) | 
						|
	{ | 
						|
		$this->open(); | 
						|
		return $this->httpRequest('GET', $this->createUrl($url, $options), $body); | 
						|
	} | 
						|
 | 
						|
	public function head($url, $options = [], $body = null) | 
						|
	{ | 
						|
		$this->open(); | 
						|
		return $this->httpRequest('HEAD', $this->createUrl($url, $options), $body); | 
						|
	} | 
						|
 | 
						|
	public function post($url, $options = [], $body = null) | 
						|
	{ | 
						|
		$this->open(); | 
						|
		return $this->httpRequest('POST', $this->createUrl($url, $options), $body); | 
						|
	} | 
						|
 | 
						|
	public function put($url, $options = [], $body = null) | 
						|
	{ | 
						|
		$this->open(); | 
						|
		return $this->httpRequest('PUT', $this->createUrl($url, $options), $body); | 
						|
	} | 
						|
 | 
						|
	public function delete($url, $options = [], $body = null) | 
						|
	{ | 
						|
		$this->open(); | 
						|
		return $this->httpRequest('DELETE', $this->createUrl($url, $options), $body); | 
						|
	} | 
						|
 | 
						|
	private function createUrl($path, $options = []) | 
						|
	{ | 
						|
		$url = implode('/', array_map(function($a) { | 
						|
			return urlencode(is_array($a) ? implode(',', $a) : $a); | 
						|
		}, $path)); | 
						|
 | 
						|
		if (!empty($options)) { | 
						|
			$url .= '?' . http_build_query($options); | 
						|
		} | 
						|
		return [$this->nodes[$this->activeNode]['http_address'], $url]; | 
						|
	} | 
						|
 | 
						|
	protected function httpRequest($method, $url, $requestBody = null) | 
						|
	{ | 
						|
		$method = strtoupper($method); | 
						|
 | 
						|
		// response body and headers | 
						|
		$headers = []; | 
						|
		$body = ''; | 
						|
 | 
						|
		$options = [ | 
						|
			CURLOPT_USERAGENT      => 'Yii2 Framework ' . __CLASS__, | 
						|
			CURLOPT_RETURNTRANSFER => false, | 
						|
			CURLOPT_HEADER         => false, | 
						|
			// http://www.php.net/manual/en/function.curl-setopt.php#82418 | 
						|
			CURLOPT_HTTPHEADER     => ['Expect:'], | 
						|
 | 
						|
			CURLOPT_WRITEFUNCTION  => function($curl, $data) use (&$body) { | 
						|
				$body .= $data; | 
						|
				return mb_strlen($data, '8bit'); | 
						|
			}, | 
						|
			CURLOPT_HEADERFUNCTION => function($curl, $data) use (&$headers) { | 
						|
				foreach(explode("\r\n", $data) as $row) { | 
						|
					if (($pos = strpos($row, ':')) !== false) { | 
						|
						$headers[strtolower(substr($row, 0, $pos))] = trim(substr($row, $pos + 1)); | 
						|
					} | 
						|
				} | 
						|
				return mb_strlen($data, '8bit'); | 
						|
			}, | 
						|
			CURLOPT_CUSTOMREQUEST  => $method, | 
						|
		]; | 
						|
		if ($this->connectionTimeout !== null) { | 
						|
			$options[CURLOPT_CONNECTTIMEOUT] = $this->connectionTimeout; | 
						|
		} | 
						|
		if ($this->dataTimeout !== null) { | 
						|
			$options[CURLOPT_TIMEOUT] = $this->dataTimeout; | 
						|
		} | 
						|
		if ($requestBody !== null) { | 
						|
			$options[CURLOPT_POSTFIELDS] = $requestBody; | 
						|
		} | 
						|
		if ($method == 'HEAD') { | 
						|
			$options[CURLOPT_NOBODY] = true; | 
						|
			unset($options[CURLOPT_WRITEFUNCTION]); | 
						|
		} | 
						|
 | 
						|
		if (is_array($url)) { | 
						|
			list($host, $q) = $url; | 
						|
			if (strncmp($host, 'inet[/', 6) == 0) { | 
						|
				$host = substr($host, 6, -1); | 
						|
			} | 
						|
			$profile = $q . $requestBody; | 
						|
			$url = 'http://' . $host . '/' . $q; | 
						|
		} else { | 
						|
			$profile = false; | 
						|
		} | 
						|
 | 
						|
		Yii::trace("Sending request to elasticsearch node: $url\n$requestBody", __METHOD__); | 
						|
		if ($profile !== false) { | 
						|
			Yii::beginProfile($profile, __METHOD__); | 
						|
		} | 
						|
 | 
						|
		$curl = curl_init($url); | 
						|
		curl_setopt_array($curl, $options); | 
						|
		if (curl_exec($curl) === false) { | 
						|
			throw new Exception('Elasticsearch request failed: ' . curl_errno($curl) . ' - ' . curl_error($curl), [ | 
						|
				'requestMethod' => $method, | 
						|
				'requestUrl' => $url, | 
						|
				'requestBody' => $requestBody, | 
						|
				'responseHeaders' => $headers, | 
						|
				'responseBody' => $body, | 
						|
			]); | 
						|
		} | 
						|
 | 
						|
		$responseCode = curl_getinfo($curl, CURLINFO_HTTP_CODE); | 
						|
		curl_close($curl); | 
						|
 | 
						|
		if ($profile !== false) { | 
						|
			Yii::endProfile($profile, __METHOD__); | 
						|
		} | 
						|
 | 
						|
		if ($responseCode >= 200 && $responseCode < 300) { | 
						|
			if ($method == 'HEAD') { | 
						|
				return true; | 
						|
			} else { | 
						|
				if (isset($headers['content-length']) && ($len = mb_strlen($body, '8bit')) < $headers['content-length']) { | 
						|
					throw new Exception("Incomplete data received from elasticsearch: $len < {$headers['content-length']}", [ | 
						|
						'requestMethod' => $method, | 
						|
						'requestUrl' => $url, | 
						|
						'requestBody' => $requestBody, | 
						|
						'responseCode' => $responseCode, | 
						|
						'responseHeaders' => $headers, | 
						|
						'responseBody' => $body, | 
						|
					]); | 
						|
				} | 
						|
				if (isset($headers['content-type']) && !strncmp($headers['content-type'], 'application/json', 16)) { | 
						|
					return Json::decode($body); | 
						|
				} | 
						|
				throw new Exception('Unsupported data received from elasticsearch: ' . $headers['content-type'], [ | 
						|
					'requestMethod' => $method, | 
						|
					'requestUrl' => $url, | 
						|
					'requestBody' => $requestBody, | 
						|
					'responseCode' => $responseCode, | 
						|
					'responseHeaders' => $headers, | 
						|
					'responseBody' => $body, | 
						|
				]); | 
						|
			} | 
						|
		} elseif ($responseCode == 404) { | 
						|
			return false; | 
						|
		} else { | 
						|
			throw new Exception("Elasticsearch request failed with code $responseCode.", [ | 
						|
				'requestMethod' => $method, | 
						|
				'requestUrl' => $url, | 
						|
				'requestBody' => $requestBody, | 
						|
				'responseCode' => $responseCode, | 
						|
				'responseHeaders' => $headers, | 
						|
				'responseBody' => $body, | 
						|
			]); | 
						|
		} | 
						|
	} | 
						|
 | 
						|
	public function getNodeInfo() | 
						|
	{ | 
						|
		return $this->get([]); | 
						|
	} | 
						|
 | 
						|
	public function getClusterState() | 
						|
	{ | 
						|
		return $this->get(['_cluster', 'state']); | 
						|
	} | 
						|
} |