id = ++self::$currentId; $this->fd = $fd; $this->cfg = $cfg; $this->readBuffer = ''; $this->writeQueue = []; $this->peer = stream_socket_get_name($this->fd, true); list($this->host, $this->port) = explode(':', $this->peer); } /** * 返回客户连接的Socket资源 * @return resource Socket资源 */ public function fd() { return $this->fd; } /** * 返回客户连接的远程名称 * @return string 名称(主机地址:端口号) */ public function peer() { return $this->peer; } /** * 连接成功后的回调 */ public function onConnect() { } /** * 连接断开前的回调 */ public function onClose() { } /** * 读取消息后的回调 * @param string $data 读到的数据 */ public function onRead($data) { echo $this->id . " read:" . $data . "\n"; $this->readBuffer .= $data; while (!empty($this->readBuffer)) { $msg = Message::parse($this->readBuffer); if ($msg) { $this->onRequest($msg); } else { break; } } } /** * 写数据完成后的回调 * @param int $n 写了多少数据 */ public function onWrite($n) { if (empty($this->writeQueue)) { return false; } $this->writeQueue[0][1] += $n; if ($this->writeQueue[0][1] >= $this->writeQueue[0][2]) { $first = array_shift($this->writeQueue); if ($first[3] != null) { call_user_func($first[3]); } } } /** * 从当前的发送消息队列中返回头一条消息的数据 * @return string 要发送的数据 */ public function prepareWrite() { if (empty($this->writeQueue)) { return false; } $first = $this->writeQueue[0]; return substr($first[0], $first[1], $this->cfg['max_write_size']); } /** * 关闭当前连接 */ public function close() { GET_SERVER()->deleteClient($this->fd); } /** * 向客户返回一条消息 * @param Message|string 要发送的消息或原始数据 */ public function response($msg, $cb=null) { $buffer = is_a($msg, 'Message') ? $msg->toBuffer() : strval($msg); $this->writeQueue[] = [$buffer, 0, strlen($buffer), $cb]; GET_SERVER()->setClientWriting($this->fd); } /** * 处理来自客户的请求 * @param Message $msg 消息 */ abstract public function onRequest($msg); }