Channel worker间通讯。

wuchuguang
new Server();//Channel的服务器
$worker1 = new Worker();
$worker1->onStart=function(){
   Client::on(xxx,function($msg){
       echo "这里不能打印";
   }
}

$worker2 = new Worker();
$worker2->onStart = function(){
   Client::publish(xxx,data);//这里publish。
}
Worker::runAll();

Client.php
onRemoteMessage 打印,没有打印到publish的内容
2个worker间题。
同个worker内正常。

3651 6 0
6个回答

wuchuguang

onWorkerStart

  • 暂无评论
wuchuguang

真实环境是:
worker2用来

$redis->subscribe(xxx,function($instrance, $channel, $msg){
   Client::publish($channel, $msg);
}

worker1用

Client::on($channel, function($msg){
 //  来处理。因为redis->subscribe是阻塞的,不能在worker1内用。worker1要处理其它业务的。不能阻塞
}
  • 暂无评论
walkor

不能在onWorkerStart里立刻publish,因为另外的进程可能还没有运行到onWorkerStart,
可能还没运行到Client::on(xxx, ...)

另外两个worker要在onWorkerStart里调用下Client::connect('channel_server_ip')下,

  • wuchuguang 2016-07-23

    1:我看Client::on和Client::publish内部实现都有先调用Client::connect();//我的Server,ip和port都是默认的,Client::connect的ip和port也是默认的。这看来去应该是正常的。
    2:不能在onWorkerStart里立刻$redis->publish( Client::publish),这个逻辑我这边业务是允许的。

  • wuchuguang 2016-07-23

    现在的情况是,worker2->onWorkerStart里$redis->subscribe('xx', function($msg){
    Client::publish(xxxx,$msg);//这里要给$worker1->onWorerStart里的Client::on(xxxx,这里处理);
    })

  • wuchuguang 2016-07-23

    问题在于$worker->Client::on并没有收到$worker2的Client::publish

  • wuchuguang 2016-07-23

    ¥worker2 的 $redis->subscribe这里是一样的收到redis->publish的新数据的。

  • wuchuguang 2016-07-23

    看Client::publish是收到打印了。但Server::这边是没收到。Client::onRemoteMessage这里也没收到Client::publish发来的数据。
    我请问一下。
    当onWorkerStart里使用了redis->subscribe,会不会影响 这个回调里的Client::publish 发送数据到Server

  • walkor 2016-07-23

    subscribe是阻塞的,会影响workerman的异步通讯

wuchuguang

onWorkStart不能redis-subscribe(会导致Client::publish发不出数据),有何办法解决这个问题吗?

  • 暂无评论
walkor

最简单的方法是不直接调用Channel/Client::publish,因为Channel/Client是异步非阻塞的,你的redis-subscribe会阻塞整个进程,导致Channel/Client无法处理异步数据。你可以把Channel/Client::publish部分代码抽离出来,手写代码用阻塞的方式发送。

你可以看下Client.php publish方法的代码,改为用阻塞的方式publish事件。
代码类似

<?php
function publish($events, $data)
{
    $client = stream_socket_client('tcp://channel_server_ip:channel_server_port');
    $buffer =  serialize(array('type' => 'publish', 'channels'=>(array)$events, 'data' => $data));
    // frame协议格式,头部4个字节是包的长度
    $all_buffer = pack('N', strlen($buffer)+4).$buffer;
    fwrite($client, $all_buffer);
}
blue1018

这个真好用,我也用上了

  • 暂无评论
年代过于久远,无法发表回答
🔝