请教一个socket长连接相关的问题

0

walkor大神,请教一个socket相关的问题


现在情况是这样的,我要写一个将MySQL数据同步到ES的服务,方案是将MySQL binlog日志解析成结构化的数据,然后写入ES,解析binlog的是一个php cli 单进程,死循环获取binlog数据,因为怕数据太多,消费能力跟不上,想着用workerman多个worker进程处理,但是多个worker进程存在的问题是有序性问题,同一张表的事件只能同时由一个worker进程处理,我目前的想法是缓存了几个缓存了几个socket链接,然后根据表名去走对应的socket链接发送消息,不确定这样子稳不稳定。代码如下面,写了一点点,大概意思能表现出来。
方案和代码参考了这个问题 https://wenda.workerman.net/question/508


有没有什么更好的方案呢?


消费解析后的binlog worker进程


use app\dbBase;
use Workerman\Worker;
require_once __DIR__ . '/../vendor/autoload.php';

// 创建一个Worker监听2347端口,不使用任何应用层协议
$worker = new Worker("text://0.0.0.0:2347");

// 启动4个进程对外提供服务
$worker->count = 6;
$worker->name = 'write_es';
Worker::$logFile = __DIR__ . '/' . $worker->name . '.log';

$worker->onWorkerStart = function($worker)
{
// 将db实例存储在全局变量中(也可以存储在某类的静态成员中)
dbBase::getInstance()->init();
};

// 当客户端发来数据时
$worker->onMessage = function($connection, $data)
{
echo $data.PHP_EOL;
//将mysql数据写入ES
// 向客户端发送hello $data
$connection->send('hello ' . $data."\n");
};

$worker->onConnect = function ($connection) {
$connection->send('hello\n');
};

// 运行worker
Worker::runAll();

解析binlog后推送到worker进程


class mysqlEventSubscribers extends EventSubscribers
{

const client_count = 4;
private static $clients;

/**
* mysql增删改查事件
* @param EventDTO $event
*/
public function allEvents(EventDTO $event): void {
// all events got __toString() implementation
echo $event;
// all events got JsonSerializable implementation
//echo json_encode($event, JSON_PRETTY_PRINT);
//将事件推送到worker进程中进行处理
$this->send($event);

echo 'Memory usage ' . round(memory_get_usage() / 1048576, 2) . ' MB' . PHP_EOL;
}

function send($event) {
if (!isset($this->clients)) {
// 建立socket连接到内部推送端口
for ($i = 0; $i<self::client_count;$i++) {
static::$clients[$i] = stream_socket_client('tcp://127.0.0.1:2347', $errno, $errmsg, 1);
}
}

//根据event事件中的数据库表名,找到对应的
$client = $this->getClientByEvent($event);

// 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
fwrite($client, json_encode($event) . "\n");
// 读取推送结果
echo fread($client, 8192);
}

function getTableFromEvent($event) {
//逻辑没实现,大概就是不同的增删改查返回不同的表名
return 'table';
}

function getClientByEvent($event) {
$table = $this->getTableFromEvent($event);
$clientIndex = syncTable::$tables[$table] % self::client_count;
return static::$clients[$clientIndex];
}
}
已邀请:

敖德萨

赞同来自:

我怎么感觉你这有点像要写定时任务的样子 就是当有消息过多的时候我没办法同步到es的时候就用缓存来解压一下 直至消息同步为此是不是?

foolgry

赞同来自:

我测试了下,上面的方案是行不通的,因为同一个client发送的多个消息,并不是同一个worker进程处理的,所以上面我的方案不行,应该还是要在发送之前加个数组缓存event消息,同一个表的多条数据,等一条处理完了再发送下一条

six

赞同来自:

同一个连接的数据肯定是同一个worker处理的,所以感觉代码没啥问题。

要回复问题请先登录注册