我在wokerman里新增了异步任务处理,能否提交合并

ncwsky

截图执行效果

增加了以下内容,参照的swoole

/** 异步任务task方法时触发
 * @var callable
 */
public $onTask = null;

/** 异步任务进程数 大于0时创建异步进程
 * @var int
 */
public $task_worker_num = 0;

/** 异步进程端口 不指定时以当主服务端口+100
 * @var int
 */
public $task_port = 0;

/** 异步任务进程
 * @var Worker
 */
public $taskWorker = null;

/** 不设置默认使用 onWorkerStart
 * @var callable
 */
public $onTaskWorkerStart = null;

/**
 * @var int
 */
public $port = null;

/** 异步任务
 * @param mixed $data
 * @return bool|int  失败false 成功 返回任务进程id
 */
public function task($data){}
1605 3 0
3个回答

blogdaren

这属于业务类,估计官方不会合并进来,主要是保持workerman内核的精简性。

  • ncwsky 2020-03-21

    看了文档有如何实现异步任务的,就是觉得麻烦了点,所以就动手集成进去方便使用。

  • blogdaren 2020-03-21

    @6873: 刚看了下任务源码的抽象实现,个人感觉你这想法挺好的。//不过为啥你把127.0.0.1给写死了呢,这样分布式就不行了。

  • ncwsky 2020-03-21

    这个异步任务就是需要和主服务一起使用的 所以用的127.0.0.1 本来是想用unix来的 结果测试发现只能开启一个进程 不是很明白原理 所以就换成tcp方式了 至于要把异步任务用于分布式的话 就应该参考文档里的异步任务实现方式了

ncwsky

异步任务集成workerman实现

修改 __construct

    // Context for socket.
    if ($socket_name) {
        $this->_socketName = $socket_name;
        if (!isset($context_option['socket']['backlog'])) {
            $context_option['socket']['backlog'] = static::DEFAULT_BACKLOG;
        }
        $this->_context = \stream_context_create($context_option);

//这是增加的代码 用于获取port
list($scheme, $address) = explode(':', $this->_socketName, 2);
if ($scheme != 'unix') {
list(, $port) = explode(':', $address);
$this->port = (int)$port;
}
//这是增加的代码 end
}

增加以下属性
/** 异步任务task方法时触发

  • @var callable
    */
    public $onTask = null;

    /** 异步任务进程数 大于0时创建异步进程

  • @var int
    */
    public $task_worker_num = 0;

    /** 异步进程端口 不指定时以当主服务端口+100

  • @var int
    */
    public $task_port = 0;

    /** 异步任务进程

  • @var Worker
    */
    public $taskWorker = null;

    /** 不设置默认使用 onWorkerStart

  • @var callable
    */
    public $onTaskWorkerStart = null;

    /**

  • @var int
    */
    public $port = null;

此段代码放置到 protected static function init() 方法内头部

    //init task
    foreach (static::$_workers as $worker) {
        if ($worker->task_worker_num > 0) {
            if (!$worker->task_port) {
                $worker->task_port = $worker->port + 100;
            }
            if ($worker->task_port) {
                $taskWorker = new self('frame://127.0.0.1:' . $worker->task_port);
                $taskWorker->port = $worker->task_port;
                $taskWorker->name = $worker->name . '_task';
                $taskWorker->count = $worker->task_worker_num;
                //初始进程事件绑定
                $taskWorker->onWorkerStart = $worker->onTaskWorkerStart ? $worker->onTaskWorkerStart : $worker->onWorkerStart;
                $taskWorker->onWorkerReload = $worker->onWorkerReload;
                //当客户端的连接上发生错误时触发
                $taskWorker->onError = $worker->onError;
                $taskWorker->onConnect = function (TcpConnection $connection) use ($taskWorker) {
                    $connection->send($taskWorker->id); //返回进程id
                };
                $taskWorker->onMessage = function ($connection, $data) use ($taskWorker, $worker) {
                    $data = unserialize($data);
                    $ret = null;
                    if ($worker->onTask) {
                        call_user_func($worker->onTask, $taskWorker->id, $worker->id, $data);
                    }
                };
                $worker->taskWorker = $taskWorker;
            }
        }

增加异步执行方法 使用了wokerman自带的异步tcp连接就没法得到task_id了 所以才用了下面直接连接的方式
/** 异步任务

  • @param mixed $data
  • @return bool|int 失败false 成功 返回任务进程id
    */
    public function task($data){
    $fp = stream_socket_client("tcp://127.0.0.1:".$this->task_port, $errno, $errstr, 1);
    if (!$fp) {
    //echo "$errstr ($errno)",PHP_EOL;
    static::log("$errstr ($errno)");
    return false;
    } else {
    $taskId = (int)substr(fread($fp, 10),4);
    $send_data = serialize($data);
    $len = strlen($send_data)+4;
    $send_data = pack('N', $len) . $send_data;
    if(!fwrite($fp, $send_data, $len)){
    $taskId = false;
    }
    fclose($fp);
    return $taskId;
    }
    }

以上是直接在Worker.php修改代码

也可以继承Worker单独集成

  • blogdaren 2020-03-21

    你这task()方法的实现依然是同步的写法,而workerman自带的AsyncTcpconnection则是异步客户端实现,而且支持多种应用协议,另外onMessage回调里就能拿到task_id;

  • ncwsky 2020-03-21

    仅投递任务是同步的 通过返回task_id就可以判断是否投递成功了 如果不需要拿task_id 就可以用AsyncTcpconnection ; AsyncTcpconnection 方式我拿不到task_id的【就是执行任务的worker_id】 因为我要通过task()方法返回这个task_id的

walkor

感谢 @ncwsky 的提交。
正如 @blogdaren 所说:
鉴于以下几点,这个提交不打算合并。
1、保持workerman的精简
2、这个提交基本上属于业务逻辑
3、本地task任务有局限性,不好做服务器间的任务投递与集群
4、官网手册有提供更灵活的异步任务做法,可以跨服务器投递,可以做集群,还可以自定义很多细节如指定投递协议。

  • 暂无评论
年代过于久远,无法发表回答
🔝