侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

基于python的Paxos算法实现

Python  /  管理员 发布于 7年前   142

理解一个算法最快,最深刻的做法,我觉着可能是自己手动实现,虽然项目中不用自己实现,有已经封装好的算法库,供我们调用,我觉着还是有必要自己亲自实践一下。

这里首先说明一下,python这种动态语言,对不熟悉的人可能看着比较别扭,不像java那样参数类型是固定的,所以看着会有些蛋疼。这里环境用的是python2.7。

class Message:  # command  MSG_ACCEPTOR_AGREE = 0 # 追随者约定  MSG_ACCEPTOR_ACCEPT = 1 # 追随者接受  MSG_ACCEPTOR_REJECT = 2 # 追随者拒绝-网络不通  MSG_ACCEPTOR_UNACCEPT = 3 # 追随者网络通-不同意  MSG_ACCEPT = 4 # 接受  MSG_PROPOSE = 5 # 提议  MSG_EXT_PROPOSE = 6 # 额外提议  MSG_HEARTBEAT = 7 # 心跳,每隔一段时间同步消息  def __init__(self, command=None):    self.command = command  # 把收到的消息原原路返回,作为应答消息  def copyAsReply(self, message):    # 提议ID #当前的ID #发给谁 #谁发的    self.proposalID, self.instanceID, self.to, self.source = message.proposalID, message.instanceID, message.source, message.to    self.value = message.value # 发的信息

然后是利用socket,线程和队列实现的消息处理器:

# 基于socket传递消息,封装网络传递消息import threadingimport pickleimport socketimport queueclass MessagePump(threading.Thread):  # 收取消息线程  class MPHelper(threading.Thread):    #    def __init__(self, owner):      self.owner = owner      threading.Thread.__init__(self)    def run(self):      while not self.owner.abort: # 只要所有者线程没有结束,一直接受消息        try:          (bytes, addr) = self.owner.socket.recvfrom(2048) # 收取消息          msg = pickle.loads(bytes) # 读取二进制数据转化为消息          msg.source = addr[1]          self.owner.queue.put(msg) # 队列存入消息        except Exception as e:          pass  def __init__(self, owner, port, timeout=2):    threading.Thread.__init__(self)    self.owner = owner    self.abort = False    self.timeout = 2    self.port = port    self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP通信    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 200000) # 通信参数    self.socket.bind(("localhost", port)) # 通信地址,ip,端口    self.socket.settimeout(timeout) # 超时设置    self.queue = queue.Queue() # 队列    self.helper = MessagePump.MPHelper(self) # 接收消息  # 运行主线程  def run(self):    self.helper.start() # 开启收消息的线程    while not self.abort:      message = self.waitForMessage() # 阻塞等待      self.owner.recvMessage(message) # 收取消息  # 等待消息  def waitForMessage(self):    try:      msg = self.queue.get(True, 3) # 抓取数据,最多等待3s      return msg    except:      return None  # 发送消息  def sendMessage(self, message):    bytes = pickle.dumps(message) # 转化为二进制    address = ("localhost", message.to) # 地址ip,端口(ip,port)    self.socket.sendto(bytes, address)    return True  #是否停止收取消息  def doAbort(self):    self.abort = True

再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的

from MessagePump import MessagePumpimport randomclass AdversarialMessagePump(MessagePump): # 类的继承  # 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序  def __init__(self, owner, port, timeout=2):    MessagePump.__init__(self, owner, port, timeout) # 初始化父类    self.messages = set() # 集合避免重复  def waitForMessage(self):    try:      msg = self.queue.get(True, 0.1) # 从队列抓取数据      self.messages.add(msg) # 添加消息    except Exception as e: # 处理异常      pass      # print(e)    if len(self.messages) > 0 and random.random() < 0.95: # Arbitrary!      msg = random.choice(list(self.messages)) # 随机抓取消息发送      self.messages.remove(msg) # 删除消息    else:      msg = None    return msg

再来一个是记录类

# InstanceRecord本地记录类,主要记录追随者、领导者最高编号的协议from PaxosLeaderProtocol import PaxosLeaderProtocolclass InstanceRecord:  def __init__(self):    self.protocols = {}    self.highestID = (-1, -1) # (port,count)    self.value = None  def addProtocol(self, protocol):    self.protocols[protocol.proposalID] = protocol    #    if protocol.proposalID[1] > self.highestID[1] or (        protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]):      self.highestID = protocol.proposalID # 取得编号最大的协议  def getProtocol(self, protocolID):    return self.protocols[protocolID]  def cleanProtocols(self):    keys = self.protocols.keys()    for k in keys:      protocol = self.protocols[k]      if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:        print("删除协议")        del self.protocols[k]

下面就是Acceptor的实现:

# 追随者from MessagePump import MessagePumpfrom Message import Messagefrom InstanceRecord import InstanceRecordfrom PaxosAcceptorProtocol import PaxosAcceptorProtocolclass PaxosAcceptor:  def __init__(self, port, leaders):    self.port = port    self.leaders = leaders    self.instances = {} # 接口列表    self.msgPump = MessagePump(self, self.port) # 消息传递器    self.failed = False  # 开始消息传送  def start(self):    self.msgPump.start()  # 停止  def stop(self):    self.msgPump.doAbort()  # 失败  def fail(self):    self.failed = True  def recover(self):    self.failed = False  # 发送消息  def sendMessage(self, message):    self.msgPump.sendMessage(message)  # 收消息,只收取为提议的消息  def recvMessage(self, message):    if message == None:      return    if self.failed: # 失败状态不收取消息      return    if message.command == Message.MSG_PROPOSE: # 判断消息是否为提议      if message.instanceID not in self.instances:        record = InstanceRecord() # 记录器        self.instances[message.instanceID] = record      protocol = PaxosAcceptorProtocol(self) # 创建协议      protocol.recvProposal(message) # 收取消息      self.instances[message.instanceID].addProtocol(protocol)    else:      self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)  # 通知客户端,  def notifyClient(self, protocol, message):    if protocol.state == PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED: # 提议被接受,通知      self.instances[protocol.instanceID].value = message.value # 储存信息      print(u"协议被客户端接受 %s" % message.value)  # 获取最高同意的建议  def getHighestAgreedProposal(self, instance):    return self.instances[instance].highestID # (port,count)  # 获取接口数据  def getInstanceValue(self, instance):    return self.instances[instance].value

那再看下AcceptorProtocol的实现:

from Message import Messageclass PaxosAcceptorProtocol(object):  # State variables  STATE_UNDEFINED = -1 # 协议没有定义的情况0  STATE_PROPOSAL_RECEIVED = 0 # 收到消息  STATE_PROPOSAL_REJECTED = 1 # 拒绝链接  STATE_PROPOSAL_AGREED = 2 # 同意链接  STATE_PROPOSAL_ACCEPTED = 3 # 同意请求  STATE_PROPOSAL_UNACCEPTED = 4 # 拒绝请求  def __init__(self, client):    self.client = client    self.state = PaxosAcceptorProtocol.STATE_UNDEFINED  # 收取,只处理协议类型的消息  def recvProposal(self, message):    if message.command == Message.MSG_PROPOSE: # 协议      self.proposalID = message.proposalID      self.instanceID = message.instanceID      (port, count) = self.client.getHighestAgreedProposal(message.instanceID) # 端口,协议内容的最高编号      # 检测编号处理消息协议      # 判断协议是否最高       if count < self.proposalID[1] or (count == self.proposalID[1] and port < self.proposalID[0]):        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED # 协议同意        print("同意协议:%s, %s " % (message.instanceID, message.value))        value = self.client.getInstanceValue(message.instanceID)        msg = Message(Message.MSG_ACCEPTOR_AGREE) # 同意协议        msg.copyAsReply(message)        msg.value = value        msg.sequence = (port, count)        self.client.sendMessage(msg) # 发送消息      else: # 不再接受比最高协议小的提议        self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_REJECTED      return self.proposalID    else:      # 错误重试      pass  # 过度  def doTransition(self, message): # 如果当前协议状态是接受连接,消息类型是接受    if self.state == PaxosAcceptorProtocol.STATE_PROPOSAL_AGREED and message.command == Message.MSG_ACCEPT:      self.state = PaxosAcceptorProtocol.STATE_PROPOSAL_ACCEPTED # 接收协议      msg = Message(Message.MSG_ACCEPTOR_ACCEPT) # 创造消息      msg.copyAsReply(message) # 拷贝并回复      for l in self.client.leaders:        msg.to = l        self.client.sendMessage(msg) # 给领导发送消息      self.notifyClient(message) # 通知自己      return True    raise Exception("并非预期的状态和命令")  # 通知 自己客户端  def notifyClient(self, message):    self.client.notifyClient(self, message)

接着看下Leader和LeaderProtocol实现:

# 领导者import threadingimport Queueimport timefrom Message import Messagefrom MessagePump import MessagePumpfrom InstanceRecord import InstanceRecordfrom PaxosLeaderProtocol import PaxosLeaderProtocolclass PaxosLeader:  # 定时监听  class HeartbeatListener(threading.Thread):    def __init__(self, leader):      self.leader = leader      self.queue = Queue.Queue() # 消息队列      self.abort = False      threading.Thread.__init__(self)    def newHB(self, message):      self.queue.put(message)    def doAbort(self):      self.abort = True    def run(self): # 读取消息      elapsed = 0      while not self.abort:        s = time.time()        try:          hb = self.queue.get(True, 2)          # 设定规则,谁的端口号比较高,谁就是领导          if hb.source > self.leader.port:self.leader.setPrimary(False)        except:          self.leader.setPrimary(True)  # 定时发送  class HeartbeatSender(threading.Thread):    def __init__(self, leader):      threading.Thread.__init__(self)      self.leader = leader      self.abort = False    def doAbort(self):      self.abort = True    def run(self):      while not self.abort:        time.sleep(1)        if self.leader.isPrimary:          msg = Message(Message.MSG_HEARTBEAT)          msg.source = self.leader.port          for leader in self.leader.leaders:msg.to = leaderself.leader.sendMessage(msg)  def __init__(self, port, leaders=None, acceptors=None):    self.port = port    if leaders == None:      self.leaders = []    else:      self.leaders = leaders    if acceptors == None:      self.acceptors = []    else:      self.acceptors = acceptors    self.group = self.leaders + self.acceptors # 集合合并    self.isPrimary = False # 自身是不是领导    self.proposalCount = 0    self.msgPump = MessagePump(self, port) # 消息传送器    self.instances = {}    self.hbListener = PaxosLeader.HeartbeatListener(self) # 监听    self.hbSender = PaxosLeader.HeartbeatSender(self) # 发送心跳    self.highestInstance = -1 # 协议状态    self.stoped = True # 是否正在运行    self.lasttime = time.time() # 最后一次时间  def sendMessage(self, message):    self.msgPump.sendMessage(message)  def start(self):    self.hbSender.start()    self.hbListener.start()    self.msgPump.start()    self.stoped = False  def stop(self):    self.hbSender.doAbort()    self.hbListener.doAbort()    self.msgPump.doAbort()    self.stoped = True  def setPrimary(self, primary): # 设置领导者    if self.isPrimary != primary:      # Only print if something's changed      if primary:        print(u"我是leader%s" % self.port)      else:        print(u"我不是leader%s" % self.port)    self.isPrimary = primary  # 获取所有的领导下面的追随者  def getGroup(self):    return self.group  def getLeaders(self):    return self.leaders  def getAcceptors(self):    return self.acceptors  # 必须获得1/2以上的人支持  def getQuorumSize(self):    return (len(self.getAcceptors()) / 2) + 1  def getInstanceValue(self, instanceID):    if instanceID in self.instances:      return self.instances[instanceID].value    return None  def getHistory(self): # 历史记录    return [self.getInstanceValue(i) for i in range(1, self.highestInstance + 1)]  # 抓取同意的数量  def getNumAccpted(self):    return len([v for v in self.getHistory() if v != None])  # 抓取空白时间处理下事务  def findAndFillGaps(self):    for i in range(1, self.highestInstance):      if self.getInstanceValue(i) == None:        print("填充空白", i)        self.newProposal(0, i)    self.lasttime = time.time()  # 采集无用信息  def garbageCollect(self):    for i in self.instances:      self.instances[i].cleanProtocols()  # 通知领导  def recvMessage(self, message):    if self.stoped:      return    if message == None:      if self.isPrimary and time.time() - self.lasttime > 15.0:        self.findAndFillGaps()        self.garbageCollect()      return    #处理心跳信息    if message.command == Message.MSG_HEARTBEAT:      self.hbListener.newHB(message)      return True    #处理额外的提议    if message.command == Message.MSG_EXT_PROPOSE:      print("额外的协议", self.port, self.highestInstance)      if self.isPrimary:        self.newProposal(message.value)      return True    if self.isPrimary and message.command != Message.MSG_ACCEPTOR_ACCEPT:      self.instances[message.instanceID].getProtocol(message.proposalID).doTransition(message)    if message.command == Message.MSG_ACCEPTOR_ACCEPT:      if message.instanceID not in self.instances:        self.instances[message.instanceID] = InstanceRecord()      record = self.instances[message.instanceID]      if message.proposalID not in record.protocols:#创建协议        protocol = PaxosLeaderProtocol(self)        protocol.state = PaxosLeaderProtocol.STATE_AGREED        protocol.proposalID = message.proposalID        protocol.instanceID = message.instanceID        protocol.value = message.value        record.addProtocol(protocol)      else:        protocol = record.getProtocol(message.proposalID)      protocol.doTransition(message)    return True  # 新建提议  def newProposal(self, value, instance=None):    protocol = PaxosLeaderProtocol(self)    if instance == None: # 创建协议标号      self.highestInstance += 1      instanceID = self.highestInstance    else:      instanceID = instance    self.proposalCount += 1    id = (self.port, self.proposalCount)    if instanceID in self.instances:      record = self.instances[instanceID]    else:      record = InstanceRecord()      self.instances[instanceID] = record    protocol.propose(value, id, instanceID)    record.addProtocol(protocol)  def notifyLeader(self, protocol, message):    if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:      print("协议接口%s被%s接受" % (message.instanceID, message.value))      self.instances[message.instanceID].accepted = True      self.instances[message.instanceID].value = message.value      self.highestInstance = max(message.instanceID, self.highestInstance)      return    if protocol.state == PaxosLeaderProtocol.STATE_REJECTED: # 重新尝试      self.proposalCount = max(self.proposalCount, message.highestPID[1])      self.newProposal(message.value)      return True    if protocol.state == PaxosLeaderProtocol.STATE_UNACCEPTED:      pass

LeaderProtocol实现:

from Message import Messageclass PaxosLeaderProtocol(object):  STATE_UNDEFINED = -1 # 协议没有定义的情况0  STATE_PROPOSED = 0 # 协议消息  STATE_REJECTED = 1 # 拒绝链接  STATE_AGREED = 2 # 同意链接  STATE_ACCEPTED = 3 # 同意请求  STATE_UNACCEPTED = 4 # 拒绝请求  def __init__(self, leader):    self.leader = leader    self.state = PaxosLeaderProtocol.STATE_UNDEFINED    self.proposalID = (-1, -1)    self.agreecount, self.acceptcount = (0, 0)    self.rejectcount, self.unacceptcount = (0, 0)    self.instanceID = -1    self.highestseen = (0, 0)  # 提议  def propose(self, value, pID, instanceID):    self.proposalID = pID    self.value = value    self.instanceID = instanceID    message = Message(Message.MSG_PROPOSE)    message.proposalID = pID    message.instanceID = instanceID    message.value = value    for server in self.leader.getAcceptors():      message.to = server      self.leader.sendMessage(message)    self.state = PaxosLeaderProtocol.STATE_PROPOSED    return self.proposalID  # ^度  def doTransition(self, message):    # 根B\行fh    if self.state == PaxosLeaderProtocol.STATE_PROPOSED:      if message.command == Message.MSG_ACCEPTOR_AGREE:        self.agreecount += 1        if self.agreecount >= self.leader.getQuorumSize(): # 选举          print(u"达成协议的法定人数,最后的价值回答是:%s" % message.value)          if message.value != None:if message.sequence[0] > self.highestseen[0] or (    message.sequence[0] == self.highestseen[0] and message.sequence[1] > self.highestseen[  1]):  self.value = message.value  self.highestseen = message.sequenceself.state = PaxosLeaderProtocol.STATE_AGREED # 同意更新# 发送同意消息msg = Message(Message.MSG_ACCEPT)msg.copyAsReply(message)msg.value = self.valuemsg.leaderID = msg.tofor server in self.leader.getAcceptors():  msg.to = server  self.leader.sendMessage(msg)self.leader.notifyLeader(self, message)          return True        if message.command == Message.MSG_ACCEPTOR_REJECT:          self.rejectcount += 1          if self.rejectcount >= self.leader.getQuorumSize():self.state = PaxosLeaderProtocol.STATE_REJECTEDself.leader.notifyLeader(self, message)          return True    if self.state == PaxosLeaderProtocol.STATE_AGREED:      if message.command == Message.MSG_ACCEPTOR_ACCEPT: # 同意协议        self.acceptcount += 1        if self.acceptcount >= self.leader.getQuorumSize():          self.state = PaxosLeaderProtocol.STATE_ACCEPTED # 接受          self.leader.notifyLeader(self, message)      if message.command == Message.MSG_ACCEPTOR_UNACCEPT:        self.unacceptcount += 1        if self.unacceptcount >= self.leader.getQuorumSize():          self.state = PaxosLeaderProtocol.STATE_UNACCEPTED          self.leader.notifyLeader(self, message)

测试模块:

import socket, pickle, timefrom Message import Messagefrom PaxosAcceptor import PaxosAcceptorfrom PaxosLeader import PaxosLeaderif __name__ == "__main__":  # 设定5个客户端  numclients = 5  clients = [PaxosAcceptor(port, [54321, 54322]) for port in range(64320, 64320 + numclients)]  # 两个领导者  leader1 = PaxosLeader(54321, [54322], [c.port for c in clients])  leader2 = PaxosLeader(54322, [54321], [c.port for c in clients])  # 开启领导者与追随者  leader1.start()  leader1.setPrimary(True)  leader2.setPrimary(True)  leader2.start()  for c in clients:    c.start()  # 破坏,客户端不链接  clients[0].fail()  clients[1].fail()  # 通信  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # udp协议  start = time.time()  for i in range(1000):    m = Message(Message.MSG_EXT_PROPOSE) # 消息    m.value = 0 + i # 消息参数    m.to = 54322 # 设置传递的端口    bytes = pickle.dumps(m) # 提取的二进制数据    s.sendto(bytes, ("localhost", m.to)) # 发送消息  while leader2.getNumAccpted() < 999:    print("休眠的这一秒 %d " % leader2.getNumAccpted())    time.sleep(1)  print(u"休眠10秒")  time.sleep(10)  print(u"停止leaders")  leader1.stop()  leader2.stop()  print(u"停止客户端")  for c in clients:    c.stop()  print(u"leader1历史纪录")  print(leader1.getHistory())  print(u"leader2历史纪录")  print(leader2.getHistory())  end = time.time()  print(u"一共用了%f秒" % (end - start))

代码确实比较长,看起来有些困难,最好还是在pycharm上看这个逻辑,可以快速定位参数指向,如果有不对的地方欢迎指正

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


  • 上一条:
    python画图--输出指定像素点的颜色值方法
    下一条:
    python区块及区块链的开发详解
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客