侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

在go语言中封装rabbitmq服务,实现正常、延时消息发送功能

Go  /  管理员 发布于 1年前   454

在Go语言中封装RabbitMQ可以提供更简洁、易用、可维护的API,提高开发效率,实现解耦和异步处理。


封装代码示例:

package rabbitmq
import (
    "fmt"
    "log"
    "strconv"
    "strings"
    "yoyo/pkg/config"
    "github.com/streadway/amqp"
)
// 消息体:DelayTime 仅在 SendDelayMessage 方法有效
type Message struct {
    DelayTime int // desc:延迟时间(秒)
    Body      string
}
type MessageQueue struct {
    conn         *amqp.Connection // amqp链接对象
    ch           *amqp.Channel    // channel对象
    ExchangeName string           // 交换器名称
    RouteKey     string           // 路由名称
    QueueName    string           // 队列名称
}
// 消费者回调方法
type Consumer func(amqp.Delivery)
// NewRabbitMQ 新建 rabbitmq 实例
func NewRabbitMQ(exchange, route, queue string) MessageQueue {
    var messageQueue = MessageQueue{
        ExchangeName: exchange,
        RouteKey:     route,
        QueueName:    queue,
    }
    // 建立amqp链接
    conn, err := amqp.Dial(fmt.Sprintf(
        "amqp://%s:%s@%s:%s%s",
        config.Viper.GetString("rabbitmq.username"),
        config.Viper.GetString("rabbitmq.password"),
        config.Viper.GetString("rabbitmq.host"),
        config.Viper.GetString("rabbitmq.port"),
        "/"+strings.TrimPrefix(config.Viper.GetString("rabbitmq.vhost"), "/"),
    ))
    failOnError(err, "Failed to connect to RabbitMQ")
    messageQueue.conn = conn
    // 建立channel通道
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    messageQueue.ch = ch
    // 声明exchange交换器
    messageQueue.declareExchange(exchange, nil)
    return messageQueue
}
// SendMessage 发送普通消息
func (mq *MessageQueue) SendMessage(message Message) {
    err := mq.ch.Publish(
        mq.ExchangeName, // exchange
        mq.RouteKey,     // route key
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message.Body),
        },
    )
    failOnError(err, "send common msg err")
}
// SendDelayMessage 发送延迟消息
func (mq *MessageQueue) SendDelayMessage(message Message) {
    delayQueueName := mq.QueueName + "_delay:" + strconv.Itoa(message.DelayTime)
    delayRouteKey := mq.RouteKey + "_delay:" + strconv.Itoa(message.DelayTime)
    // 定义延迟队列(死信队列)
    dq := mq.declareQueue(
        delayQueueName,
        amqp.Table{
            "x-dead-letter-exchange":    mq.ExchangeName, // 指定死信交换机
            "x-dead-letter-routing-key": mq.RouteKey,     // 指定死信routing-key
        },
    )
    // 延迟队列绑定到exchange
    mq.bindQueue(dq.Name, delayRouteKey, mq.ExchangeName)
    // 发送消息,将消息发送到延迟队列,到期后自动路由到正常队列中
    err := mq.ch.Publish(
        mq.ExchangeName,
        delayRouteKey,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message.Body),
            Expiration:  strconv.Itoa(message.DelayTime * 1000),
        },
    )
    failOnError(err, "send delay msg err")
}
// Consume 获取消费消息
func (mq *MessageQueue) Consume(fn Consumer) {
    // 声明队列
    q := mq.declareQueue(mq.QueueName, nil)
    // 队列绑定到exchange
    mq.bindQueue(q.Name, mq.RouteKey, mq.ExchangeName)
    // 设置Qos
    err := mq.ch.Qos(1, 0, false)
    failOnError(err, "Failed to set QoS")
    // 监听消息
    msgs, err := mq.ch.Consume(
        q.Name, // queue name,
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")
    // forever := make(chan bool), 注册在主进程,不需要阻塞
    go func() {
        for d := range msgs {
            fn(d)
            d.Ack(false)
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    // <-forever
}
// Close 关闭链接
func (mq *MessageQueue) Close() {
    mq.ch.Close()
    mq.conn.Close()
}
// declareQueue 定义队列
func (mq *MessageQueue) declareQueue(name string, args amqp.Table) amqp.Queue {
    q, err := mq.ch.QueueDeclare(
        name,
        true,
        false,
        false,
        false,
        args,
    )
    failOnError(err, "Failed to declare a delay_queue")
    return q
}
// declareQueue 定义交换器
func (mq *MessageQueue) declareExchange(exchange string, args amqp.Table) {
    err := mq.ch.ExchangeDeclare(
        exchange,
        "direct",
        true,
        false,
        false,
        false,
        args,
    )
    failOnError(err, "Failed to declare an exchange")
}
// bindQueue 绑定队列
func (mq *MessageQueue) bindQueue(queue, routekey, exchange string) {
    err := mq.ch.QueueBind(
        queue,
        routekey,
        exchange,
        false,
        nil,
    )
    failOnError(err, "Failed to bind a queue")
}
// failOnError 错误处理
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s : %s", msg, err)
    }
}


调用使用

消费消息:

func registerRabbitMQConsumer() {
    // 新建连接
    rabbit := rabbitmq.NewRabbitMQ("yoyo_exchange", "yoyo_route", "yoyo_queue")
    // 一般来说消费者不关闭,常驻进程进行消息消费处理
    // defer rabbit.Close() 
    // 执行消费
    rabbit.Consume(func(d amqp.Delivery) {
        //logger.Info("rabbitmq", zap.String("rabbitmq", string(d.Body)))
    })
}

发送消息:

rabbit := rabbitmq.NewRabbitMQ("yoyo_exchange", "yoyo_route", "yoyo_queue")
defer rabbit.Close()
rabbit.SendMessage(rabbitmq.Message{Body: "这是一条普通消息"})
rabbit.SendDelayMessage(rabbitmq.Message{Body: "这是一条延时5秒的消息", DelayTime: 5})

  • 上一条:
    在Laravel项目中Exception(异常处理)优化最佳实践方式浅析
    下一条:
    在laravel中助手函数:data_forget,可以从数组或对象中删除键值
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2017-09
    • 2020-03
    • 2020-05
    • 2020-06
    • 2020-07
    • 2020-12
    • 2021-01
    • 2021-05
    • 2021-06
    • 2021-07
    • 2021-08
    • 2021-10
    • 2021-11
    • 2021-12
    • 2022-01
    • 2022-02
    • 2022-03
    • 2022-04
    • 2022-05
    • 2022-06
    • 2022-07
    • 2022-08
    • 2022-09
    • 2022-10
    • 2022-11
    • 2022-12
    • 2023-01
    • 2023-02
    • 2023-03
    • 2023-04
    • 2023-05
    • 2023-06
    • 2023-07
    • 2023-08
    • 2023-09
    • 2023-10
    • 2023-11
    • 2023-12
    • 2024-01
    • 2024-02
    • 2024-03
    • 2024-04
    • 2024-05
    • 2024-06
    • 2024-07
    • 2024-08
    • 2024-11
    • 2025-02
    • 2025-04
    • 2025-05
    • 2025-06
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客