在go语言中封装rabbitmq服务,实现正常、延时消息发送功能
Go  /  管理员 发布于 1年前   386
在Go语言中封装RabbitMQ可以提供更简洁、易用、可维护的API,提高开发效率,实现解耦和异步处理。
封装代码示例:
package rabbitmq
import (
"fmt"
"log"
"strconv"
"strings"
"yoyo/pkg/config"
"github.com/streadway/amqp"
)
// 消息体:DelayTime 仅在 SendDelayMessage 方法有效
type Message struct {
DelayTime int // desc:延迟时间(秒)
Body string
}
type MessageQueue struct {
conn *amqp.Connection // amqp链接对象
ch *amqp.Channel // channel对象
ExchangeName string // 交换器名称
RouteKey string // 路由名称
QueueName string // 队列名称
}
// 消费者回调方法
type Consumer func(amqp.Delivery)
// NewRabbitMQ 新建 rabbitmq 实例
func NewRabbitMQ(exchange, route, queue string) MessageQueue {
var messageQueue = MessageQueue{
ExchangeName: exchange,
RouteKey: route,
QueueName: queue,
}
// 建立amqp链接
conn, err := amqp.Dial(fmt.Sprintf(
"amqp://%s:%s@%s:%s%s",
config.Viper.GetString("rabbitmq.username"),
config.Viper.GetString("rabbitmq.password"),
config.Viper.GetString("rabbitmq.host"),
config.Viper.GetString("rabbitmq.port"),
"/"+strings.TrimPrefix(config.Viper.GetString("rabbitmq.vhost"), "/"),
))
failOnError(err, "Failed to connect to RabbitMQ")
messageQueue.conn = conn
// 建立channel通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
messageQueue.ch = ch
// 声明exchange交换器
messageQueue.declareExchange(exchange, nil)
return messageQueue
}
// SendMessage 发送普通消息
func (mq *MessageQueue) SendMessage(message Message) {
err := mq.ch.Publish(
mq.ExchangeName, // exchange
mq.RouteKey, // route key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message.Body),
},
)
failOnError(err, "send common msg err")
}
// SendDelayMessage 发送延迟消息
func (mq *MessageQueue) SendDelayMessage(message Message) {
delayQueueName := mq.QueueName + "_delay:" + strconv.Itoa(message.DelayTime)
delayRouteKey := mq.RouteKey + "_delay:" + strconv.Itoa(message.DelayTime)
// 定义延迟队列(死信队列)
dq := mq.declareQueue(
delayQueueName,
amqp.Table{
"x-dead-letter-exchange": mq.ExchangeName, // 指定死信交换机
"x-dead-letter-routing-key": mq.RouteKey, // 指定死信routing-key
},
)
// 延迟队列绑定到exchange
mq.bindQueue(dq.Name, delayRouteKey, mq.ExchangeName)
// 发送消息,将消息发送到延迟队列,到期后自动路由到正常队列中
err := mq.ch.Publish(
mq.ExchangeName,
delayRouteKey,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message.Body),
Expiration: strconv.Itoa(message.DelayTime * 1000),
},
)
failOnError(err, "send delay msg err")
}
// Consume 获取消费消息
func (mq *MessageQueue) Consume(fn Consumer) {
// 声明队列
q := mq.declareQueue(mq.QueueName, nil)
// 队列绑定到exchange
mq.bindQueue(q.Name, mq.RouteKey, mq.ExchangeName)
// 设置Qos
err := mq.ch.Qos(1, 0, false)
failOnError(err, "Failed to set QoS")
// 监听消息
msgs, err := mq.ch.Consume(
q.Name, // queue name,
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
// forever := make(chan bool), 注册在主进程,不需要阻塞
go func() {
for d := range msgs {
fn(d)
d.Ack(false)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
// <-forever
}
// Close 关闭链接
func (mq *MessageQueue) Close() {
mq.ch.Close()
mq.conn.Close()
}
// declareQueue 定义队列
func (mq *MessageQueue) declareQueue(name string, args amqp.Table) amqp.Queue {
q, err := mq.ch.QueueDeclare(
name,
true,
false,
false,
false,
args,
)
failOnError(err, "Failed to declare a delay_queue")
return q
}
// declareQueue 定义交换器
func (mq *MessageQueue) declareExchange(exchange string, args amqp.Table) {
err := mq.ch.ExchangeDeclare(
exchange,
"direct",
true,
false,
false,
false,
args,
)
failOnError(err, "Failed to declare an exchange")
}
// bindQueue 绑定队列
func (mq *MessageQueue) bindQueue(queue, routekey, exchange string) {
err := mq.ch.QueueBind(
queue,
routekey,
exchange,
false,
nil,
)
failOnError(err, "Failed to bind a queue")
}
// failOnError 错误处理
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s : %s", msg, err)
}
}
调用使用
消费消息:
func registerRabbitMQConsumer() {
// 新建连接
rabbit := rabbitmq.NewRabbitMQ("yoyo_exchange", "yoyo_route", "yoyo_queue")
// 一般来说消费者不关闭,常驻进程进行消息消费处理
// defer rabbit.Close()
// 执行消费
rabbit.Consume(func(d amqp.Delivery) {
//logger.Info("rabbitmq", zap.String("rabbitmq", string(d.Body)))
})
}
发送消息:
rabbit := rabbitmq.NewRabbitMQ("yoyo_exchange", "yoyo_route", "yoyo_queue")
defer rabbit.Close()
rabbit.SendMessage(rabbitmq.Message{Body: "这是一条普通消息"})
rabbit.SendDelayMessage(rabbitmq.Message{Body: "这是一条延时5秒的消息", DelayTime: 5})
123 在
Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..原梓番博客 在
在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..博主 在
佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..1111 在
佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..路人 在
php中使用hyperf框架调用讯飞星火大模型实现国内版chatgpt功能示例中评论 教程很详细,如果加个前端chatgpt对话页面就完美了..Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号