Hou Tizong's Blog (侯体宗的博客)

A Python Script to Migrate MySQL Historical Data to an Archive Database Every Day

Python / posted by admin 7 years ago · 170 views

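This article shares the complete code of a Python script that moves historical MySQL data into an archive (history) database once a day, for your reference. Each run copies one day of rows (the day time_interval days before today) from the source table to the archive table in primary-key batches, verifies the copy by comparing the row count and the minimum and maximum primary key on both sides, and then removes the migrated rows from the source, either with batched DELETEs or by dropping that day's partition (--delete_strategy). The code is as follows: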

#!/usr/bin/env python3
# coding: utf-8
__author__ = 'John'

import datetime
import sys
import time

import MySQLdb  # provided by the mysqlclient package


class ClassMigrate(object):

    def _get_argv(self):
        self.usage = """
        usage():
        python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
            --dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
            --delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
        """
        if len(sys.argv) == 1:
            print(self.usage)
            sys.exit(1)
        elif sys.argv[1] in ('--help', '-h'):
            print(self.usage)
            sys.exit()
        for i in sys.argv[1:]:
            # split on the first '=' only, so values (e.g. passwords) may contain '='
            _argv = i.split('=', 1)
            if len(_argv) != 2:
                print(self.usage)
                sys.exit(1)
            if _argv[0] == '--source':
                # host:port/db_name:tab_name/user/password (the password may contain '/')
                _list = _argv[1].split('/', 3)
                self.source_host, port = _list[0].split(':')
                self.source_port = int(port)
                self.source_db, self.source_tab = _list[1].split(':')
                self.source_user = _list[2]
                self.source_password = _list[3]
            elif _argv[0] == '--dest':
                _list = _argv[1].split('/', 3)
                self.dest_host, port = _list[0].split(':')
                self.dest_port = int(port)
                self.dest_db, self.dest_tab = _list[1].split(':')
                self.dest_user = _list[2]
                self.dest_password = _list[3]
            elif _argv[0] == '--delete_strategy':
                self.deleteStrategy = _argv[1]
                if self.deleteStrategy not in ('delete', 'drop'):
                    print(self.usage)
                    sys.exit(1)
            elif _argv[0] == '--primary_key':
                self.pk = _argv[1]
            elif _argv[0] == '--date_col':
                self.date_col = _argv[1]
            elif _argv[0] == '--time_interval':
                # how many days back from today; the script migrates that single day
                self.interval = int(_argv[1])
            else:
                print(self.usage)
                sys.exit(1)
        # every option is required
        for attr in ('source_host', 'dest_host', 'deleteStrategy', 'pk', 'date_col', 'interval'):
            if not hasattr(self, attr):
                print(self.usage)
                sys.exit(1)

    def __init__(self):
        self._get_argv()
        # --------------------------------------------------------------------
        # autocommit connections to the source and destination (archive) databases
        self.sourcedb_conn_str = MySQLdb.connect(host=self.source_host, port=self.source_port,
                                                 user=self.source_user, passwd=self.source_password,
                                                 db=self.source_db, charset='utf8')
        self.sourcedb_conn_str.autocommit(True)
        self.destdb_conn_str = MySQLdb.connect(host=self.dest_host, port=self.dest_port,
                                               user=self.dest_user, passwd=self.dest_password,
                                               db=self.dest_db, charset='utf8')
        self.destdb_conn_str.autocommit(True)
        # --------------------------------------------------------------------
        # the destination table is created LIKE <source_tab>_template,
        # which must already exist in the destination database
        self.template_tab = self.source_tab + '_template'
        # number of primary-key values covered by one INSERT/DELETE batch
        self.step_size = 20000
        # --------------------------------------------------------------------
        self._migCompleteState = False
        self._deleteCompleteState = False
        # --------------------------------------------------------------------
        self.source_cnt = ''
        self.source_min_id = ''
        self.source_max_id = ''
        self.source_checksum = ''
        self.dest_cnt = ''
        # --------------------------------------------------------------------
        self.today = time.strftime("%Y-%m-%d")
        # self.today = '2016-05-30 09:59:40'

    def _day_filter(self):
        """WHERE fragment selecting the whole day `interval` days before today."""
        return ("%s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s DAY), '%%Y-%%m-%%d'), ' 00:00:00') "
                "AND %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s DAY), '%%Y-%%m-%%d'), ' 23:59:59')"
                % (self.date_col, self.today, self.interval, self.date_col, self.today, self.interval))

    def sourcedb_query(self, sql, sql_type):
        cr = self.sourcedb_conn_str.cursor()
        try:
            cr.execute(sql)
            if sql_type == 'select':
                return cr.fetchall()
            elif sql_type == 'dml':
                return self.sourcedb_conn_str.affected_rows()
            else:
                return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            cr.close()

    def destdb_query(self, sql, sql_type, values=None):
        cr = self.destdb_conn_str.cursor()
        try:
            if sql_type == 'select':
                cr.execute(sql)
                return cr.fetchall()
            elif sql_type == 'insertmany':
                cr.executemany(sql, values)
                return self.destdb_conn_str.affected_rows()
            else:
                cr.execute(sql)
                return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            cr.close()

    def create_table_from_source(self):
        """Copy the source table's DDL to the destination.

        Not used here: tab_name's data has to go into an ARCHIVE-engine table,
        so the source DDL is unsuitable. Kept for other uses.
        """
        try:
            sql = "show create table %s;" % self.source_tab
            create_str = self.sourcedb_query(sql, 'select')[0][1]
            create_str = create_str.replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS')
            return bool(self.destdb_query(create_str, 'ddl'))
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def create_table_from_template(self):
        sql = 'CREATE TABLE IF NOT EXISTS %s LIKE %s;' % (self.dest_tab, self.template_tab)
        return bool(self.destdb_query(sql, 'ddl'))

    def get_min_max(self):
        """Get the row count and min/max primary key of the source rows to migrate."""
        try:
            print("\nStarting migration at -- %s <br>" % datetime.datetime.now())
            sql = "select count(*), IFNULL(min(%s), -1), IFNULL(max(%s), -1) from %s where %s" \
                % (self.pk, self.pk, self.source_tab, self._day_filter())
            q = self.sourcedb_query(sql, 'select')
            self.source_cnt, self.source_min_id, self.source_max_id = q[0]
            # "checksum" = count_min_max, compared later against the destination table
            self.source_checksum = '%s_%s_%s' % q[0]
            if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
                print("No records matched in the source table! <br>")
                return False
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def migrate_2_destdb(self):
        try:
            if not self.get_min_max():
                return False
            cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
            # one %s placeholder per source-table column, spliced into the INSERT statement
            fields = ','.join(['%s'] * len(cols))
            insert_sql = "insert into %s values (%s)" % (self.dest_tab, fields)
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = "select * from %s where %s >= %d and %s < %d and %s" \
                    % (self.source_tab, self.pk, k, self.pk, k + self.step_size, self._day_filter())
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                results = self.sourcedb_query(sql, 'select')
                if results is False:
                    print("Select failed!! <br>")
                elif results:  # skip empty batches (gaps in the primary key)
                    rows = self.destdb_query(insert_sql, 'insertmany', results)
                    if rows is False:
                        print("Insert failed!! <br>")
                    else:
                        print("Inserted %s rows. <br>" % rows)
                elapsed = datetime.datetime.now() - starttime
                print("Elapsed: %.6f seconds <br>" % elapsed.total_seconds())
                k += self.step_size
            print("\nInsert complete at -- %s <br>" % datetime.datetime.now())
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def verify_total_cnt(self):
        try:
            sql = "select count(*), IFNULL(min(%s), -1), IFNULL(max(%s), -1) from %s where %s" \
                % (self.pk, self.pk, self.dest_tab, self._day_filter())
            dest_result = self.destdb_query(sql, 'select')
            self.dest_cnt = dest_result[0][0]
            dest_checksum = '%s_%s_%s' % dest_result[0]
            print("source_checksum: %s, dest_checksum: %s <br>" % (self.source_checksum, dest_checksum))
            if self.source_cnt == self.dest_cnt and self.dest_cnt != 0 \
                    and self.source_checksum == dest_checksum:
                self._migCompleteState = True
                print("Verification succeeded!! <br>")
            else:
                print("Verification failed!! <br>")
                sys.exit(77)
        except Exception as e:
            print(str(e) + "<br>")

    def drop_daily_partition(self):
        try:
            if not self._migCompleteState:
                return
            # EXPLAIN PARTITIONS works on MySQL 5.6/5.7 (removed in 8.0, where plain
            # EXPLAIN already reports partitions); column 3 of the output is `partitions`
            sql = "explain partitions select * from %s where %s" % (self.source_tab, self._day_filter())
            partition_name = self.sourcedb_query(sql, 'select')[0][3]
            sql = "select count(*), IFNULL(min(%s), -1), IFNULL(max(%s), -1) from %s partition (%s)" \
                % (self.pk, self.pk, self.source_tab, partition_name)
            q = self.sourcedb_query(sql, 'select')
            checksum = '%s_%s_%s' % q[0]
            if q[0][0] == 0 or q[0][1] == -1 or q[0][2] == -1:
                print("No records matched in the source PARTITION! <br>")
            elif checksum == self.source_checksum:
                # the partition holds exactly the migrated day, so it is safe to drop
                drop_par_sql = "alter table %s drop partition %s;" % (self.source_tab, partition_name)
                print(drop_par_sql + " <br>")
                if self.sourcedb_query(drop_par_sql, 'ddl'):
                    print("\nDrop partition complete at -- %s <br>" % datetime.datetime.now())
                    self._deleteCompleteState = True
                else:
                    print("Drop partition failed.. <br>")
            else:
                print("The partition %s checksum failed !! Drop failed !!" % partition_name)
                sys.exit(77)
        except Exception as e:
            print(str(e) + "<br>")

    def delete_data(self):
        try:
            if not self._migCompleteState:
                return
            failed = False
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = "delete from %s where %s >= %d and %s < %d and %s" \
                    % (self.source_tab, self.pk, k, self.pk, k + self.step_size, self._day_filter())
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                rows = self.sourcedb_query(sql, 'dml')
                if rows is False:
                    failed = True
                    print("Delete failed!! <br>")
                else:
                    print("Deleted %s rows. <br>" % rows)
                elapsed = datetime.datetime.now() - starttime
                print("Elapsed: %.6f seconds <br>" % elapsed.total_seconds())
                time.sleep(1)  # pause between batches to reduce load on the source database
                k += self.step_size
            print("\nDelete complete at -- %s <br>" % datetime.datetime.now())
            self._deleteCompleteState = not failed
        except Exception as e:
            print(str(e) + "<br>")

    def do(self):
        if self.create_table_from_template():
            if self.migrate_2_destdb():
                self.verify_total_cnt()
                if self._migCompleteState:
                    if self.deleteStrategy == 'drop':
                        self.drop_daily_partition()
                    else:
                        self.delete_data()
                    print("\n<br>")
                    print("=====" * 5 + '<br>')
                    print("source_total_cnt: %s <br>" % self.source_cnt)
                    print("dest_total_cnt: %s <br>" % self.dest_cnt)
                    print("=====" * 5 + '<br>')
                    if self._deleteCompleteState:
                        print("\nFinal result: Successfully !! <br>")
                        sys.exit(88)
                    else:
                        print("\nFinal result: Failed !! <br>")
                        sys.exit(254)
        else:
            print("Create table failed ! Exiting. . .")
            sys.exit(255)


if __name__ == '__main__':
    f = ClassMigrate()
    f.do()
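The script parses sys.argv by hand. As a hedged alternative, here is a minimal sketch using the standard-library argparse module, assuming the same --source/--dest format host:port/db_name:tab_name/user/password. The helper names parse_conn_spec and build_parser and the dictionary keys are hypothetical, not part of the original script; argparse then enforces required options and the delete/drop choice automatically.

```python
import argparse


def parse_conn_spec(spec):
    """Parse 'host:port/db_name:tab_name/user/password' into a dict.

    Hypothetical helper mirroring the format the original script expects.
    The password is the last field, so maxsplit=3 lets it contain '/'.
    """
    try:
        hostport, dbtab, user, password = spec.split('/', 3)
        host, port = hostport.split(':')
        db, table = dbtab.split(':')
        return {'host': host, 'port': int(port), 'db': db,
                'table': table, 'user': user, 'password': password}
    except ValueError:
        # wrong number of fields or a non-integer port
        raise argparse.ArgumentTypeError(
            "expected host:port/db_name:tab_name/user/password, got %r" % spec)


def build_parser():
    parser = argparse.ArgumentParser(prog='daily_migration.py')
    parser.add_argument('--source', type=parse_conn_spec, required=True)
    parser.add_argument('--dest', type=parse_conn_spec, required=True)
    parser.add_argument('--delete_strategy', choices=('delete', 'drop'), required=True)
    parser.add_argument('--primary_key', required=True)
    parser.add_argument('--date_col', required=True)
    parser.add_argument('--time_interval', type=int, required=True)
    return parser


if __name__ == '__main__':
    args = build_parser().parse_args([
        '--source=192.168.1.4:3306/db_name:tab_name/proxy/password',
        '--dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password',
        '--delete_strategy=delete', '--primary_key=auto_id',
        '--date_col=ut', '--time_interval=180',
    ])
    print(args.source['host'], args.source['port'], args.dest['table'])
    # 192.168.1.4 3306 tab_name_201601
```

A malformed spec makes argparse print a usage error and exit with status 2, instead of raising an IndexError deep inside the parsing loop.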
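Every query in the script restricts date_col to one calendar day, time_interval days before today, built in SQL as DATE_ADD(today, INTERVAL -N DAY) with 00:00:00 and 23:59:59 bounds. A small Python sketch of the same computation can be handy for checking which day a given run will touch; the function name day_window is hypothetical.

```python
import datetime


def day_window(today, interval_days):
    """Return (start, end) strings for the day `interval_days` before `today`.

    Mirrors the script's SQL bounds:
      CONCAT(DATE_FORMAT(DATE_ADD(today, INTERVAL -N DAY), '%Y-%m-%d'), ' 00:00:00')
      CONCAT(DATE_FORMAT(DATE_ADD(today, INTERVAL -N DAY), '%Y-%m-%d'), ' 23:59:59')
    `today` is a 'YYYY-MM-DD' string, as produced by time.strftime('%Y-%m-%d').
    """
    base = datetime.datetime.strptime(today, '%Y-%m-%d').date()
    day = base - datetime.timedelta(days=interval_days)
    return day.strftime('%Y-%m-%d 00:00:00'), day.strftime('%Y-%m-%d 23:59:59')


print(day_window('2016-05-30', 180))  # ('2015-12-02 00:00:00', '2015-12-02 23:59:59')
```

One caveat on the inclusive upper bound: if date_col has fractional seconds (DATETIME(3) or DATETIME(6)), a value such as 23:59:59.500 falls outside <= '... 23:59:59'; a half-open bound, < next day 00:00:00, avoids that gap.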
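Both migrate_2_destdb and delete_data walk the primary key from source_min_id to source_max_id in half-open chunks [k, k + step_size), so each statement touches a bounded number of rows. The sketch below reproduces that loop as a generator and, as an assumption beyond the original (which formats every value straight into the SQL text), builds each batch query with %s placeholders so the driver escapes the values; table and column names cannot be bound as parameters, so they are still formatted in. The names pk_batches and batch_select are hypothetical.

```python
def pk_batches(min_id, max_id, step):
    """Yield half-open [lo, hi) ranges covering min_id..max_id, like the script's while-loop."""
    k = min_id
    while k <= max_id:
        yield k, k + step
        k += step


def batch_select(table, pk, date_col, lo, hi, start, end):
    """Return (sql, params) for one batch, to pass to cursor.execute(sql, params).

    Values use %s placeholders (MySQLdb's 'format' paramstyle); identifiers are
    backtick-quoted and formatted in, because they cannot be parameters.
    """
    sql = ("SELECT * FROM `%s` WHERE `%s` >= %%s AND `%s` < %%s "
           "AND `%s` >= %%s AND `%s` <= %%s" % (table, pk, pk, date_col, date_col))
    return sql, (lo, hi, start, end)


for lo, hi in pk_batches(1, 45000, 20000):
    print(batch_select('tab_name', 'auto_id', 'ut', lo, hi,
                       '2015-12-02 00:00:00', '2015-12-02 23:59:59'))
```

Because the last range may extend past source_max_id, the final batch simply matches fewer rows; sparse keys can also produce empty batches, which is why skipping empty result sets before executemany is worthwhile.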

That's all for this article. I hope it helps with your learning, and thank you for your support.



    Copyright © 2019 侯体宗 (Hou Tizong). All rights reserved. · 粤ICP备20027696号
