侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

MLSQL Stack如何让流调试更加简单详解

数据库  /  管理员 发布于 5年前   190

前言

有一位同学正在调研MLSQL Stack对流的支持。然后说了流调试其实挺困难的。经过实践,希望实现如下三点:

  • 能随时查看最新固定条数的Kafka数据
  • 调试结果(sink)能打印在web控制台
  • 流程序能自动推测json schema(现在spark是不行的)

实现这三个点之后,我发现调试确实就变得简单很多了。

流程

首先我新建了一个kaf_write.mlsql,里面方便我往Kafka里写数据:

set abc='''{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}''';load jsonStr.`abc` as table1;select to_json(struct(*)) as value from table1 as table2;save append table2 as kafka.`wow` where kafka.bootstrap.servers="127.0.0.1:9092";

这样我每次运行,数据就能写入到Kafka.

接着,我写完后,需要看看数据是不是真的都写进去了,写成了什么样子:

!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;

这句话表示,我要采样Kafka 10条Kafka数据,该Kafka的地址为127.0.0.1:9092,主题为wow.运行结果如下:

没有什么问题。接着我写一个非常简单的流式程序:

-- the stream name, should be uniq.set streamName="streamExample";-- use kafkaTool to infer schema from kafka!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;load kafka.`wow` options kafka.bootstrap.servers="127.0.0.1:9092"as newkafkatable1;select * from newkafkatable1as table21;-- print in webConsole instead of terminal console.save append table21 as webConsole.`` options mode="Append"and duration="15"and checkpointLocation="/tmp/s-cpl4";

运行结果如下:

在终端我们也可以看到实时效果了。

补充

当然,MLSQL Stack 还有对流还有两个特别好地方,第一个是你可以对流的事件设置http协议的callback,以及对流的处理结果再使用批SQL进行处理,最后入库。参看如下脚本:

-- the stream name, should be uniq.set streamName="streamExample";-- mock some data.set data='''{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}''';-- load data as tableload jsonStr.`data` as datasource;-- convert table as stream sourceload mockStream.`datasource` options stepSizeRange="0-3"as newkafkatable1;-- aggregation select cast(value as string) as k from newkafkatable1as table21;!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";-- output the the result to console.save append table21 as custom.`` options mode="append"and duration="15"and sourceTable="jack"and code='''select count(*) as c from jack as newjack;save append newjack as parquet.`/tmp/jack`; '''and checkpointLocation="/tmp/cpl15";

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。


  • 上一条:
    SQL Server 完整备份遇到的一个不常见的错误及解决方法
    下一条:
    Navicat for SQLite导入csv中文数据的方法
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 分库分表的目的、优缺点及具体实现方式介绍(0个评论)
    • DevDB - 在 VS 代码中直接访问数据库(0个评论)
    • 在ubuntu系统中实现mysql数据存储目录迁移流程步骤(0个评论)
    • 在mysql中使用存储过程批量新增测试数据流程步骤(0个评论)
    • php+mysql数据库批量根据条件快速更新、连表更新sql实现(0个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2017-06
    • 2017-08
    • 2017-09
    • 2017-10
    • 2017-11
    • 2018-01
    • 2018-05
    • 2018-10
    • 2018-11
    • 2020-02
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2020-07
    • 2020-08
    • 2020-09
    • 2021-02
    • 2021-04
    • 2021-07
    • 2021-08
    • 2021-11
    • 2021-12
    • 2022-02
    • 2022-03
    • 2022-05
    • 2022-06
    • 2022-07
    • 2022-08
    • 2022-09
    • 2022-10
    • 2022-11
    • 2022-12
    • 2023-01
    • 2023-03
    • 2023-04
    • 2023-05
    • 2023-07
    • 2023-08
    • 2023-10
    • 2023-11
    • 2023-12
    • 2024-01
    • 2024-03
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客