侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Python定时从Mysql提取数据存入Redis的实现

Python  /  管理员 发布于 7年前   494

设计思路:

1.程序一旦run起来,python会把mysql中最近一段时间的数据全部提取出来

2.然后实例化redis类,将数据简单解析后逐条传入redis队列

3.定时器设计每天凌晨12点开始跑

ps:redis是个内存数据库,做后台消息队列的缓存时有很大的用处,有兴趣的小伙伴可以去查看相关的文档。

 # -*- coding:utf-8 -*- import MySQLdbimport scheduleimport timeimport datetimeimport randomimport stringimport redis# get the data from mysqlclass FromSql(object):  def __init__(self, conn):    self.conn = conn  def acquire(self):    cursor = self.conn.cursor()    try:      sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"      cursor.execute(sql)      rs = cursor.fetchall()      #print (rs)      for eve in rs:        print('%s, %s, %s, %s' % eve)      copy_rs = rs      cursor.close()      return copy_rs     except Exception as e:      print("The error: %s" % e)class RedisQueue(object):  def __init__(self, name, namespace='queue', **redis_kwargs):    """The default connection parameters are: host='localhost', port=6379, db=0"""    self.__db= redis.Redis(**redis_kwargs)    self.key = '%s:%s' %(namespace, name)  def qsize(self):    return self.__db.llen(self.key)  def put(self, item):    self.__db.rpush(self.key, item)  def get(self, block=True, timeout=None):    if block:      item = self.__db.blpop(self.key, timeout=timeout)    else:      item = self.__db.lpop(self.key)    if item:      item = item[1]    return item  def get_nowait(self):    return self.get(False)if __name__ == "__main__":  # connect mysqldb  conn_sql = MySQLdb.connect(host = '127.0.0.1',port = 3306,user = 'root',passwd = '',db = 'test',charset = 'utf8')def job_for_redis():    get_data = FromSql(conn_sql)    data = get_data.acquire()    q = RedisQueue('test',host='localhost', port=6379, db=0)    for single_data in data:      for meta_data in single_data:        q.put(meta_data)        print(meta_data)    print("All data had been inserted.") """  try:    schedule.every().day.at("00:00").do(job_for_redis)  except Exception as e:    print('Error: %s'% e)#  finally:#    conn.close()  while True:    schedule.run_pending()    time.sleep(1)"""

补充知识:python定时获取汇率存入数据库

python定时任务:

我们可以使用 轻量级的第三方模块schedule。首先先安装:pip install schedule

定时任务的的小测试:

import scheduleimport time def job():  print("I'm working...") schedule.every(10).minutes.do(job)       # 每隔10分钟执行一次任务schedule.every().hour.do(job)          # 每隔一小时执行一次任务schedule.every().day.at("10:30").do(job)    # 每天10:30执行一次任务schedule.every(5).to(10).days.do(job)      # 每5-10天执行一次任务schedule.every().monday.do(job)         # 每周一的这个时候执行一次任务schedule.every().wednesday.at("13:15").do(job) # 每周三13:15执行一次任务 while True:  schedule.run_pending()

获取数据存入数据库:(格式可能不太对,还有一些符号。自己修改一下即可)

import pymysqlimport scheduleimport timeimport requestsimport pandasfrom sqlalchemy import create_engine#获取美元的所有外汇def job():  content = '美元'  url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外汇数据地址  html = requests.get(url).content.decode('utf-8')  index = html.index('<td>' + content + '</td>')  str = html[index:index+300]  result = re.findall('<td>(.*?)</td>',str)  print("币种:" + result[0])  print("现汇买入价:" + result[1])  print("现钞买入价:" + result[2])  print("现汇卖出价:" + result[3])  print("现钞卖出价:" + result[4])  print("中行结算价:" + result[5])  print("发布时间:" + result[6] + ' ' + result[7])   #本地地址 数据库账号 密码  数据库名  db = pymysql.connect('localhost','root','root','pinyougoudb')  cursor = db.cursor()   #sql语句  sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6] + ' ' + result[7], result[0])  cursor.execute(sql)  db.commit()  print('success') # 查询语句,将存入的数据查出来  # sqlalchemy 进行数据库初始化  engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')  sql = '''select * from tb_money'''  # pandas 进行数据库读写  df = pandas.read_sql_query(sql,engine)  print(df)  db.commit()# 每隔几分中刷新一次#schedule.every(0.1).minutes.do(job)#每天什么时候刷新schedule.every().day.at("09:29").do(job)schedule.every().day.at("09:30").do(job)#一直循环 知道满足条件执行while True:  schedule.run_pending()

以上这篇Python定时从Mysql提取数据存入Redis的实现就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。


  • 上一条:
    Python实现汇率转换操作
    下一条:
    python函数调用,循环,列表复制实例
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • PHP 8.4 Alpha 1现已发布!(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客