侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Spark自定义累加器的使用实例详解

技术  /  管理员 发布于 7年前   193

累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。

累加器简单使用

Spark内置的提供了Long和Double类型的累加器。下面是一个简单的使用示例,在这个例子中我们在过滤掉RDD中奇数的同时进行计数,最后计算剩下整数的和。

val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]") val sc = new SparkContext(sparkConf) val accum = sc.longAccumulator("longAccum") //统计奇数的个数 val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{  if(n%2!=0) accum.add(1L)   n%2==0 }).reduce(_+_) println("sum: "+sum) println("accum: "+accum.value) sc.stop() 

结果为:

sum: 20
accum: 5

这是结果正常的情况,但是在使用累加器的过程中如果对于spark的执行过程理解的不够深入就会遇到两类典型的错误:少加(或者没加)、多加。

自定义累加器

自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:CollectionAccumulator类,这个类允许以集合的形式收集spark应用执行过程中的一些信息。例如,我们可以用这个类收集Spark处理数据时的一些细节,当然,由于累加器的值最终要汇聚到driver端,为了避免 driver端的outofmemory问题,需要对收集的信息的规模要加以控制,不宜过大。

继承AccumulatorV2类,并复写它的所有方法

package sparkimport constant.Constantimport org.apache.spark.util.AccumulatorV2import util.getFieldFromConcatStringimport util.setFieldFromConcatStringopen class SessionAccmulator : AccumulatorV2<String, String>() {  private var result = Constant.SESSION_COUNT + "=0|"+      Constant.TIME_PERIOD_1s_3s + "=0|"+      Constant.TIME_PERIOD_4s_6s + "=0|"+      Constant.TIME_PERIOD_7s_9s + "=0|"+      Constant.TIME_PERIOD_10s_30s + "=0|"+      Constant.TIME_PERIOD_30s_60s + "=0|"+      Constant.TIME_PERIOD_1m_3m + "=0|"+      Constant.TIME_PERIOD_3m_10m + "=0|"+      Constant.TIME_PERIOD_10m_30m + "=0|"+      Constant.TIME_PERIOD_30m + "=0|"+      Constant.STEP_PERIOD_1_3 + "=0|"+      Constant.STEP_PERIOD_4_6 + "=0|"+      Constant.STEP_PERIOD_7_9 + "=0|"+      Constant.STEP_PERIOD_10_30 + "=0|"+      Constant.STEP_PERIOD_30_60 + "=0|"+      Constant.STEP_PERIOD_60 + "=0"  override fun value(): String {    return this.result  }  /**   * 合并数据   */  override fun merge(other: AccumulatorV2<String, String>?) {    if (other == null) return else {      if (other is SessionAccmulator) {        var newResult = ""        val resultArray = arrayOf(Constant.SESSION_COUNT,Constant.TIME_PERIOD_1s_3s, Constant.TIME_PERIOD_4s_6s, Constant.TIME_PERIOD_7s_9s,Constant.TIME_PERIOD_10s_30s, Constant.TIME_PERIOD_30s_60s, Constant.TIME_PERIOD_1m_3m,Constant.TIME_PERIOD_3m_10m, Constant.TIME_PERIOD_10m_30m, Constant.TIME_PERIOD_30m,Constant.STEP_PERIOD_1_3, Constant.STEP_PERIOD_4_6, Constant.STEP_PERIOD_7_9,Constant.STEP_PERIOD_10_30, Constant.STEP_PERIOD_30_60, Constant.STEP_PERIOD_60)        resultArray.forEach {          val oldValue = other.result.getFieldFromConcatString("|", it)          if (oldValue.isNotEmpty()) {val newValue = oldValue.toInt() + 1//找到原因,一直在循环赋予值,debug30分钟 很烦if (newResult.isEmpty()){  newResult = result.setFieldFromConcatString("|", it, newValue.toString())}//问题就在于这里,自定义没有写错,合并错了newResult = newResult.setFieldFromConcatString("|", it, newValue.toString())          }        }        result = newResult      }    }  }  override fun copy(): AccumulatorV2<String, String> {    val sessionAccmulator = SessionAccmulator()    sessionAccmulator.result = this.result    return sessionAccmulator  }  override fun add(p0: String?) {    val v1 = this.result    val v2 = p0    if (v2.isNullOrEmpty()){      return    }else{      var newResult = ""      val oldValue = v1.getFieldFromConcatString("|", v2!!)      if (oldValue.isNotEmpty()){        val newValue = oldValue.toInt() + 1        newResult = result.setFieldFromConcatString("|", v2, newValue.toString())      }      result = newResult    }  }  override fun reset() {    val newResult = Constant.SESSION_COUNT + "=0|"+        Constant.TIME_PERIOD_1s_3s + "=0|"+        Constant.TIME_PERIOD_4s_6s + "=0|"+        Constant.TIME_PERIOD_7s_9s + "=0|"+        Constant.TIME_PERIOD_10s_30s + "=0|"+        Constant.TIME_PERIOD_30s_60s + "=0|"+        Constant.TIME_PERIOD_1m_3m + "=0|"+        Constant.TIME_PERIOD_3m_10m + "=0|"+        Constant.TIME_PERIOD_10m_30m + "=0|"+        Constant.TIME_PERIOD_30m + "=0|"+        Constant.STEP_PERIOD_1_3 + "=0|"+        Constant.STEP_PERIOD_4_6 + "=0|"+        Constant.STEP_PERIOD_7_9 + "=0|"+        Constant.STEP_PERIOD_10_30 + "=0|"+        Constant.STEP_PERIOD_30_60 + "=0|"+        Constant.STEP_PERIOD_60 + "=0"    result = newResult  }  override fun isZero(): Boolean {    val newResult = Constant.SESSION_COUNT + "=0|"+        Constant.TIME_PERIOD_1s_3s + "=0|"+        Constant.TIME_PERIOD_4s_6s + "=0|"+        Constant.TIME_PERIOD_7s_9s + "=0|"+        Constant.TIME_PERIOD_10s_30s + "=0|"+        Constant.TIME_PERIOD_30s_60s + "=0|"+        Constant.TIME_PERIOD_1m_3m + "=0|"+        Constant.TIME_PERIOD_3m_10m + "=0|"+        Constant.TIME_PERIOD_10m_30m + "=0|"+        Constant.TIME_PERIOD_30m + "=0|"+        Constant.STEP_PERIOD_1_3 + "=0|"+        Constant.STEP_PERIOD_4_6 + "=0|"+        Constant.STEP_PERIOD_7_9 + "=0|"+        Constant.STEP_PERIOD_10_30 + "=0|"+        Constant.STEP_PERIOD_30_60 + "=0|"+        Constant.STEP_PERIOD_60 + "=0"    return this.result == newResult  }}

方法介绍

value方法:获取累加器中的值

       merge方法:该方法特别重要,一定要写对,这个方法是各个task的累加器进行合并的方法(下面介绍执行流程中将要用到)

        iszero方法:判断是否为初始值

        reset方法:重置累加器中的值

        copy方法:拷贝累加器

spark中累加器的执行流程:

          首先有几个task,spark engine就调用copy方法拷贝几个累加器(不注册的),然后在各个task中进行累加(注意在此过程中,被最初注册的累加器的值是不变的),执行最后将调用merge方法和各个task的结果累计器进行合并(此时被注册的累加器是初始值)

总结

以上就是本文关于Spark自定义累加器的使用实例详解的全部内容,希望对大家有所帮助。有什么问题可以随时留言,小编会及时回复大家的。


  • 上一条:
    hbase 简介
    下一条:
    Spark的广播变量和累加器使用方法代码示例
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 近期文章
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2017-07
    • 2017-08
    • 2017-09
    • 2018-01
    • 2018-07
    • 2018-08
    • 2018-09
    • 2018-12
    • 2019-01
    • 2019-02
    • 2019-03
    • 2019-04
    • 2019-05
    • 2019-06
    • 2019-07
    • 2019-08
    • 2019-09
    • 2019-10
    • 2019-11
    • 2019-12
    • 2020-01
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2020-07
    • 2020-08
    • 2020-09
    • 2020-10
    • 2020-11
    • 2021-04
    • 2021-05
    • 2021-06
    • 2021-07
    • 2021-08
    • 2021-09
    • 2021-10
    • 2021-12
    • 2022-01
    • 2022-02
    • 2022-03
    • 2022-04
    • 2022-05
    • 2022-06
    • 2022-07
    • 2022-08
    • 2022-09
    • 2022-10
    • 2022-11
    • 2022-12
    • 2023-01
    • 2023-02
    • 2023-03
    • 2023-04
    • 2023-05
    • 2023-06
    • 2023-07
    • 2023-08
    • 2023-09
    • 2023-10
    • 2023-12
    • 2024-02
    • 2024-04
    • 2024-05
    • 2024-06
    • 2025-02
    • 2025-07
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客