侯体宗的博客
  • 首页
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Python实现 多进程导入CSV数据到 MySQL

Python  /  管理员 发布于 7年前   164

前段时间帮同事处理了一个把 CSV 数据导入到 MySQL 的需求。两个很大的 CSV 文件, 分别有 3GB、2100 万条记录和 7GB、3500 万条记录。对于这个量级的数据,用简单的单进程/单线程导入 会耗时很久,最终用了多进程的方式来实现。具体过程不赘述,记录一下几个要点:

  1. 批量插入而不是逐条插入
  2. 为了加快插入速度,先不要建索引
  3. 生产者和消费者模型,主进程读文件,多个 worker 进程执行插入
  4. 注意控制 worker 的数量,避免对 MySQL 造成太大的压力
  5. 注意处理脏数据导致的异常
  6. 原始数据是 GBK 编码,所以还要注意转换成 UTF-8
  7. 用 click 封装命令行工具

具体的代码实现如下:

#!/usr/bin/env python# -*- coding: utf-8 -*-import codecsimport csvimport loggingimport multiprocessingimport osimport warningsimport clickimport MySQLdbimport sqlalchemywarnings.filterwarnings('ignore', category=MySQLdb.Warning)# 批量插入的记录数量BATCH = 5000DB_URI = 'mysql://root@localhost:3306/example?charset=utf8'engine = sqlalchemy.create_engine(DB_URI)def get_table_cols(table):  sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table)  res = engine.execute(sql)  return res.keys()def insert_many(table, cols, rows, cursor):  sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format(      table=table,      cols=', '.join(cols),      marks=', '.join(['%s'] * len(cols)))  cursor.execute(sql, *rows)  logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table)def insert_worker(table, cols, queue):  rows = []  # 每个子进程创建自己的 engine 对象  cursor = sqlalchemy.create_engine(DB_URI)  while True:    row = queue.get()    if row is None:      if rows:        insert_many(table, cols, rows, cursor)      break    rows.append(row)    if len(rows) == BATCH:      insert_many(table, cols, rows, cursor)      rows = []def insert_parallel(table, reader, w=10):  cols = get_table_cols(table)  # 数据队列,主进程读文件并往里写数据,worker 进程从队列读数据  # 注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存  queue = multiprocessing.Queue(maxsize=w*BATCH*2)  workers = []  for i in range(w):    p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue))    p.start()    workers.append(p)    logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid)  dirty_data_file = './{}_dirty_rows.csv'.format(table)  xf = open(dirty_data_file, 'w')  writer = csv.writer(xf, delimiter=reader.dialect.delimiter)  for line in reader:    # 记录并跳过脏数据: 键值数量不一致    if len(line) != len(cols):      writer.writerow(line)      continue    # 把 None 值替换为 'NULL'    clean_line = [None if x == 'NULL' else x for x in line]    # 往队列里写数据    queue.put(tuple(clean_line))    if reader.line_num % 500000 == 0:      logging.info('put %s tasks into queue.', reader.line_num)  xf.close()  # 给每个 worker 发送任务结束的信号  logging.info('send close signal to worker processes')  for i in range(w):    queue.put(None)  for p in workers:    p.join()def convert_file_to_utf8(f, rv_file=None):  if not rv_file:    name, ext = os.path.splitext(f)    if isinstance(name, unicode):      name = name.encode('utf8')    rv_file = '{}_utf8{}'.format(name, ext)  logging.info('start to process file %s', f)  with open(f) as infd:    with open(rv_file, 'w') as outfd:      lines = []      loop = 0      chunck = 200000      first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '\n'      lines.append(first_line)      for line in infd:        clean_line = line.decode('gb18030').encode('utf8')        clean_line = clean_line.rstrip() + '\n'        lines.append(clean_line)        if len(lines) == chunck:          outfd.writelines(lines)          lines = []          loop += 1          logging.info('processed %s lines.', loop * chunck)      outfd.writelines(lines)      logging.info('processed %s lines.', loop * chunck + len(lines))@click.group()def cli():  logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')@cli.command('gbk_to_utf8')@click.argument('f')def convert_gbk_to_utf8(f):  convert_file_to_utf8(f)@cli.command('load')@click.option('-t', '--table', required=True, help='表名')@click.option('-i', '--filename', required=True, help='输入文件')@click.option('-w', '--workers', default=10, help='worker 数量,默认 10')def load_fac_day_pro_nos_sal_table(table, filename, workers):  with open(filename) as fd:    fd.readline()  # skip header    reader = csv.reader(fd)    insert_parallel(table, reader, w=workers)if __name__ == '__main__':  cli()

以上就是本文给大家分享的全部没人了,希望大家能够喜欢


  • 上一条:
    不要用强制方法杀掉python线程
    下一条:
    python检查URL是否正常访问的小技巧
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在go语言中实现字符串可逆性压缩及解压缩功能(0个评论)
    • 使用go + gin + jwt + qrcode实现网站生成登录二维码在app中扫码登录功能(0个评论)
    • 在windows10中升级go版本至1.24后LiteIDE的Ctrl+左击无法跳转问题解决方案(0个评论)
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客