close(); return array_keys(get_object_vars($this)); } /** * Returns a value indicating whether the DB connection is established. * @return boolean whether the DB connection is established */ public function getIsActive() { return $this->_socket !== null; } /** * Establishes a DB connection. * It does nothing if a DB connection has already been established. * @throws Exception if connection fails */ public function open() { if ($this->_socket === null) { if (empty($this->dsn)) { throw new InvalidConfigException('Connection.dsn cannot be empty.'); } $dsn = explode('/', $this->dsn); $host = $dsn[2]; if (strpos($host, ':')===false) { $host .= ':6379'; } $db = isset($dsn[3]) ? $dsn[3] : 0; \Yii::trace('Opening DB connection: ' . $this->dsn, __CLASS__); $this->_socket = @stream_socket_client( $host, $errorNumber, $errorDescription, $this->connectionTimeout ? $this->connectionTimeout : ini_get("default_socket_timeout") ); if ($this->_socket) { if ($this->dataTimeout !== null) { stream_set_timeout($this->_socket, $timeout=(int)$this->dataTimeout, (int) (($this->dataTimeout - $timeout) * 1000000)); } if ($this->password !== null) { $this->executeCommand('AUTH', array($this->password)); } $this->executeCommand('SELECT', array($db)); $this->initConnection(); } else { \Yii::error("Failed to open DB connection ({$this->dsn}): " . $errorNumber . ' - ' . $errorDescription, __CLASS__); $message = YII_DEBUG ? 'Failed to open DB connection: ' . $errorNumber . ' - ' . $errorDescription : 'Failed to open DB connection.'; throw new Exception($message, $errorDescription, (int)$errorNumber); } } } /** * Closes the currently active DB connection. * It does nothing if the connection is already closed. */ public function close() { if ($this->_socket !== null) { \Yii::trace('Closing DB connection: ' . $this->dsn, __CLASS__); $this->executeCommand('QUIT'); stream_socket_shutdown($this->_socket, STREAM_SHUT_RDWR); $this->_socket = null; $this->_transaction = null; } } /** * Initializes the DB connection. * This method is invoked right after the DB connection is established. * The default implementation triggers an [[EVENT_AFTER_OPEN]] event. */ protected function initConnection() { $this->trigger(self::EVENT_AFTER_OPEN); } /** * Returns the currently active transaction. * @return Transaction the currently active transaction. Null if no active transaction. */ public function getTransaction() { return $this->_transaction && $this->_transaction->isActive ? $this->_transaction : null; } /** * Starts a transaction. * @return Transaction the transaction initiated */ public function beginTransaction() { $this->open(); $this->_transaction = new Transaction(array( 'db' => $this, )); $this->_transaction->begin(); return $this->_transaction; } /** * Returns the name of the DB driver for the current [[dsn]]. * @return string name of the DB driver */ public function getDriverName() { if (($pos = strpos($this->dsn, ':')) !== false) { return strtolower(substr($this->dsn, 0, $pos)); } else { return 'redis'; } } /** * * @param string $name * @param array $params * @return mixed */ public function __call($name, $params) { $redisCommand = strtoupper(Inflector::camel2words($name, false)); if (in_array($redisCommand, $this->redisCommands)) { return $this->executeCommand($name, $params); } else { return parent::__call($name, $params); } } /** * Executes a redis command. * For a list of available commands and their parameters see http://redis.io/commands. * * @param string $name the name of the command * @param array $params list of parameters for the command * @return array|bool|null|string Dependend on the executed command this method * will return different data types: * * - `true` for commands that return "status reply". * - `string` for commands that return "integer reply" * as the value is in the range of a signed 64 bit integer. * - `string` or `null` for commands that return "bulk reply". * - `array` for commands that return "Multi-bulk replies". * * See [redis protocol description](http://redis.io/topics/protocol) * for details on the mentioned reply types. * @trows Exception for commands that return [error reply](http://redis.io/topics/protocol#error-reply). */ public function executeCommand($name, $params=array()) { $this->open(); array_unshift($params, $name); $command = '*' . count($params) . "\r\n"; foreach($params as $arg) { $command .= '$' . mb_strlen($arg, '8bit') . "\r\n" . $arg . "\r\n"; } \Yii::trace("Executing Redis Command: {$name}", __CLASS__); fwrite($this->_socket, $command); return $this->parseResponse(implode(' ', $params)); } private function parseResponse($command) { if(($line = fgets($this->_socket)) === false) { throw new Exception("Failed to read from socket.\nRedis command was: " . $command); } $type = $line[0]; $line = mb_substr($line, 1, -2, '8bit'); switch($type) { case '+': // Status reply return true; case '-': // Error reply throw new Exception("Redis error: " . $line . "\nRedis command was: " . $command); case ':': // Integer reply // no cast to int as it is in the range of a signed 64 bit integer return $line; case '$': // Bulk replies if ($line == '-1') { return null; } $length = $line + 2; $data = ''; while ($length > 0) { if(($block = fread($this->_socket, $line + 2)) === false) { throw new Exception("Failed to read from socket.\nRedis command was: " . $command); } $data .= $block; $length -= mb_strlen($block, '8bit'); } return mb_substr($data, 0, -2, '8bit'); case '*': // Multi-bulk replies $count = (int) $line; $data = array(); for($i = 0; $i < $count; $i++) { $data[] = $this->parseResponse($command); } return $data; default: throw new Exception('Received illegal data from redis: ' . $line . "\nRedis command was: " . $command); } } }