侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

go语言实现日志收集系统图文详解

Go  /  管理员 发布于 5年前   362

整理了一下这个日志收集系统的框,如下图

1.jpg这次要实现的代码的整体逻辑为:

2.jpg完整代码地址为: https://github.com/pythonsite/logagent

etcd介绍

高可用的分布式key-value存储,可以用于配置共享和服务发现

类似的项目:zookeeper和consul

开发语言:go

接口:提供restful的接口,使用简单

实现算法:基于raft算法的强一致性,高可用的服务存储目录

etcd的应用场景:

1、服务发现和服务注册

2、配置中心(我们实现的日志收集客户端需要用到)

3、分布式锁

4、master选举

官网对etcd的有一个非常简明的介绍:

3.jpg

etcd搭建:
下载地址:https://github.com/coreos/etcd/releases/
根据自己的环境下载对应的版本然后启动起来就可以了

启动之后可以通过如下命令验证一下:

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan zhaofan[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get namezhaofan[root@localhost etcd-v3.2.18-linux-amd64]#

context 介绍和使用

其实这个东西翻译过来就是上下文管理,那么context的作用是做什么,主要有如下两个作用:

1、控制goroutine的超时

2、保存上下文数据

通过下面一个简单的例子进行理解:

package mainimport (    "fmt"    "time"    "net/http"    "context"    "io/ioutil")type Result struct{    r *http.Response    err error}func process(){    ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)    defer cancel()    tr := &http.Transport{}    client := &http.Client{Transport:tr}    c := make(chan Result,1)    req,err := http.NewRequest("GET","http://www.google.com",nil)    if err != nil{        fmt.Println("http request failed,err:",err)        return    }    // 如果请求成功了会将数据存入到管道中    go func(){        resp,err := client.Do(req)        pack := Result{resp,err}        c <- pack    }()    select{    case <- ctx.Done():        tr.CancelRequest(req)        fmt.Println("timeout!")    case res := <-c:        defer res.r.Body.Close()        out,_:= ioutil.ReadAll(res.r.Body)        fmt.Printf("server response:%s",out)    }    return}func main() {    process()}

写一个通过context保存上下文,代码例子如:

package mainimport (    "github.com/Go-zh/net/context"    "fmt")func add(ctx context.Context,a,b int) int {    traceId := ctx.Value("trace_id").(string)    fmt.Printf("trace_id:%v\n",traceId)    return a+b}func calc(ctx context.Context,a, b int) int{    traceId := ctx.Value("trace_id").(string)    fmt.Printf("trace_id:%v\n",traceId)    //再将ctx传入到add中    return add(ctx,a,b)}func main() {    //将ctx传递到calc中    ctx := context.WithValue(context.Background(),"trace_id","123456")    calc(ctx,20,30)}

结合etcd和context使用

关于通过go连接etcd的简单例子:(这里有个小问题需要注意就是etcd的启动方式,默认启动可能会连接不上,尤其你是在虚拟你安装,所以需要通过如下命令启动:
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
)

package mainimport (    etcd_client "github.com/coreos/etcd/clientv3"    "time"    "fmt")func main() {    cli, err := etcd_client.New(etcd_client.Config{        Endpoints:[]string{"192.168.0.118:2371"},        DialTimeout:5*time.Second,    })    if err != nil{        fmt.Println("connect failed,err:",err)        return    }    fmt.Println("connect success")    defer cli.Close()}

下面一个例子是通过连接etcd,存值并取值

package mainimport (    "github.com/coreos/etcd/clientv3"    "time"    "fmt"    "context")func main() {    cli,err := clientv3.New(clientv3.Config{        Endpoints:[]string{"192.168.0.118:2371"},        DialTimeout:5*time.Second,    })    if err != nil{        fmt.Println("connect failed,err:",err)        return    }    fmt.Println("connect succ")    defer cli.Close()    ctx,cancel := context.WithTimeout(context.Background(),time.Second)    _,err = cli.Put(ctx,"logagent/conf/","sample_value")    cancel()    if err != nil{        fmt.Println("put failed,err",err)        return    }    ctx, cancel = context.WithTimeout(context.Background(),time.Second)    resp,err := cli.Get(ctx,"logagent/conf/")    cancel()    if err != nil{        fmt.Println("get failed,err:",err)        return    }    for _,ev := range resp.Kvs{        fmt.Printf("%s:%s\n",ev.Key,ev.Value)    }}

关于context官网也有一个例子非常有用,用于控制开启的goroutine的退出,代码如下:

package mainimport (    "context"    "fmt")func main() {    // gen generates integers in a separate goroutine and    // sends them to the returned channel.    // The callers of gen need to cancel the context once    // they are done consuming generated integers not to leak    // the internal goroutine started by gen.    gen := func(ctx context.Context) <-chan int {        dst := make(chan int)        n := 1        go func() {for {    select {    case <-ctx.Done():        return // returning not to leak the goroutine    case dst <- n:        n++    }}        }()        return dst    }    ctx, cancel := context.WithCancel(context.Background())    defer cancel() // cancel when we are finished consuming integers    for n := range gen(ctx) {        fmt.Println(n)        if n == 5 {break        }    }}

关于官网文档中的WithDeadline演示的代码例子:

package mainimport (    "context"    "fmt"    "time")func main() {    d := time.Now().Add(50 * time.Millisecond)    ctx, cancel := context.WithDeadline(context.Background(), d)    // Even though ctx will be expired, it is good practice to call its    // cancelation function in any case. Failure to do so may keep the    // context and its parent alive longer than necessary.    defer cancel()    select {    case <-time.After(1 * time.Second):        fmt.Println("overslept")    case <-ctx.Done():        fmt.Println(ctx.Err())    }}

通过上面的代码有了一个基本的使用,那么如果我们通过etcd来做配置管理,如果配置更改之后,我们如何通知对应的服务器配置更改,通过下面例子演示:

package mainimport (    "github.com/coreos/etcd/clientv3"    "time"    "fmt"    "context")func main() {    cli,err := clientv3.New(clientv3.Config{        Endpoints:[]string{"192.168.0.118:2371"},        DialTimeout:5*time.Second,    })    if err != nil {        fmt.Println("connect failed,err:",err)        return    }    defer cli.Close()    // 这里会阻塞    rch := cli.Watch(context.Background(),"logagent/conf/")    for wresp := range rch{        for _,ev := range wresp.Events{fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)        }    }}

实现一个kafka的消费者代码的简单例子:

package mainimport (    "github.com/Shopify/sarama"    "strings"    "fmt"    "time")func main() {    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)    if err != nil{        fmt.Println("failed to start consumer:",err)        return    }    partitionList,err := consumer.Partitions("nginx_log")    if err != nil {        fmt.Println("Failed to get the list of partitions:",err)        return    }    fmt.Println(partitionList)    for partition := range partitionList{        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)        if err != nil {fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)return        }        defer pc.AsyncClose()        go func(partitionConsumer sarama.PartitionConsumer){for msg := range pc.Messages(){    fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))}        }(pc)    }    time.Sleep(time.Hour)    consumer.Close()}

但是上面的代码并不是最佳代码,因为我们最后是通过time.sleep等待goroutine的执行,我们可以更改为通过sync.WaitGroup方式实现

package mainimport (    "github.com/Shopify/sarama"    "strings"    "fmt"    "sync")var (    wg sync.WaitGroup)func main() {    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)    if err != nil{        fmt.Println("failed to start consumer:",err)        return    }    partitionList,err := consumer.Partitions("nginx_log")    if err != nil {        fmt.Println("Failed to get the list of partitions:",err)        return    }    fmt.Println(partitionList)    for partition := range partitionList{        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)        if err != nil {fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)return        }        defer pc.AsyncClose()        go func(partitionConsumer sarama.PartitionConsumer){wg.Add(1)for msg := range partitionConsumer.Messages(){    fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))}wg.Done()        }(pc)    }    //time.Sleep(time.Hour)    wg.Wait()    consumer.Close()}

将客户端需要收集的日志信息放到etcd中

关于etcd处理的代码为:

package mainimport (    "github.com/coreos/etcd/clientv3"    "time"    "github.com/astaxie/beego/logs"    "context"    "fmt")var Client *clientv3.Clientvar logConfChan chan string// 初始化etcdfunc initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){    var keys []string    for _,ip := range ipArrays{        //keyfmt = /logagent/%s/log_config        keys = append(keys,fmt.Sprintf(keyfmt,ip))    }    logConfChan = make(chan string,10)    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)    Client,err = clientv3.New(clientv3.Config{        Endpoints:addr,        DialTimeout: timeout,    })    if err != nil{        logs.Error("connect failed,err:%v",err)        return    }    logs.Debug("init etcd success")    waitGroup.Add(1)    for _, key := range keys{        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)        // 从etcd中获取要收集日志的信息        resp,err := Client.Get(ctx,key)        cancel()        if err != nil {logs.Warn("get key %s failed,err:%v",key,err)continue        }        for _, ev := range resp.Kvs{logs.Debug("%q : %q\n",  ev.Key, ev.Value)logConfChan <- string(ev.Value)        }    }    go WatchEtcd(keys)    return}func WatchEtcd(keys []string){    // 这里用于检测当需要收集的日志信息更改时及时更新    var watchChans []clientv3.WatchChan    for _,key := range keys{        rch := Client.Watch(context.Background(),key)        watchChans = append(watchChans,rch)    }    for {        for _,watchC := range watchChans{select{case wresp := <-watchC:    for _,ev:= range wresp.Events{        logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)        logConfChan <- string(ev.Kv.Value)    }default:}        }        time.Sleep(time.Second)    }    waitGroup.Done()}func GetLogConf()chan string{    return logConfChan}

同样的这里增加对了限速的处理,毕竟日志收集程序不能影响了当前业务的性能,所以增加了limit.go用于限制速度:

package mainimport (    "time"    "sync/atomic"    "github.com/astaxie/beego/logs")type SecondLimit struct {    unixSecond int64    curCount int32    limit int32}func NewSecondLimit(limit int32) *SecondLimit {    secLimit := &SecondLimit{        unixSecond:time.Now().Unix(),        curCount:0,        limit:limit,    }    return secLimit}func (s *SecondLimit) Add(count int) {    sec := time.Now().Unix()    if sec == s.unixSecond {        atomic.AddInt32(&s.curCount,int32(count))        return    }    atomic.StoreInt64(&s.unixSecond,sec)    atomic.StoreInt32(&s.curCount, int32(count))}func (s *SecondLimit) Wait()bool {    for {        sec := time.Now().Unix()        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {time.Sleep(time.Microsecond)logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)continue        }        if sec != atomic.LoadInt64(&s.unixSecond) {atomic.StoreInt64(&s.unixSecond,sec)atomic.StoreInt32(&s.curCount,0)        }        logs.Debug("limit is exited")        return false    }}

推荐:go语言教程

以上就是go语言实现日志收集系统图文详解的详细内容,更多请关注其它相关文章!


  • 上一条:
    go语言的异常处理介绍
    下一条:
    go语言time包的一些使用方法
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • 近期文章
    • 智能合约Solidity学习CryptoZombie二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2017-09
    • 2020-03
    • 2020-05
    • 2020-06
    • 2020-07
    • 2020-12
    • 2021-01
    • 2021-05
    • 2021-06
    • 2021-07
    • 2021-08
    • 2021-10
    • 2021-11
    • 2021-12
    • 2022-01
    • 2022-02
    • 2022-03
    • 2022-04
    • 2022-05
    • 2022-06
    • 2022-07
    • 2022-08
    • 2022-09
    • 2022-10
    • 2022-11
    • 2022-12
    • 2023-01
    • 2023-02
    • 2023-03
    • 2023-04
    • 2023-05
    • 2023-06
    • 2023-07
    • 2023-08
    • 2023-09
    • 2023-10
    • 2023-11
    • 2023-12
    • 2024-01
    • 2024-02
    • 2024-03
    • 2024-04
    • 2024-05
    • 2024-06
    • 2024-07
    • 2024-08
    • 2024-11
    • 2025-02
    • 2025-04
    • 2025-05
    • 2025-06
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客