关于redigo中PubSub的一点小坑分析
Go  /  管理员 发布于 4年前   661
前言
最近在用 golang 做一些 redis 相关的操作,选用了 redigo 这个第三方库。然后在使用 Pub/Sub 的时候,却发现了一个小坑……
Redis Client
首先,我们来初始化一个带连接池的 Redis Client:
import ("github.com/gomodule/redigo/redis")type RedisClient struct {pool *redis.Pool}func NewRedisClient(addr string, db int, passwd string) *RedisClient {pool := &redis.Pool{MaxIdle: 10,IdleTimeout: 300 * time.Second,Dial: func() (redis.Conn, error) {c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))if err != nil {return nil, err}return c, nil},TestOnBorrow: func(c redis.Conn, t time.Time) error {if time.Since(t) < time.Minute {return nil}_, err := c.Do("PING")return err},}log.Printf("new redis pool at %s", addr)client := &RedisClient{pool: pool,}return client}
Publish
然后我们可以简单的实现一个 publish 方法:
func (r *RedisClient) Publish(channel, message string) (int, error) {c := r.pool.Get()defer c.Close()n, err := redis.Int(c.Do("PUBLISH", channel, message))if err != nil {return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)}return n, nil}
Subscribe
接下来就是一个稍微复杂点的带有心跳的 subscribe 方法:
func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {psc := redis.PubSubConn{Conn: r.pool.Get()}defer psc.Close()log.Printf("redis pubsub subscribe channel: %v", channel)if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {return err}done := make(chan error, 1)// start a new goroutine to receive messagego func() {for {switch msg := psc.Receive().(type) {case error:done <- fmt.Errorf("redis pubsub receive err: %v", msg)returncase redis.Message:if err := consume(msg); err != nil {done <- errreturn}case redis.Subscription:if msg.Count == 0 {// all channels are unsubscribeddone <- nilreturn}}}}()// health checktick := time.NewTicker(time.Minute)defer tick.Stop()for {select {case <-ctx.Done():if err := psc.Unsubscribe(); err != nil {return fmt.Errorf("redis pubsub unsubscribe err: %v", err)}return nilcase err := <-done:return errcase <-tick.C:if err := psc.Ping(""); err != nil {return err}}}return nil}
最后,我们写一个简单地 main 函数来调用 publish & subscribe:
func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {psc := redis.PubSubConn{Conn: r.pool.Get()}defer psc.Close()log.Printf("redis pubsub subscribe channel: %v", channel)if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {return err}done := make(chan error, 1)// start a new goroutine to receive messagego func() {for {switch msg := psc.Receive().(type) {case error:done <- fmt.Errorf("redis pubsub receive err: %v", msg)returncase redis.Message:if err := consume(msg); err != nil {done <- errreturn}case redis.Subscription:if msg.Count == 0 {// all channels are unsubscribeddone <- nilreturn}}}}()// health checktick := time.NewTicker(time.Minute)defer tick.Stop()for {select {case <-ctx.Done():if err := psc.Unsubscribe(); err != nil {return fmt.Errorf("redis pubsub unsubscribe err: %v", err)}return nilcase err := <-done:return errcase <-tick.C:if err := psc.Ping(""); err != nil {return err}}}return nil}
坑
咋一看之下,好像并没有什么异常?然而,如果我们这时候去看 redis 的 tcp 连接,就可以发现一些猫腻:
$sudo netstat -antp | grep redistcp 0 0 0.0.0.0:6379 0.0.0.0:* LISTEN 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55010 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55015 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55009 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55005 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55012 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55011 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55013 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55007 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55006 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:55014 ESTABLISHED 940/redis-server 0. tcp 0 0 172.16.8.128:6379 172.16.8.1:54972 ESTABLISHED 940/redis-server 0.
竟然是每一次 subscribe 就新建了一个连接,而 connection pool 似乎没有什么作用。
更进一步地调试,我们发现在 defer psc.Close() 的时候就卡住了,也就是上面的 10 个 goroutine 其实并没有正常退出。
Concurrent
排查许久之后,终于定位到了问题!引用 redigo 的说明:
Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.
For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.
也就是说,虽然一个连接可以在不同的 goroutine 并发调用 Receive() 和 Subscribe()(subscribe调用了send和flush) ,但是却不能再有其他并发操作(比如 Close())。
其他相似的问题还可以参考 issue
Fix
知道了上面的原因之后,我们稍微修改一下 defer psc.Close() 的位置即可解决问题:
// start a new goroutine to receive messagego func() {// IMPORTANT!defer psc.Close()for {switch msg := psc.Receive().(type) {case error:
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对AIDI的支持。
122 在
学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..123 在
Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..原梓番博客 在
在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..博主 在
佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..1111 在
佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
Copyright·© 2019 侯体宗版权所有·
粤ICP备20027696号