close(); return array_keys(get_object_vars($this)); } /** * Returns a value indicating whether the DB connection is established. * @return boolean whether the DB connection is established */ public function getIsActive() { return $this->_socket !== null; } /** * Establishes a DB connection. * It does nothing if a DB connection has already been established. * @throws Exception if connection fails */ public function open() { if ($this->_socket === null) { if (empty($this->dsn)) { throw new InvalidConfigException('Connection.dsn cannot be empty.'); } $dsn = explode('/', $this->dsn); $host = $dsn[2]; if (strpos($host, ':')===false) { $host .= ':6379'; } $db = isset($dsn[3]) ? $dsn[3] : 0; \Yii::trace('Opening DB connection: ' . $this->dsn, __CLASS__); $this->_socket = @stream_socket_client( $host, $errorNumber, $errorDescription, $this->timeout ? $this->timeout : ini_get("default_socket_timeout") ); if ($this->_socket) { if ($this->password !== null) { $this->executeCommand('AUTH', array($this->password)); } $this->executeCommand('SELECT', array($db)); $this->initConnection(); } else { \Yii::error("Failed to open DB connection ({$this->dsn}): " . $errorNumber . ' - ' . $errorDescription, __CLASS__); $message = YII_DEBUG ? 'Failed to open DB connection: ' . $errorNumber . ' - ' . $errorDescription : 'Failed to open DB connection.'; throw new Exception($message, (int)$errorNumber, $errorDescription); } } } /** * Closes the currently active DB connection. * It does nothing if the connection is already closed. */ public function close() { if ($this->_socket !== null) { \Yii::trace('Closing DB connection: ' . $this->dsn, __CLASS__); $this->executeCommand('QUIT'); stream_socket_shutdown($this->_socket, STREAM_SHUT_RDWR); $this->_socket = null; $this->_transaction = null; } } /** * Initializes the DB connection. * This method is invoked right after the DB connection is established. * The default implementation triggers an [[EVENT_AFTER_OPEN]] event. */ protected function initConnection() { $this->trigger(self::EVENT_AFTER_OPEN); } /** * Returns the currently active transaction. * @return Transaction the currently active transaction. Null if no active transaction. */ public function getTransaction() { return $this->_transaction && $this->_transaction->isActive ? $this->_transaction : null; } /** * Starts a transaction. * @return Transaction the transaction initiated */ public function beginTransaction() { $this->open(); $this->_transaction = new Transaction(array( 'db' => $this, )); $this->_transaction->begin(); return $this->_transaction; } /** * Returns the name of the DB driver for the current [[dsn]]. * @return string name of the DB driver */ public function getDriverName() { if (($pos = strpos($this->dsn, ':')) !== false) { return strtolower(substr($this->dsn, 0, $pos)); } else { return 'redis'; } } /** * * @param string $name * @param array $params * @return mixed */ public function __call($name, $params) { $redisCommand = strtoupper(StringHelper::camel2words($name, false)); if (in_array($redisCommand, $this->redisCommands)) { return $this->executeCommand($name, $params); } else { return parent::__call($name, $params); } } /** * Execute a redis command * http://redis.io/commands * http://redis.io/topics/protocol * * @param $name * @param $params * @return array|bool|null|string * Returns true on Status reply * TODO explain all reply types */ public function executeCommand($name, $params=array()) { $this->open(); array_unshift($params, $name); $command = '*' . count($params) . "\r\n"; foreach($params as $arg) { $command .= '$' . strlen($arg) . "\r\n" . $arg . "\r\n"; } \Yii::trace("Executing Redis Command: {$name}", __CLASS__); fwrite($this->_socket, $command); return $this->parseResponse(implode(' ', $params)); } private function parseResponse($command) { if(($line = fgets($this->_socket))===false) { throw new Exception("Failed to read from socket.\nRedis command was: " . $command); } $type = $line[0]; $line = substr($line, 1, -2); switch($type) { case '+': // Status reply return true; case '-': // Error reply throw new Exception("Redis error: " . $line . "\nRedis command was: " . $command); case ':': // Integer reply // no cast to integer as it is in the range of a signed 64 bit integer return $line; case '$': // Bulk replies if ($line == '-1') { return null; } $data = fread($this->_socket, $line + 2); if($data===false) { throw new Exception("Failed to read from socket.\nRedis command was: " . $command); } return substr($data, 0, -2); case '*': // Multi-bulk replies $count = (int) $line; $data = array(); for($i = 0; $i < $count; $i++) { $data[] = $this->parseResponse($command); } return $data; default: throw new Exception('Received illegal data from redis: ' . $line . "\nRedis command was: " . $command); } } }