侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

如何基于Hyperf实现RabbitMQ+WebSocket消息推送

swoole  /  管理员 发布于 7年前   311

介绍

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

<?phpreturn [    'mode' => SWOOLE_PROCESS,    'servers' => [        ['name' => 'http','type' => Server::SERVER_HTTP,'host' => '0.0.0.0','port' => 11111,'sock_type' => SWOOLE_SOCK_TCP,'callbacks' => [    SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],],        ],        ['name' => 'ws','type' => Server::SERVER_WEBSOCKET,'host' => '0.0.0.0','port' => 12222,'sock_type' => SWOOLE_SOCK_TCP,'callbacks' => [    SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],    SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],    SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],],        ],    ],

WebSocket 服务器端代码示例

<?phpdeclare(strict_types=1);/** * This file is part of Hyperf. * * @link     https://www.hyperf.io * @document https://doc.hyperf.io * @contact  [email protected] * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE */namespace App\Controller;use Hyperf\Contract\OnCloseInterface;use Hyperf\Contract\OnMessageInterface;use Hyperf\Contract\OnOpenInterface;use Swoole\Http\Request;use Swoole\Server;use Swoole\Websocket\Frame;use Swoole\WebSocket\Server as WebSocketServer;class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface{    /**     * 发送消息     * @param WebSocketServer $server     * @param Frame $frame     */    public function onMessage(WebSocketServer $server, Frame $frame): void    {        //心跳刷新缓存        $redis = $this->container->get(\Redis::class);        //获取所有的客户端id        $fdList = $redis->sMembers('websocket_sjd_1');        //如果当前客户端在客户端集合中,就刷新        if (in_array($frame->fd, $fdList)) {$redis->sAdd('websocket_sjd_1', $frame->fd);$redis->expire('websocket_sjd_1', 7200);        }        $server->push($frame->fd, 'Recv: ' . $frame->data);    }    /**     * 客户端失去链接     * @param Server $server     * @param int $fd     * @param int $reactorId     */    public function onClose(Server $server, int $fd, int $reactorId): void    {        //删掉客户端id        $redis = $this->container->get(\Redis::class);        //移除集合中指定的value        $redis->sRem('websocket_sjd_1', $fd);        var_dump('closed');    }    /**     * 客户端链接     * @param WebSocketServer $server     * @param Request $request     */    public function onOpen(WebSocketServer $server, Request $request): void    {        //保存客户端id        $redis = $this->container->get(\Redis::class);        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);        var_dump($res1);        $res = $redis->expire('websocket_sjd_1', 7200);        var_dump($res);        $server->push($request->fd, 'Opened');    }}

WebSocket 前端代码

    function WebSocketTest() {        if ("WebSocket" in window) {console.log("您的浏览器支持 WebSocket!");var num = 0// 打开一个 web socketvar ws = new WebSocket("ws://127.0.0.1:12222");ws.onopen = function () {    // Web Socket 已连接上,使用 send() 方法发送数据    //alert("数据发送中...");    //ws.send("发送数据");};window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开    var ping = {"type": "ping"};    ws.send(JSON.stringify(ping));}, 5000);ws.onmessage = function (evt) {    var d = JSON.parse(evt.data);    console.log(d);    if (d.code == 300) {        $(".address").text(d.address)    }    if (d.code == 200) {        var v = d.data        console.log(v);        num++        var str = `<div class="item"><p>${v.recordOutTime}</p><p>${v.userOutName}</p><p>${v.userOutNum}</p><p>${v.doorOutName}</p>        </div>`        $(".tableHead").after(str)        if (num > 7) {num--$(".table .item:nth-last-child(1)").remove()        }    }};ws.error = function (e) {    console.log(e)    alert(e)}ws.onclose = function () {    // 关闭 websocket    alert("连接已关闭...");};        } else {alert("您的浏览器不支持 WebSocket!");        }    }

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

<?phpreturn [    'default' => [        'host' => 'localhost',        'port' => 5672,        'user' => 'guest',        'password' => 'guest',        'vhost' => '/',        'pool' => ['min_connections' => 1,'max_connections' => 10,'connect_timeout' => 10.0,'wait_timeout' => 3.0,'heartbeat' => -1,        ],        'params' => ['insist' => false,'login_method' => 'AMQPLAIN','login_response' => null,'locale' => 'en_US','connection_timeout' => 3.0,'read_write_timeout' => 6.0,'context' => null,'keepalive' => false,'heartbeat' => 3,        ],    ],];

MQ 消费者代码

<?phpdeclare(strict_types=1);namespace App\Amqp\Consumer;use Hyperf\Amqp\Annotation\Consumer;use Hyperf\Amqp\Message\ConsumerMessage;use Hyperf\Amqp\Result;use Hyperf\Server\Server;use Hyperf\Server\ServerFactory;/** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */class DemoConsumer extends ConsumerMessage{    /**     * rabbmitMQ消费端代码     * @param $data     * @return string     */    public function consume($data): string    {        print_r($data);        //获取集合中所有的value        $redis = $this->container->get(\Redis::class);        $fdList=$redis->sMembers('websocket_sjd_1');        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();        foreach($fdList as $key=>$v){if(!empty($v)){    $server->push((int)$v, $data);}        }        return Result::ACK;    }}

控制器代码

    /**     * test     * @return array     */    public function test()    {        $data = array('code' => 200,'data' => [    'userOutName' => 'ccflow',    'userOutNum' => '9999',    'recordOutTime' => date("Y-m-d H:i:s", time()),    'doorOutName' => '教师公寓',]        );        $data = \GuzzleHttp\json_encode($data);        $message = new DemoProducer($data);        $producer = ApplicationContext::getContainer()->get(Producer::class);        $result = $producer->produce($message);        var_dump($result);        $user = $this->request->input('user', 'Hyperf');        $method = $this->request->getMethod();        return ['method' => $method,'message' => "{$user}.",        ];    }

最终效果

ab7e49780093484c182c1baf0dbedce.png

推荐:《PHP教程》

以上就是如何基于Hyperf实现RabbitMQ+WebSocket消息推送的详细内容,更多请关注其它相关文章!


  • 上一条:
    PHP Swoole 基本使用
    下一条:
    PHP7 安装 Swoole 教程
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • swoole协程通信之数据处理协程对数据进行读写,保存到协程上下文示例(0个评论)
    • swoole redis连接池应用优化之多协程频繁访问redis(0个评论)
    • php中使用hyperf框架调用讯飞星火大模型实现国内版chatgpt功能示例(2个评论)
    • php中使用hyperf框架调用文心千帆大模型实现国内版chatgpt功能示例(0个评论)
    • 在hyperf框架中使用基于protobuf的RPC生成器实现rpc服务(1个评论)
    • 近期文章
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2017-09
    • 2020-03
    • 2020-06
    • 2021-03
    • 2021-04
    • 2021-05
    • 2021-07
    • 2021-08
    • 2021-09
    • 2021-11
    • 2022-03
    • 2022-05
    • 2023-01
    • 2023-02
    • 2023-03
    • 2023-07
    • 2023-08
    • 2023-11
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客