我想把rabbitmq 数据 转发到worker上面 一直不行 一直在 consume() 方法执行不往下执行

0
require_once __DIR__ . '/../../vendor/autoload.php';
require_once __DIR__ . '/../common/common.php';
global $location_con;

$worker = new Worker();

$worker->count = 1;

$worker->onWorkerStart = function () {

dg_init('system', 'rabbitmq_data_sysnc', array());
global $location_con;
global $sting;
$string = '';
//建立连接
$location_con = new AsyncTcpConnection('ws://127.0.0.1:9395');
$location_con->onConnect = function ($connection) {
var_dump('onConnect ok');
//heartbeat($connection, 'rabbitmq_data', 0);
};
$location_con->connect();
$location_con->send('123123123123');
$conn_args = array(
'host' => '192.168.5.133',
'port' => '5672',
'login' => 'admin',
'password' => 'admin',
'vhost' => '/'
);
$e_name = 'S2C'; //交换机名
$q_name = 'S2C'; //队列名
$k_route = 'S2C'; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_FANOUT); //direct类型
$ex->setFlags(AMQP_PASSIVE); //持久化
$ex->declareExchange();

//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
$q->declareQueue();
$q->bind($e_name, $k_route);//绑定交换机与队列,并指定路由键

********下面这段代码不行

$q->consume(function($envelope, $queue) use($location_con){
echo "send data to 9037";
$location_con->send($envelope->getBody());
echo $string = $envelope->getBody();
});//自动ACK应答
*************上面代码不知道怎么写
$conn->disconnect();
//连接断开
$location_con->onClose = function ($connection) {
connect_close($connection, 'start_data_sync-walkthink');
};
};

Worker::runAll();
已邀请:

walkor

赞同来自: zhou2021

接收方开text协议,类似


$worker = new Worker('text://127.0.0.1:9395');
$worker->onMessage = function($con, $data){
$data = json_decode($data);
var_dump($data);
};

$q->consume(function($envelope, $queue) use($location_con){
$client = stream_socket_client('tcp://127.0.0.1:9395');
fwrite($client, json_encode($envelope->getBody())."\n");
echo $string = $envelope->getBody();
});

walkor

赞同来自:

consume里是一个死循环,一直循环消费队列的数据。因为代码一直运行在这个循环里,workerman永远无法得到控制权,就无法把数据发送出去。


你可以用stream_socket_client 替代 AsyncTcpConnection 。看到对端使用的websocket协议,stream_socket_client 不好直接连websocket端口,对端最好再开一个text端口,stream_socket_client以text协议发送数据。

要回复问题请先登录注册