我在wokerman里新增了异步任务处理,能否提交合并

0

截图执行效果


增加了以下内容,参照的swoole


/** 异步任务task方法时触发
* @var callable
*/
public $onTask = null;

/** 异步任务进程数 大于0时创建异步进程
* @var int
*/
public $task_worker_num = 0;

/** 异步进程端口 不指定时以当主服务端口+100
* @var int
*/
public $task_port = 0;

/** 异步任务进程
* @var Worker
*/
public $taskWorker = null;

/** 不设置默认使用 onWorkerStart
* @var callable
*/
public $onTaskWorkerStart = null;

/**
* @var int
*/
public $port = null;

/** 异步任务
* @param mixed $data
* @return bool|int 失败false 成功 返回任务进程id
*/
public function task($data){}
已邀请:

blogdaren - 常年游走于 Linux、PHP、C、VIM 之间【http://www.phpcreeper.com】

赞同来自:

这属于业务类,估计官方不会合并进来,主要是保持workerman内核的精简性。

ncwsky

赞同来自:

异步任务集成workerman实现


修改 __construct


    // Context for socket.
if ($socket_name) {
$this->_socketName = $socket_name;
if (!isset($context_option['socket']['backlog'])) {
$context_option['socket']['backlog'] = static::DEFAULT_BACKLOG;
}
$this->_context = \stream_context_create($context_option);

//这是增加的代码 用于获取port
list($scheme, $address) = explode(':', $this->_socketName, 2);
if ($scheme != 'unix') {
list(, $port) = explode(':', $address);
$this->port = (int)$port;
}
//这是增加的代码 end
}


增加以下属性
/** 异步任务task方法时触发




  • @var callable
    */
    public $onTask = null;


    /** 异步任务进程数 大于0时创建异步进程




  • @var int
    */
    public $task_worker_num = 0;


    /** 异步进程端口 不指定时以当主服务端口+100




  • @var int
    */
    public $task_port = 0;


    /** 异步任务进程




  • @var Worker
    */
    public $taskWorker = null;


    /** 不设置默认使用 onWorkerStart




  • @var callable
    */
    public $onTaskWorkerStart = null;


    /**



  • @var int
    */
    public $port = null;


此段代码放置到 protected static function init() 方法内头部


    //init task
foreach (static::$_workers as $worker) {
if ($worker->task_worker_num > 0) {
if (!$worker->task_port) {
$worker->task_port = $worker->port + 100;
}
if ($worker->task_port) {
$taskWorker = new self('frame://127.0.0.1:' . $worker->task_port);
$taskWorker->port = $worker->task_port;
$taskWorker->name = $worker->name . '_task';
$taskWorker->count = $worker->task_worker_num;
//初始进程事件绑定
$taskWorker->onWorkerStart = $worker->onTaskWorkerStart ? $worker->onTaskWorkerStart : $worker->onWorkerStart;
$taskWorker->onWorkerReload = $worker->onWorkerReload;
//当客户端的连接上发生错误时触发
$taskWorker->onError = $worker->onError;
$taskWorker->onConnect = function (TcpConnection $connection) use ($taskWorker) {
$connection->send($taskWorker->id); //返回进程id
};
$taskWorker->onMessage = function ($connection, $data) use ($taskWorker, $worker) {
$data = unserialize($data);
$ret = null;
if ($worker->onTask) {
call_user_func($worker->onTask, $taskWorker->id, $worker->id, $data);
}
};
$worker->taskWorker = $taskWorker;
}
}

增加异步执行方法 使用了wokerman自带的异步tcp连接就没法得到task_id了 所以才用了下面直接连接的方式
/** 异步任务



  • @param mixed $data

  • @return bool|int 失败false 成功 返回任务进程id
    */
    public function task($data){
    $fp = stream_socket_client("tcp://127.0.0.1:".$this->task_port, $errno, $errstr, 1);
    if (!$fp) {
    //echo "$errstr ($errno)",PHP_EOL;
    static::log("$errstr ($errno)");
    return false;
    } else {
    $taskId = (int)substr(fread($fp, 10),4);
    $send_data = serialize($data);
    $len = strlen($send_data)+4;
    $send_data = pack('N', $len) . $send_data;
    if(!fwrite($fp, $send_data, $len)){
    $taskId = false;
    }
    fclose($fp);
    return $taskId;
    }
    }


以上是直接在Worker.php修改代码


也可以继承Worker单独集成

walkor

赞同来自:

感谢 @ncwsky 的提交。
正如 @blogdaren 所说:
鉴于以下几点,这个提交不打算合并。
1、保持workerman的精简
2、这个提交基本上属于业务逻辑
3、本地task任务有局限性,不好做服务器间的任务投递与集群
4、官网手册有提供更灵活的异步任务做法,可以跨服务器投递,可以做集群,还可以自定义很多细节如指定投递协议。

要回复问题请先登录注册