侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

DataFrame:通过SparkSql将scala类转为DataFrame的方法

数据库  /  管理员 发布于 5年前   331

如下所示:

import java.text.{DecimalFormat, DecimalFormatSymbols}
import java.util.Locale

import com.alibaba.fastjson.JSON
import com.donews.data.AppConfig
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

/**
 * Created by silentwolf on 2016/6/3.
 *
 * One row of the user-tag result table: a user id (`SUUID`) followed by 27
 * float-valued tag columns. Field names are upper-case because they are used
 * verbatim as the column names of the DataFrame derived from this class.
 */
case class UserTag(
  SUUID: String,
  MAN: Float,
  WOMAN: Float,
  AGE10_19: Float,
  AGE20_29: Float,
  AGE30_39: Float,
  AGE40_49: Float,
  AGE50_59: Float,
  GAME: Float,
  MOVIE: Float,
  MUSIC: Float,
  ART: Float,
  POLITICS_NEWS: Float,
  FINANCIAL: Float,
  EDUCATION_TRAINING: Float,
  HEALTH_CARE: Float,
  TRAVEL: Float,
  AUTOMOBILE: Float,
  HOUSE_PROPERTY: Float,
  CLOTHING_ACCESSORIES: Float,
  BEAUTY: Float,
  IT: Float,
  BABY_PRODUCT: Float,
  FOOD_SERVICE: Float,
  HOME_FURNISHING: Float,
  SPORTS: Float,
  OUTDOOR_ACTIVITIES: Float,
  MEDICINE: Float
)

/**
 * Spark job that reads the `appstatistic` parquet data for one appkey, attaches
 * a generated tag JSON to every distinct suuid, converts the rows into
 * [[UserTag]] instances, and saves the resulting DataFrame to the `USER_TAGS`
 * table through the `org.apache.phoenix.spark` data source.
 */
object UserTagTable {

  // Named after this object; the original passed an unrelated object's class here.
  val LOG = LoggerFactory.getLogger(getClass)

  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"

  /**
   * Job entry point.
   *
   * @param args `args(0)` is the appkey, `args(1)` is the last day to load.
   *             The first day is fixed to "2016-04-10" in the code below.
   */
  def main(args: Array[String]): Unit = {
    // Both arguments are read below, so require two of them before starting Spark.
    if (args.length < 2) {
      println("请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11")
    } else {
      val appkey = args(0)
      val lastdate = args(1)
      val conf: com.typesafe.config.Config = ConfigFactory.load()
      val sc = new SparkContext()
      try {
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate).registerTempTable("suuidTable")

        sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))
        sqlContext.udf.register("intToString", (b: Long) => intToString(b))

        // Key step: pair every suuid of the temp table with the JSON produced by
        // the `taginfo` UDF and turn each pair into a UserTag.
        // Null suuids are filtered out in SQL because the typed pattern below
        // does not match null and would otherwise fail the job with a MatchError.
        sqlContext
          .sql(" select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable" +
            " where suuid is not null group by suuid")
          .map { case Row(suuid: String, taginfo: String) => toUserTag(suuid, taginfo) }
          .toDF()
          .registerTempTable("resultTable")

        // NOTE(review): appkey and lastdate are spliced into the SQL text as-is;
        // a value containing a single quote breaks the statement.
        val resultDF = sqlContext.sql(
          s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," +
            "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," +
            "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," +
            "MEDICINE from resultTable WHERE SUUID IS NOT NULL")

        resultDF.write
          .mode(SaveMode.Overwrite)
          .options(Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url")))
          .format("org.apache.phoenix.spark")
          .save()
      } finally {
        // Stop the context even when the job fails.
        sc.stop()
      }
    }
  }

  /** Builds a [[UserTag]] from a suuid and the JSON text produced by [[userTagInfo]]. */
  private def toUserTag(suuid: String, tagJson: String): UserTag = {
    val tags = JSON.parseObject(tagJson)
    UserTag(
      suuid,
      tags.getFloat("man"),
      tags.getFloat("woman"),
      tags.getFloat("age10_19"),
      tags.getFloat("age20_29"),
      tags.getFloat("age30_39"),
      tags.getFloat("age40_49"),
      tags.getFloat("age50_59"),
      tags.getFloat("game"),
      tags.getFloat("movie"),
      tags.getFloat("music"),
      tags.getFloat("art"),
      tags.getFloat("politics_news"),
      tags.getFloat("financial"),
      tags.getFloat("education_training"),
      tags.getFloat("health_care"),
      tags.getFloat("travel"),
      tags.getFloat("automobile"),
      tags.getFloat("house_property"),
      tags.getFloat("clothing_accessories"),
      tags.getFloat("beauty"),
      tags.getFloat("IT"),
      tags.getFloat("baby_Product"),
      tags.getFloat("food_service"),
      tags.getFloat("home_furnishing"),
      tags.getFloat("sports"),
      tags.getFloat("outdoor_activities"),
      tags.getFloat("medicine")
    )
  }

  /** Returns the decimal string form of `suuid`. */
  def intToString(suuid: Long): String = suuid.toString

  /**
   * Generates a JSON object of random tag weights, each rounded to two decimals.
   *
   * `man`/`woman` add up to 1; the four lower age bands are each drawn from
   * [0, 0.2] and `age50_59` takes the remainder to 1; every other tag is an
   * independent random value in [0, 1].
   *
   * @param num1 not used by the generator; present so the function can be
   *             registered as a one-argument SQL UDF
   * @return a JSON object string whose keys are the ones read by `toUserTag`
   */
  def userTagInfo(num1: String): String = {
    // Fixed symbols so the text always uses '.' as decimal separator; with the
    // default locale the parse back to Float fails where ',' is the separator.
    val de = new DecimalFormat("0.00", DecimalFormatSymbols.getInstance(Locale.ROOT))
    def round2(value: Double): Float = de.format(value).toFloat

    val man = round2(math.random)
    val woman = round2(1 - man)

    val ages = Seq("age10_19", "age20_29", "age30_39", "age40_49")
      .map(key => key -> round2(math.random * 0.2))
    // Subtract left to right in Float arithmetic, as the original expression did.
    val age50_59 = round2(ages.map(_._2).foldLeft(1f)(_ - _))

    val interests = Seq(
      "game", "movie", "music", "art", "politics_news",
      "financial", "education_training", "health_care", "travel", "automobile",
      "house_property", "clothing_accessories", "beauty", "IT", "baby_Product",
      "food_service", "home_furnishing", "sports", "outdoor_activities", "medicine"
    ).map(key => key -> round2(math.random))

    val fields =
      Seq("man" -> man, "woman" -> woman) ++ ages ++ Seq("age50_59" -> age50_59) ++ interests

    fields
      .map { case (key, value) => "\"" + key + "\":" + value }
      .mkString("{", ",", "}")
  }

  /**
   * Loads the `appstatistic` parquet data under `REP_HOME`, keeping rows that
   * have a timestamp, belong to `appkey`, and whose `day` lies in
   * [`startDay`, `endDay`].
   *
   * NOTE(review): the arguments are spliced into the filter expression as-is;
   * a value containing a single quote breaks the expression.
   */
  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = {
    val path = s"$REP_HOME/appstatistic"
    ctx.read
      .parquet(path)
      .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'")
  }
}

以上这篇DataFrame:通过SparkSql将scala类转为DataFrame的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。


  • 上一条:
    pyspark操作MongoDB的方法步骤
    下一条:
    Django常用查询SQL语句的使用代码
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 分库分表的目的、优缺点及具体实现方式介绍(0个评论)
    • DevDB - 在 VS 代码中直接访问数据库(0个评论)
    • 在ubuntu系统中实现mysql数据存储目录迁移流程步骤(0个评论)
    • 在mysql中使用存储过程批量新增测试数据流程步骤(0个评论)
    • php+mysql数据库批量根据条件快速更新、连表更新sql实现(0个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2017-06
    • 2017-08
    • 2017-09
    • 2017-10
    • 2017-11
    • 2018-01
    • 2018-05
    • 2018-10
    • 2018-11
    • 2020-02
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2020-07
    • 2020-08
    • 2020-09
    • 2021-02
    • 2021-04
    • 2021-07
    • 2021-08
    • 2021-11
    • 2021-12
    • 2022-02
    • 2022-03
    • 2022-05
    • 2022-06
    • 2022-07
    • 2022-08
    • 2022-09
    • 2022-10
    • 2022-11
    • 2022-12
    • 2023-01
    • 2023-03
    • 2023-04
    • 2023-05
    • 2023-07
    • 2023-08
    • 2023-10
    • 2023-11
    • 2023-12
    • 2024-01
    • 2024-03
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客