侯体宗的博客
  • 首页
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

python实现mysql的读写分离及负载均衡

Python  /  管理员 发布于 7年前   204

Oracle数据库有其公司开发的配套rac来实现负载均衡,目前已知的最大节点数能到128个,但是其带来的维护成本无疑是很高的,并且rac的稳定性也并不是特别理想,尤其是节点很多的时候。

       但是,相对mysql来说,rac的实用性要比mysql的配套集群软件mysql-cluster要高很多。因为从网上了解到情况来看,很少公司在使用mysql-cluster,大多数企业都会选择第三方代理软件,例如MySQL Proxy、Mycat、haproxy等,但是这会引起另外一个问题:单点故障(包括mysql-cluster:管理节点)。如果要解决这个问题,就需要给代理软件搭建集群,在访问量很大的情况下,代理软件的双机或三机集群会成为访问瓶颈,继续增加其节点数,无疑会带来各方面的成本。

那么,如何可以解决这个问题呢?

          解决上述问题,最好的方式个人认为应该是在程序中实现。通过和其他mysql DBA的沟通,也证实了这个想法。但是由此带来的疑问也就产生了:会不会增加开发成本?对现有的应用系统做修改会不会改动很大?会不会增加后期版本升级的难度?等等。

        对于一个架构设计良好的应用系统可以很肯定的回答:不会。

        那么怎么算一个架构设计良好的应用系统呢?

       简单来说,就是分层合理、功能模块之间耦合性底。以本人的经验来说,系统设计基本上可以划分为以下四层:

       1.  实体层:主要定义一些实体类

       2.  数据层:也可以叫SQL处理层。主要负责跟数据库交互取得数据

       3.  业务处:主要是根据业务流程及功能区分模块(或者说定义不同的业务类)

       4.  表现层:呈现最终结果给用户

       实现上述功能(mysql的读写分离及负载均衡),在这四个层次中,仅仅涉及到数据层。

严格来说,对于设计良好的系统,只涉及到一个类的一个函数:在数据层中,一般都会单独划分出一个连接类,并且这个连接类中会有一个连接函数,需要改动的就是这个函数:在读取连接字符串之前加一个功能函数返回需要的主机、ip、端口号等信息(没有开发经历的同学可能理解这段话有点费劲)。

       流程图如下:

           代码如下:

import mmapimport jsonimport randomimport mysql.connectorimport time##公有变量#dbinfos={#   "db0":{'host':'192.168.42.60','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,"database":"","role":"RW","weight":10,"status":1},#   "db1":{'host':'192.168.42.61','user':'root','pwd':'Abcd1234','my_user':'root','my_pwd':'Abcd.1234',"port":3306,,"database":"":"R","weight":20,"status":1}#   }dbinfos={}mmap_file = Nonemmap_time=None##这个函数返回json格式的字符串,也是实现初始化数据库信息的地方##使用json格式是为了方便数据转换,从字符串---》二进制--》字符串---》字典##如果采用其它方式共享dbinfos的方法,可以不用此方式##配置库的地址def get_json_str1(): return json.dumps(dbinfos)##读取配置库中的内容def get_json_str(): try:  global dbinfos  cnx = mysql.connector.connect(user='root', password='Abcd.1234',        host='192.168.42.60',        database='rwlb')  cursor = cnx.cursor()  cmdString="select * from rwlb"  cnt=-1  cursor.execute(cmdString)  for (host,user,pwd,my_user,my_pwd,role,weight,status,port,db ) in cursor:   cnt=cnt+1   dict_db={'host':host,'user':user,'pwd':pwd,'my_user':my_user,'my_pwd':my_pwd,"port":port,"database":db,"role":role,"weight":weight,"status":status}   dbinfos["db"+str(cnt)]=dict_db  cursor.close()  cnx.close()  return json.dumps(dbinfos) except:  cursor.close()  cnx.close()  return ""##判断是否能正常连接到数据库def check_conn_host(): try:  cnx = mysql.connector.connect(user='root', password='Abcd.1234',        host='192.168.42.60',        database='rwlb')  cursor = cnx.cursor()  cmdString="select user()"  cnt=-1  cursor.execute(cmdString)  for user in cursor:   cnt=len(user)  cursor.close()  cnx.close()  return cnt except :  return -1;##select 属于读操作,其他属于写操作-----这里可以划分的更详细,比如执行存储过程等def analyze_sql_state(sql): if "select" in sql:  return "R" else:  return "W"##读取时间信息def read_mmap_time(): global mmap_time,mmap_file mmap_time.seek(0) ##初始时间 inittime=int(mmap_time.read().translate(None, b'\x00').decode()) ##当前时间 endtime=int(time.time()) ##时间差 dis_time=endtime-inittime print("dis_time:"+str(dis_time)) #重新读取数据 if dis_time>10:  ##当配置库正常的情况下才重新读取数据  print(str(check_conn_host()))  if check_conn_host()>0:      print("read data again")   mmap_time.seek(0)   mmap_file.seek(0)   mmap_time.write(b'\x00')   mmap_file.write(b'\x00')   get_mmap_time()   get_mmap_info()  else:   print("can not connect to host")    #不重新读取数据 else:  print("do not read data again")##从内存中读取信息,def read_mmap_info(sql): read_mmap_time() print("The data is in memory") global mmap_file,dict_db mmap_file.seek(0) ##把二进制转换为字符串 info_str=mmap_file.read().translate(None, b'\x00').decode() #3把字符串转成json格式,方便后面转换为字典使用 infos=json.loads(info_str)  host_count=len(infos) ##权重列表 listw=[] ##总的权重数量 wtotal=0 ##数据库角色 dbrole=analyze_sql_state(sql) ##根据权重初始化一个列表。这个是比较简单的算法,所以权重和控制在100以内比较好----这里可以选择其他比较好的算法 for i in range(host_count):  db="db"+str(i)  if dbrole in infos[db]["role"]:   if int(infos[db]["status"])==1:    w=infos[db]["weight"]    wtotal=wtotal+w    for j in range(w):     listw.append(i) if wtotal >0:  ##产生一个随机数  rad=random.randint(0,wtotal-1)  ##读取随机数所在的列表位置的数据  dbindex=listw[rad]  ##确定选择的是哪个db  db="db"+str(dbindex)  ##为dict_db赋值,即选取的db的信息  dict_db=infos[db]  return dict_db else :  return {}##如果内存中没有时间信息,则向内存红写入时间信息def get_mmap_time(): global mmap_time ##第二个参数1024是设定的内存大小,单位:字节。如果内容较多,可以调大一点 mmap_time = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time') ##读取有效比特数,不包括空比特 cnt=mmap_time.read_byte() if cnt==0:  print("Load time to memory")  mmap_time = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_time')  inittime=str(int(time.time()))  mmap_time.write(inittime.encode())##如果内存中没有对应信息,则向内存中写信息以供下次调用使用def get_mmap_info(): global mmap_file ##第二个参数1024是设定的内存大小,单位:字节。如果内容较多,可以调大一点 mmap_file = mmap.mmap(-1, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap') ##读取有效比特数,不包括空比特 cnt=mmap_file.read_byte() if cnt==0:  print("Load data to memory")  mmap_file = mmap.mmap(0, 1024, access = mmap.ACCESS_WRITE, tagname = 'share_mmap')  mmap_file.write(get_json_str().encode())##测试函数def test1(): get_mmap_time() get_mmap_info() for i in range(10):  sql="select * from db"  #sql="update t set col1=a where b=2"  dbrole=analyze_sql_state(sql)  dict_db=read_mmap_info(sql)  print(dict_db["host"])def test2(): sql="select * from db" res=analyze_sql_state(sql) print("select:"+res) sql="update t set col1=a where b=2" res=analyze_sql_state(sql) print("update:"+res) sql="insert into t values(1,2)" res=analyze_sql_state(sql) print("insert:"+res) sql="delete from t where b=2" res=analyze_sql_state(sql) print("delete:"+res)##类似主函数if __name__=="__main__": test2()

测试结果:

从结果可以看出,只有第一次向内存加载数据,并且按照权重实现了负载均衡。

因为测试函数test1()写的是固定语句,所以读写分离的结果没有显示出来。

另外:测试使用的数据库表结构及数据:

 desc rwlb;+---------+-------------+------+-----+---------+-------+| Field | Type  | Null | Key | Default | Extra |+---------+-------------+------+-----+---------+-------+| host | varchar(50) | YES |  | NULL |  || user | varchar(50) | YES |  | NULL |  || pwd  | varchar(50) | YES |  | NULL |  || my_user | varchar(50) | YES |  | NULL |  || my_pwd | varchar(50) | YES |  | NULL |  || role | varchar(10) | YES |  | NULL |  || weight | int(11)  | YES |  | NULL |  || status | int(11)  | YES |  | NULL |  || port | int(11)  | YES |  | NULL |  || db  | varchar(50) | YES |  | NULL |  |+---------+-------------+------+-----+---------+-------+select * from rwlb;+---------------+------+----------+---------+-----------+------+--------+--------+------+------+| host   | user | pwd  | my_user | my_pwd | role | weight | status | port | db |+---------------+------+----------+---------+-----------+------+--------+--------+------+------+| 192.168.42.60 | root | Abcd1234 | root | Abcd.1234 | RW |  10 |  1 | NULL | NULL || 192.168.42.61 | root | Abcd1234 | root | Abcd.1234 | R |  20 |  1 | NULL | NULL |+---------------+------+----------+---------+-----------+------+--------+--------+------+------+

总结

以上所述是小编给大家介绍的python实现mysql的读写分离及负载均衡,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!


  • 上一条:
    python生成器,可迭代对象,迭代器区别和联系
    下一条:
    python负载均衡的简单实现方法
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在windows10中升级go版本至1.24后LiteIDE的Ctrl+左击无法跳转问题解决方案(0个评论)
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(95个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客