cfg = $cfg; $this->clients = []; $this->reading = []; $this->writing = []; $this->quiting = false; $this->restarting = false; }

    /**
     * Destructor: closes the listening socket if it is still open.
     */
    public function __destruct()
    {
        // is_resource() is false both for `false` (failed stream_socket_server())
        // and for a socket that restart() already closed, so this never double-closes.
        if (isset($this->fd) && is_resource($this->fd)) {
            fclose($this->fd);
        }
    }

    /**
     * Start the service: take the single-instance lock, set the process title,
     * open the listening socket and install the signal handlers.
     *
     * Exits the process with status 1 when the listening socket cannot be created.
     */
    public function start()
    {
        // Create the lock file
        $lockFilePath = "{$this->cfg['pid_path']}/{$this->cfg['server_name']}.{$this->cfg['port']}.pid";
        $this->lockFile = run_single_instance($lockFilePath);

        // Change the process title
        cli_set_process_title("{$this->cfg['php_bin_path']} -f {$_SERVER['argv'][0]} [{$this->cfg['server_name']}]");

        // Create the listening socket
        $this->fd = stream_socket_server("tcp://{$this->cfg['host']}:{$this->cfg['port']}", $errno, $error);
        if ($this->fd === false) {
            debug_log("stream_socket_server() error:#{$errno},{$error}", 'ERROR', LOG_NAME);
            exit(1);
        }
        // The listening socket is part of the read set so run() can detect new connections.
        $this->reading[(int)$this->fd] = $this->fd;
        debug_log("Server start at {$this->cfg['host']}:{$this->cfg['port']}", 'DEBUG', LOG_NAME);

        // Install signal callbacks; SIGPIPE is ignored.
        pcntl_signal(SIGTERM, [$this, 'sigHandle']);
        pcntl_signal(SIGUSR1, [$this, 'sigHandle']);
        pcntl_signal(SIGPIPE, SIG_IGN);
    }

    /**
     * Accept a new connection on the listening socket and register it.
     */
    protected function acceptNewClient()
    {
        $fd = stream_socket_accept($this->fd);
        if (!$fd) {
            debug_log("stream_socket_accept() error", 'ERROR', LOG_NAME);
            return;
        }

        $cli = new Client($fd, $this->cfg);
        stream_set_blocking($fd, false);
        $this->clients[(int)$fd] = $cli;
        $this->reading[(int)$fd] = $fd;
        $cli->onConnect();
    }

    /**
     * Remove a connection: notify the client object (if registered), drop the
     * socket from all bookkeeping arrays and close it.
     *
     * @param resource $fd client connection socket
     */
    public function deleteClient($fd)
    {
        $key = (int)$fd;

        $cli = isset($this->clients[$key]) ? $this->clients[$key] : null;
        if ($cli !== null) {
            $cli->onClose();
        }

        // unset() on a missing key is a no-op, so no isset() guards are needed.
        unset($this->writing[$key], $this->reading[$key], $this->clients[$key]);

        // The same socket can be reported as both readable and writable in one
        // select round; if the read path already closed it, the write path ends
        // up here again. Closing an already-closed resource is an error, so guard it.
        if (is_resource($fd)) {
            fclose($fd);
        }
    }

    /**
     * Read data from the given connection and hand it to its client object.
     * A read error or an empty read removes the connection.
     *
     * @param resource $fd client connection socket to read from
     */
    protected function readClient($fd)
    {
        if (!isset($this->clients[(int)$fd])) {
            $this->deleteClient($fd);
            return;
        }

        $cli = $this->clients[(int)$fd];
        $data = fread($fd, $this->cfg['max_read_size']);

        if ($data === false) {
            // Capture the peer description before the socket is closed.
            // NOTE(review): assumes peer() is safe to call here — confirm against Client.
            $peer = $cli->peer();
            $this->deleteClient($fd);
            debug_log("socket_read({$fd}) error, client:" . $peer, 'ERROR', LOG_NAME);
        } elseif ($data === '') {
            // Empty read: treated as the connection being finished.
            $this->deleteClient($fd);
        } else {
            $cli->onRead($data);
        }
    }

    /**
     * Write pending data to the given connection.
     * When the client's prepareWrite() returns false the connection is taken
     * out of the write set; a write error removes the connection.
     *
     * @param resource $fd client connection socket to write to
     */
    protected function writeClient($fd)
    {
        if (!isset($this->clients[(int)$fd])) {
            $this->deleteClient($fd);
            return;
        }

        $cli = $this->clients[(int)$fd];
        $data = $cli->prepareWrite();
        if ($data === false) {
            unset($this->writing[(int)$fd]);
            return;
        }

        $n = fwrite($fd, $data);
        if ($n === false) {
            // Capture the peer description before the socket is closed.
            // NOTE(review): assumes peer() is safe to call here — confirm against Client.
            $peer = $cli->peer();
            $this->deleteClient($fd);
            debug_log("socket_write({$fd}) error, client:" . $peer, 'ERROR', LOG_NAME);
            return;
        }

        // NOTE(review): this prints every outgoing payload to stdout and looks
        // like leftover debugging output — confirm whether it should stay.
        echo "{$fd} write:" . $data . "\n";
        $cli->onWrite($n);
    }

    /**
     * Mark the given connection as having data to write, so run() includes it
     * in the write set of the next select call.
     *
     * @param resource $fd client connection socket
     */
    public function setClientWriting($fd)
    {
        $this->writing[(int)$fd] = $fd;
    }

    /**
     * Quit: close every client connection, release the lock file and exit with status 0.
     */
    public function quit()
    {
        debug_log("Server is quiting", 'WARNING', LOG_NAME);

        foreach ($this->clients as $cli) {
            $this->deleteClient($cli->fd());
        }

        flock($this->lockFile, LOCK_UN);
        fclose($this->lockFile);
        exit(0);
    }

    /**
     * Restart: close every client connection and the listening socket, release
     * the lock file and re-execute the PHP binary with the original arguments.
     * Exits with status 1 if the re-exec returns (i.e. failed).
     */
    public function restart()
    {
        debug_log("Server is restarting", 'WARNING', LOG_NAME);

        foreach ($this->clients as $cli) {
            $this->deleteClient($cli->fd());
        }

        fclose($this->fd);
        flock($this->lockFile, LOCK_UN);
        fclose($this->lockFile);

        pcntl_exec($this->cfg['php_bin_path'], $_SERVER['argv']);

        // Only reached when pcntl_exec() failed.
        debug_log("Server restart failed", 'ERROR', LOG_NAME);
        exit(1);
    }

    /**
     * Main loop: wait for socket activity, dispatch reads/writes/accepts,
     * react to quit/restart requests and drive the timer.
     */
    public function run()
    {
        while (true) {
            // stream_select() modifies the arrays it is given, so pass copies.
            $rList = array_values($this->reading);
            $wList = array_values($this->writing);
            $eList = [];
            $n = 0;
            list($sec, $msec) = GET_TIMER()->getWaitTime();

            // Ticks are enabled only around the select call; the signal callbacks
            // registered in start() get their chance to run here.
            declare (ticks = 1) {
                $n = stream_select($rList, $wList, $eList, $sec, $msec);
            }

            // Check the flags after every select, not only when it failed:
            // a signal that arrived outside the select call does not interrupt
            // it, so the request would otherwise never be acted on.
            if ($this->quiting) {
                $this->quit();
            } elseif ($this->restarting) {
                $this->restart();
            }

            if ($n === false) {
                debug_log("stream_select() error", 'ERROR', LOG_NAME);
                exit(1);
            }

            if ($n > 0) {
                foreach ($rList as $fd) {
                    if ($fd === $this->fd) {
                        $this->acceptNewClient();
                    } else {
                        $this->readClient($fd);
                    }
                }
                foreach ($wList as $fd) {
                    $this->writeClient($fd);
                }
            }

            // Periodic timer processing
            GET_TIMER()->interval();
        }
    }

    /**
     * Signal callback: records the request; run() acts on it.
     *
     * @param int $signo signal number
     */
    public function sigHandle($signo)
    {
        switch ($signo) {
            // Quit
            case SIGTERM:
                $this->quiting = true;
                break;
            // Restart
            case SIGUSR1:
                $this->restarting = true;
                break;
        }
    }
}