参考文档中的异步任务代码,使用生产环境高频数据进行测试(关键字:Address not available)

flysheep

直接start运行,大约45秒(累计2W5+数据后)后抛出异常
Warning: stream_socket_client(): unable to connect to tcp://172.18.0.19:22345 (Address not available) in /var/www/html/vendor/workerman/workerman/Connection/AsyncTcpConnection.php on line 185

MQTT内数据为
{"code":"LI2701_LS_HH","type":"BOOL","value":false,"timestamp":"1617874140"}

服务端代码1

require_once __DIR__ . '/../../vendor/autoload.php';
use \Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
$worker = new Worker();
$worker->onWorkerStart = function(){
    global $db;
    $db = 0;
    $mqtt = new \Workerman\Mqtt\Client('mqtt://127.0.0.1:1883');
    $mqtt->onConnect = function($mqtt) {
        $mqtt->subscribe('simp/nodes/#');
    };
    $mqtt->onMessage = function($topic, $content){
        global $db;
        // 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
        $task_connection = new AsyncTcpConnection('Text://127.0.0.1:22345');
        // 任务及参数数据
        // $task_data = array(
        //     'function' => 'send_mail',
        //     'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
        // );
        // 发送数据
        $topic              =   explode("/", $topic);
        $data               =   json_decode($content,true);
        $data["count"]      =   $db;
        $db++;
        $task_connection->send(json_encode($data));
        // 异步获得结果
        $task_connection->onMessage = function($task_connection, $task_result)
        {
             // 结果
             // 获得结果后记得关闭异步连接

             $task_connection->close();
             // 通知对应的websocket客户端任务完成

        };
        // 执行异步连接
        $task_connection->connect();
    };
    $mqtt->connect();
};

服务端代码2

require_once __DIR__ . '/../../vendor/autoload.php';
use Workerman\Worker;

// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:22345');
// task进程数可以根据需要多开一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function($connection, $task_data)
{
     // 假设发来的是json数据
     $data = json_decode($task_data, true);
     // 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....

     echo $data["count"]."\n";
     $task_result = "ok";
     // 发送结果
     $connection->send($task_result);
};

异常时执行status

2343 1 0
1个回答

walkor

每次建立一个连接客户端侧都会消耗一个端口,连接断开后端口不会立刻释放,会进入time_wait等待一段事件。如果不断的快速创建连接本地端口可能因此消耗光,进而无法发起新的连接。

如果你的异步任务真的有这么大量,可以考虑复用连接,类似连接池,建立多个连接保存在全局数组里。每次选择一个连接发送数据。

  • 暂无评论
年代过于久远,无法发表回答
🔝