自定义文本协议,服务端丢消息

poisonbian

自定义的文本格式协议,格式很简单,用\r\n\r\n来分隔数据。代码见最下方。

现在的情况是这样子的,我通过tcp客户端发起一次请求,传输5个文本包,服务端的日志来看,这5个包确实都接收到了(protocol的input中的日志被打印出来了),但是并没有触发onMessage的操作。

进一步定位,protocol中的decode函数压根都没有被调用,下面输出的内容中可以看到,5个包都正常收到了,解析length也正确,就是没有触发decode操作。。。

++++++++++++++++++++++++++++ 输出 +++++++++++++++++++++++++++

----------------------- WORKERMAN -----------------------------
Workerman version:3.3.1          PHP version:5.5.25
------------------------ WORKERS -------------------------------
user          worker            listen                  processes status
root          workerman_logger  orzText://0.0.0.0:8300   4          
----------------------------------------------------------------
Press Ctrl-C to quit. Start success.
string(11) "length: 223"
string(9) "decode..."
string(12) "onMessage..."
string(11) "length: 223"
string(9) "decode..."
string(12) "onMessage..."
string(11) "length: 223"
string(11) "length: 223"
string(11) "length: 223"

++++++++++++++++++++++++++++ 代码 +++++++++++++++++++++++++++

<?php
namespace Workerman\Protocols;

use Workerman\Connection\TcpConnection;

class OrzText
{
    public static $SEPARATOR = "\r\n\r\n";
    public static $SEPARATOR_LENGTH = 4;

    public static function input($buffer, TcpConnection $connection)
    {
        if (strlen($buffer) >= TcpConnection::$maxPackageSize) {
            $connection->close();
            return 0;
        }
        $pos = strpos($buffer, self::$SEPARATOR);
        if ($pos === false) {
            return 0;
        }
        $length = $pos + self::$SEPARATOR_LENGTH;
        var_dump("length: " . $length);
        return $length;
    }

    public static function encode($buffer)
    {
        return $buffer . self::$SEPARATOR;
    }

    public static function decode($buffer)
    {
        var_dump("decode...");
        return trim($buffer);
    }
}
3838 4 0
4个回答

walkor

贴完整代码。

  • 暂无评论
poisonbian

服务端代码

require_once(dirname(__FILE__) . '/CommandBaseManager.class.php');
Vendor('Workerman.Autoloader', '', '.php');
Vendor('GatewayWorker.Lib.Db', '', '.php');

use \Workerman\Worker;
use \GatewayWorker\Lib\Db;

abstract class WorkermanBaseManager extends CommandBaseManager
{
    protected static $config = null;
    protected static $dbs = array();
    protected static $db = null;

    public function _initialize()
    {
        parent::_initialize();

        Db::set_config(0, array(
            'host'  => C('DB_HOST'),
            'port'  => C('DB_PORT'),
            'user'  => C('DB_USER'),
            'password'  => C('DB_PWD'),
            'dbname'    => C('DB_NAME'),
        ));
        self::$dbs = Db::instance(0);
        self::$db = self::$dbs;
    }

    public function onWorkerStart($task)
    {
        $this->_logger()->debug('name: ' . $task->name);
        return true;
    }

    public function onConnect($connection)
    {
        $this->_logger()->debug('from ip ' . $connection->getRemoteIp());
        return true;
    }

    public function onMessage($connection, $data)
    {
        static $i = 0;
        var_dump("onMessage..." . (++$i));
    }

    public function onClose($connection)
    {
        $this->_logger()->debug('from ip ' . $connection->getRemoteIp());
        return true;
    }

    protected function _work()
    {
        $config = array(
            'listen'        => 'OrzText://0.0.0.0:8300',
            'name'          => 'workerman_logger',
            'count'         => 4,
            'logFile'       => 'Log/Command/logger_run.log',
        );

        $worker = new Worker(self::$config);
        Worker::$logFile = REAL_ROOT_PATH . self::$config;
        $worker->count = self::$config;
        $worker->name = self::$config;

        $worker->onMessage = array($this, 'onMessage');
        $worker->onConnect = array($this, 'onConnect');
        $worker->onClose = array($this, 'onClose');
        $worker->onWorkerStart = array($this, 'onWorkerStart');
        $worker->onWorkStop = array($this, 'onWorkerStop');
        $worker->onBufferFull = array($this, 'onBufferFull');
        $worker->onBufferDrain = array($this, 'onBufferDrain');
        $worker->onError = array($this, 'onError');

        Worker::runAll();
    }

    // 一般不需要做修改的一些回调
    public function onBufferFull($connection)
    {
        $this->_logger()->warn('do not send again');
    }
    public function onBufferDrain($connection)
    {
        $this->_logger()->warn('send again');
    }
    public function onWorkerStop($worker)
    {
        $log = sprintf('worker stop: %s', $worker->id, $worker->name);
        $this->_logger()->debug($log);
    }

    public function onError($connection, $code, $msg)
    {
        $log = sprintf('from ip %s, error: ', $connection->getRemoteIp(), $code, $msg);
        $this->_logger()->error($log);
    }
}

客户端调用的代码

<?php 
$host = 'localhost';
$port = 8300;
$timeout = 1;
$eof = "\r\n\r\n";

$socket = fsockopen($host, $port, $errno, $errstr, $timeout);

for ($i = 0; $i < 20; $i++)
{
    fwrite($socket, 'testing' . $eof);
}

fclose($socket);
echo "done\n";
  • 暂无评论
poisonbian

在群里得到了各位的帮助,最后又多次调试代码,发现了原因:客户端连接之后发送数据,在服务端的onMessage中我用了$connection->send(消息),而当服务端send失败,就把connection destroy掉了,即使这个时候服务端已经收到了后面的数据,也不会再做处理了。

  • 暂无评论
walkor

是的,客户端瞬间发送多个消息就立刻关闭了连接,你的代码中服务端收到消息处理的同时向客户端send消息,但是客户端已经关闭,workerman 在send时候发现客户端连接关闭了,就释放了连接,同时也释放了连接缓冲区的数据,导致后面的消息被丢弃。

实际上客户端发送完数据应该等待服务端确认后才能关闭连接,不然数据无法确保能被服务端接收并处理。因为发送的数据可能在客户端的socket缓冲区,根本没发送到服务端,也可能在服务端的缓冲区,还没被处理。

解决方法是客户端发送完消息后要读取服务端的返回,确定服务端收到数据再关闭连接。

  • 暂无评论
年代过于久远,无法发表回答
🔝