GatewayWorker服务开启后,TIME_WAIT立马飚满,关掉后降低,麻烦看一下是不是代码写的有问题

0

GatewayWorker开启后,TIME_WAIT到50000(设置的最大值)

服务停止几秒后TIME_WAIT降下来.


下面是我的events.php里的代码


<?php
/**
* This file is part of workerman.
*
* Licensed under The MIT License
* For full copyright and license information, please see the MIT-LICENSE.txt
* Redistributions of files must retain the above copyright notice.
*
* @author walkor<walkor@workerman.net>
* @copyright walkor<walkor@workerman.net>
* @link http://www.workerman.net/
* @license http://www.opensource.org/licenses/mit-license.php MIT License
*/

/**
* 用于检测业务代码死循环或者长时间阻塞等问题
* 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
* 然后观察一段时间workerman.log看是否有process_timeout异常
*/
//declare(ticks=1);

use \GatewayWorker\Lib\Gateway;
use Workerman\Lib\Timer;
/**
* 主逻辑
* 主要是处理 onConnect onMessage onClose 三个方法
* onConnect 和 onClose 如果不需要可以不用实现并删除
*/
class Events
{
static $MY_DOMAIN = 'xxx.com';
static $MAIN_CODE = 'xxxxx'; //最高权限uid
static $redis;
public static function onWorkerStart($worker){
//初始化redis
$redis = new Redis();
$redis->connect('xxxx', 6379);
self::$redis = $redis;
//将定时任务平均分配给各个进程
$id = $worker->id;
//1秒一次
Timer::add(1, function() use($id) {
self::crontab($id, 1);
});
//1秒2次
Timer::add(0.5, function() use($id) {
self::crontab($id, 2);
});
//1分钟1次
Timer::add(60, function() use($id) {
self::crontab($id, 3);
});
//30秒1次
Timer::add(30, function() use($id) {
self::crontab($id, 4);
});
//查看在线状态
Timer::add(1, function() {
//查询所有定时任务
$allKey = self::$redis->hKeys('HFiots');
//推送
foreach($allKey as $k => $v)
{
self::$redis->hSet('HFiots-online', $v, Gateway::isUidOnline($v . '-Uid'));
}
});
}
/**
* 当客户端连接时触发
* 如果业务不需此回调可以删除onConnect
*
* @param int $client_id 连接id
*/
public static function onConnect($client_id)
{

}

/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{
//TCP协议 websocket
if($_SERVER['GATEWAY_PORT'] == '8282' || $_SERVER['GATEWAY_PORT'] == '8181' || $_SERVER['GATEWAY_PORT'] == '8383')
{
//首先判断该clientId是否第一次链接
$uid = Gateway::getUidByClientId($client_id);
if(!$uid) //第一次连接
{
//如果是MAIN_CODE,那么给他绑定uid
if($message == self::$MAIN_CODE)
{
Gateway::bindUid($client_id, self::$MAIN_CODE . '-Uid');
Gateway::sendToClient($client_id, 'connet success');
}
else
{
if(self::$redis->hExists('HFiots', $message))
{
$my = json_decode(self::$redis->hGet('HFiots', $message), true);
//单点登录还是多点登录
$myUid = $my['uid'];
if($my['login'] == 0) //单点登录
{
//查询所有客户端,先把其他的连接踢掉
$allClientId = Gateway::getClientIdByUid($myUid);
if($allClientId)
{
foreach($allClientId as $k => $v)
{
Gateway::closeClient($v);
}
}
}
//绑定uid,上线
Gateway::bindUid($client_id, $myUid);
//存个session
$_SESSION[$client_id] = $myUid;
//是否有自定义回复包,如果有的话,回复
$myRecode = $my['recode'];
if($myRecode)
{
if($my['rtype'] == 0) //ASCII类型
{
Gateway::sendToClient($client_id, $myRecode);
}
if($my['rtype'] == 1) //HEX类型
{
Gateway::sendToClient($client_id, hex2bin($myRecode));
}
}
}
}
}
else
{
if($_SERVER['GATEWAY_PORT'] == '8282') //TCP协议
{
//查询该链接的UID
$myStr = '';
$myUid = Gateway::getUidByClientId($client_id);
$myKey = str_replace('-Uid', '', $myUid);
$my = json_decode(self::$redis->hGet('HFiots', $myKey), true);
//存储数据
if($my['vtype'] == 1) //HEX类型
{
$myStr = str_replace(' ', '', bin2hex($message));
}
else
{
$myStr = str_replace(' ', '', $message);
}
//数据过滤
$ok = self::checkFilter($myKey, $myStr);
if($ok)
{
if($my['val']) //需要存储
{
$myVal = explode(':', $my['val']);
if(count($myVal) == 1) //string类型
{
self::$redis->set($my['val'], $myStr);
//记录时间
self::$redis->set($my['val'] . '-time', date('Y-m-d H:i:s'));
}
if(count($myVal) == 2) //list类型
{
$myArr = json_decode($myStr, true);
if($myArr)
{
foreach($myArr as $k => $v)
{
$childArr = json_decode($v, true);
if($childArr)
{
$myArr[$k] = $childArr;
}
}
}
$myArr = $myArr ? $myArr : $myStr;
$arr = array('val' => $myArr, 'time' => date('Y-m-d H:i:s'));
self::$redis->rpush($myVal[1], json_encode($arr, JSON_UNESCAPED_UNICODE));
}
}
//客户端需转发
$forward = explode(',', $my['forward']);
$forward[] = self::$MAIN_CODE; //web前端链接
//获取对应的clientId
foreach($forward as $k => $v)
{
$allClientId = Gateway::getClientIdByUid($v . '-Uid');
foreach($allClientId as $key => $val)
{
$arr = array('k' => $myKey, 'v' => $myStr, 't' => date('Y-m-d H:i:s'));
Gateway::sendToClient($val, json_encode($arr, JSON_UNESCAPED_UNICODE));
}
}
}
}
//被动回复
$directive = json_decode(self::$redis->hGet('HFiots-directive', $myKey), true);
if(count($directive))
{
foreach($directive as $k => $v)
{
$dirStr1 = '';
//触发指令
if($v['stype'] == 1) //HEX类型
{
$dirStr1 = strtoupper(str_replace(' ', '', bin2hex($message)));
}
else
{
$dirStr1 = strtoupper(str_replace(' ', '', $message));
}
//如果收到的消息==触发指令
if($dirStr1 == strtoupper(str_replace(' ', '', $v['sval'])))
{
//回复指令
if($v['rtype'] == 1) //HEX类型
{
Gateway::sendToClient($client_id, hex2bin($v['rval']));
}
else
{
Gateway::sendToClient($client_id, $r['rval']);
}
break;
}
}
}
if($_SERVER['GATEWAY_PORT'] == '8181' || $_SERVER['GATEWAY_PORT'] == '8383') //webSocket协议
{
$message = json_decode($message, true);
$val = str_replace(' ', '', $message['val']);
if(strtoupper($message['type']) == '1') //HEX类型
{
$val = hex2bin($val);
}
if(strtoupper($message['type']) == '2') //GB2312
{
$val = iconv("UTF-8", "gb2312//IGNORE", $val);
}
$allClientId = Gateway::getClientIdByUid($message['to'] . '-Uid');
foreach($allClientId as $k => $v)
{
Gateway::sendToClient($v, $val);
}
}
}
}
}
public static function checkFilter($myKey, $message){
//是否需要过滤
$filter = json_decode(self::$redis->hGet('HFiots-filter', $myKey), true);
$ok = 1; //1通过过滤,0未通过
if($filter['is'] == 1) //需要过滤
{
if(in_array(0, $filter['type'])) //字符长度
{
switch ($filter['lengType']) {
case '1': // >
if(strlen($message) <= $filter['length'])
{
$ok = 0;
}
break;

case '2': // <
if(strlen($message) >= $filter['length'])
{
$ok = 0;
}
break;

case '3': // =
if(strlen($message) != $filter['length'])
{
$ok = 0;
}
break;

case '4': // ≥
if(strlen($message) < $filter['length'])
{
$ok = 0;
}
break;

case '5': // ≤
if(strlen($message) > $filter['length'])
{
$ok = 0;
}
break;

default:
# code...
break;
}
}
if(in_array(1, $filter['type'])) //前N位
{
if(!in_array(substr($message, 0, $filter['before']), explode(',', $filter['beforeVal'])))
{
$ok = 0;
}
}
if(in_array(2, $filter['type'])) //忽略心跳包
{
if(in_array($message, explode(',', $filter['heartVal'])))
{
$ok = 0;
}
}
}
return $ok;
}
/**
* 当用户断开连接时触发
* @param int $client_id 连接id
*/
public static function onClose($client_id)
{
}
public static function onWebSocketConnect($client_id, $data)
{
}
//$id:进程id,$times:crontab里的times类别id
public static function crontab($id, $times){
//查询所有定时任务
$allKey = self::$redis->hKeys('HFiots-crontab');
//推送
foreach($allKey as $k => $v)
{
//将定时任务平均分配到各个进程
if(($k % 8) == $id)
{
//定时任务是否存在
$crontab = json_decode(self::$redis->hGet('HFiots-crontab', $v), true);
$iot = json_decode(self::$redis->hGet('HFiots', $v), true);

if($crontab['val'] && $crontab['status'] == 1 && $crontab['times'] == $times)
{
//获取当前连接clientId集合
$allClientId = Gateway::getClientIdByUid($v . '-Uid');
foreach($allClientId as $key => $val)
{
//数据来源,自定义数据类型,指令是按顺序依次发送,防止指令频繁发送导致设备无法响应
if($crontab['type'] == 0) //自定义数据
{
$rval = explode(',', $crontab['val']);
if($rval)
{
$rkey = 0; //当前应该发送的排序
if(self::$redis->exists('' . $v))
{
$rkey = self::$redis->get('' . $v);
if($rkey > count($rval) - 1)
{
$rkey = 0;
}
}
$va = $crontab['vtype'] == 0 ? $rval[$rkey] : hex2bin($rval[$rkey]);
Gateway::sendToClient($val, $va);
$rkey += 1;
self::$redis->set('' . $v, $rkey);
}
}

if($crontab['type'] == 1) //redis队列
{
//内容
$myVal = explode(',', $crontab['val']);
if($myVal)
{
////查看本连接是TCP还是websocket,如果TCP,那么依次透传.如果是websocket,那么组成数组后传输.
//发送的内容
$res = array();
foreach($myVal as $ke => $va)
{
$myRes = '';
$nva = explode(':', $va); //根据:分割数组
if(count($nva) == 1) //string类型
{
$myRes = self::$redis->get($nva[0]);
}
if(count($nva) == 2) //list类型
{
$myRes = self::$redis->hGet($nva[0], $nva[1]);
}
if($crontab['vtype'] == 1) //HEX类型
{
$myRes = hex2bin($myRes);
}
if($iot['type'] == 0) //TCP
{
Gateway::sendToClient($val, $myRes);
}
if($iot['type'] == 1) //websocket
{
$res[] = array($va => $myRes);
}
}
if($iot['type'] == 1) //websocket
{
$arr = array('k' => 'crontab', 'v' => $res, 't' => date('Y-m-d H:i:s'));
Gateway::sendToClient($val, json_encode($arr, JSON_UNESCAPED_UNICODE));
}
}
}
}
}
}
}
}
}
已邀请:

xiuwang

赞同来自:

我看过源码,gateway有的接口调用是用的短链接,短链接会占用一个本地端口,连接释放后这个端口进入短暂time_wait。如果time_wait达到几万,猜测该是你调用gateway接口太频繁了。不知道linux有没有快速回收或者重复利用time_wait的方法,实在不行就得改源码了。


https://github.com/walkor/GatewayWorker/blob/master/src/Lib/Gateway.php
里面所有 stream_socket_client 相关调用第5个参数传 STREAM_CLIENT_PERSISTENT | STREAM_CLIENT_CONNECT 试下

小七他哥 - 80后IT男

赞同来自:

你这样的写法,不用担心 redis 链接用了一段时间关闭了的问题吗?

要回复问题请先登录注册