swoole协程通信之数据处理协程对数据进行读写,保存到协程上下文示例
swoole  /  管理员 发布于 9个月前   272
看文档说协程之间通信可以用通道实现,但是都只是说可以用通道推数据和读数据,
没有提到怎么同步保存数据。下面实现了一下使用一个数据处理协程对数据进行读写,保存到协程上下文的示例;
示例代码:
<?php
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
run(function() {
//写通道,供生产者写入数据
$channelWrite = new Coroutine\Channel(1);
//读通道,供消费者读取数据
$channelRead = new Coroutine\Channel(1);
//数据处理协程
go(function () use ($channelWrite, $channelRead) {
//用这个协程的上下文存储数据
$pcid =Coroutine::getCid();
$context = Coroutine::getContext($pcid);
$context['count'] = [];
go(function() use ($channelWrite, $pcid) {
//用父协程id获取上下文,直接传数据数据修改不会同步
$context = Coroutine::getContext($pcid);
//从写通道读取数据,放入父协程上下文保存
while(true) {
$data = $channelWrite->pop();
if (!$data) {
break;
}
$context['count'] = $data;
}
});
go(function() use ($channelRead, $pcid) {
//用父协程id获取上下文,直接传数据数据修改不会同步
$context = Coroutine::getContext($pcid);
//从父协程上下文获取实时数据,传入读通道,供消费协程使用
while(true) {
$res = $channelRead->push($context['count']);
if (!$res) {
break;
}
}
});
});
// 多个消费者协程和数据生产协程
go (function() use ($channelRead, $channelWrite) {
for ($i = 0; $i < 100; $i++) {
Coroutine::sleep(1);
if ($i % 2 == 0) {
$data = $channelRead->pop();
echo '读数据:', PHP_EOL;
print_r($data);
} else {
$channelWrite->push(array_fill(0, 5, $i));
echo '写入数据:'.$i, PHP_EOL;
}
}
$channelRead->close();
$channelWrite->close();
echo '关闭通道';
});
});
还需要对读写协程上下文做一个读写锁,应该就可以保证数据一致性
<?php
use Swoole\Coroutine;
use Swoole\Lock;
use Swoole\Timer;
use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
run(function() {
$channelWrite = new Coroutine\Channel(1);
$channelRead = new Coroutine\Channel(1);
//数据处理协程
go(function () use ($channelWrite, $channelRead) {
//用这个协程的上下文存储数据
$pcid =Coroutine::getCid();
$context = Coroutine::getContext($pcid);
$context['count'] = 0;
//读写锁
$lock = new Lock(SWOOLE_RWLOCK);
go(function() use ($channelWrite, $pcid, $lock) {
//用父协程id获取上下文,直接传数据数据不一致
$context = Coroutine::getContext($pcid);
//从写通道读取数据,放入父协程上下文保存
while(true) {
$data = $channelWrite->pop();
if (!$data) {
break;
}
// echo '写入数据(等写锁)', PHP_EOL;
$lock->lock();
$context['count'] = $data;
// echo '写入数据', PHP_EOL;
$lock->unlock();
}
});
go(function() use ($channelRead, $pcid, $lock) {
//用父协程id获取上下文,直接传数据数据不一致
$context = Coroutine::getContext($pcid);
//从父协程上下文获取实时数据,传入读通道,供消费协程使用
while(true) {
// echo '读取数据(等读锁)', PHP_EOL;
$lock->lock_read();
$data = $context['count'];
// echo '读取数据完成', PHP_EOL;
$lock->unlock();
$res = $channelRead->push($data);
if (!$res) {
break;
}
}
});
});
// 多个消费者协程和数据生产协程
go (function() use ($channelRead, $channelWrite) {
Timer::tick(1000, function() use ($channelRead) {
$data = $channelRead->pop();
echo date('[i:s]') . '读数据:' . $data, PHP_EOL;
});
for ($i = 1; $i < 10; $i++) {
Coroutine::sleep(1.5);
$channelWrite->push($i);
echo date('[i:s]') . '写入数据:'.$i, PHP_EOL;
}
});
});
测试输出:
[00:11]读数据:0
[00:11]写入数据:1
[00:12]读数据:0
[00:13]读数据:0
[00:13]写入数据:2
[00:14]读数据:1
[00:14]写入数据:3
[00:15]读数据:1
[00:16]读数据:2
[00:16]写入数据:4
[00:17]读数据:3
[00:17]写入数据:5
[00:18]读数据:3
[00:19]读数据:4
[00:19]写入数据:6
[00:20]读数据:5
[00:20]写入数据:7
[00:21]读数据:5
[00:22]读数据:6
[00:22]写入数据:8
[00:23]读数据:7
[00:23]写入数据:9
[00:24]读数据:7
[00:25]读数据:8
[00:26]读数据:9
[00:27]读数据:9
[00:28]读数据:9
[00:29]读数据:9
[00:30]读数据:9
[00:31]读数据:9
[00:32]读数据:9
完,有兴趣可自行测试!
123 在
Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..原梓番博客 在
在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..博主 在
佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..1111 在
佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..路人 在
php中使用hyperf框架调用讯飞星火大模型实现国内版chatgpt功能示例中评论 教程很详细,如果加个前端chatgpt对话页面就完美了..Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号