侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

用python简单实现mysql数据同步到ElasticSearch的教程

Python  /  管理员 发布于 5年前   541

之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个

思路:

网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据

使用:

只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式

MsToEs

|----esconfig.py(配置文件)

|----mstes.py(同步程序)

|----sql_manage.py(数据库管理)

|----aa.sql(需要用到sql文件)

|----bb.sql(需要用到sql文件)

sql_manage.py:

# -*-coding:utf-8 -*-__author__ = "ZJL"from sqlalchemy.pool import QueuePoolfrom sqlalchemy import create_enginefrom sqlalchemy.orm import sessionmaker, scoped_sessionimport tracebackimport esconfig# 用于不需要回滚和提交的操作def find(func): def wrapper(self, *args, **kwargs):  try:   return func(self, *args, **kwargs)  except Exception as e:   print(traceback.format_exc())   print(str(e))   return traceback.format_exc()  finally:   self.session.close() return wrapperclass MysqlManager(object): def __init__(self):  mysql_connection_string = esconfig.mysql.get("mysql_connection_string")  self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,         pool_recycle=3600)  # self.DB_Session = sessionmaker(bind=self.engine)  # self.session = self.DB_Session()  self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)  self.db = scoped_session(self.DB_Session)  self.session = self.db() @find def select_all_dict(self, sql, keys):  a = self.session.execute(sql)  a = a.fetchall()  lists = []  for i in a:   if len(keys) == len(i):    data_dict = {}    for k, v in zip(keys, i):     data_dict[k] = v    lists.append(data_dict)   else:    return False  return lists # 关闭 def close(self):  self.session.close()

aa.sql:

select  CONVERT(c.`id`,CHAR)    as id,  c.`code`   as code,  c.`project_name` as project_name,  c.`name`   as name,  date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')  as update_time, from `cc` c where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 

bb.sql:

select  CONVERT(c.`id`,CHAR)    as id,  CONVERT(c.`age`,CHAR)    as age,  c.`code`   as code,  c.`name`   as name,  c.`project_name` as project_name,  date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time, from `bb` c where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now'; 

esconfig.py:

# -*- coding: utf-8 -*-#__author__="ZJL"# sql 文件名与es中的type名一致mysql = { # mysql连接信息 "mysql_connection_string": "root:[email protected]:3306/xxx", # sql文件信息 "statement_filespath":[  # sql对应的es索引和es类型  {   "index":"a1",   "sqlfile":"aa.sql",   "type":"aa"  },  {   "index":"a1",   "sqlfile":"bb.sql",   "type":"bb"  }, ],}# es的ip和端口elasticsearch = { "hosts":"127.0.0.1:9200",}# 字段顺序与sql文件字段顺序一致,这是存进es中的字段名,这里用es的type名作为标识db_field = {  "aa":   ("id",   "code",   "name",   "project_name",   "update_time",   ), "bb":  ("id",   "code",   "age",   "project_name",   "name",   "update_time",   ),}es_config = { # 间隔多少秒同步一次 "sleep_time":10, # 为了解决服务器之间时间差问题 "time_difference":3, # show_json 用来展示导入的json格式数据, "show_json":False,}

mstes.py:

# -*- coding: utf-8 -*-#__author__="ZJL"from sql_manage import MysqlManagerfrom esconfig import mysql,elasticsearch,db_field,es_configfrom elasticsearch import Elasticsearchfrom elasticsearch import helpersimport tracebackimport timeclass TongBu(object): def __init__(self):  try:   # 是否展示json数据在控制台   self.show_json = es_config.get("show_json")   # 间隔多少秒同步一次   self.sleep_time = es_config.get("sleep_time")   # 为了解决同步时数据更新产生的误差   self.time_difference = es_config.get("time_difference")   # 当前时间,留有后用   self.datetime_now = ""   # es的ip和端口   es_host = elasticsearch.get("hosts")   # 连接es   self.es = Elasticsearch(es_host)   # 连接mysql   self.mm = MysqlManager()  except :   print(traceback.format_exc()) def tongbu_es_mm(self):  try:   # 同步开始时间   start_time = time.time()   print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))   # 这个list用于批量插入es   actions = []   # 获得所有sql文件list   statement_filespath = mysql.get("statement_filespath",[])   if self.datetime_now:    # 当前时间加上时间差(间隔时间加上执行同步用掉的时间,等于上一次同步开始时间)再字符串格式化    # sql中格式化时间时年月日和时分秒之间不能空格,不然导入es时报解析错误,所以这里的时间格式化也统一中间加一个T    self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference)))   else:    self.datetime_now = "1999-01-01T00:00:00"   if statement_filespath:    for filepath in statement_filespath:     # sql文件     sqlfile = filepath.get("sqlfile")     # es的索引     es_index = filepath.get("index")     # es的type     es_type = filepath.get("type")     # 读取sql文件内容     with open(sqlfile,"r") as opf:      sqldatas = opf.read()      # ::datetime_now是一个自定义的特殊字符串用于增量更新      if "::datetime_now" in sqldatas:       sqldatas = sqldatas.replace("::datetime_now",self.datetime_now)      else:       sqldatas = sqldatas      # es和sql字段的映射      dict_set = db_field.get(es_type)      # 访问mysql,得到一个list,元素都是字典,键是字段名,值是数据      db_data_list = self.mm.select_all_dict(sqldatas, dict_set)      if db_data_list:       # 将数据拼装成es的格式       for db_data in db_data_list:        action = {         "_index": es_index,         "_type": es_type,         "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),         "_source": db_data        }        # 如果没有id字段就自动生成        es_id = db_data.get("id", "")        if es_id:         action["_id"] = es_id        # 是否显示json再终端        if self.show_json:         print(action)        # 将拼装好的数据放进list中        actions.append(action)   # list不为空就批量插入数据到es中   if len(actions) > 0 :    helpers.bulk(self.es, actions)  except Exception as e:   print(traceback.format_exc())  else:   end_time = time.time()   print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))   self.time_difference = end_time-start_time  finally:   # 报错就关闭数据库   self.mm.close()def main(): tb = TongBu() # 间隔多少秒同步一次 sleep_time = tb.sleep_time # 死循环执行导入数据,加上时间间隔 while True:  tb.tongbu_es_mm()  time.sleep(sleep_time)if __name__ == '__main__': main()

以上这篇用python简单实现mysql数据同步到ElasticSearch的教程就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。


  • 上一条:
    关于Laravel的初始化安装
    下一条:
    Laravel 之添加图片水印
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • PHP 8.4 Alpha 1现已发布!(0个评论)
    • Laravel 11.15版本发布 - Eloquent Builder中添加的泛型(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客