侯体宗的博客
  • 首页
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Python自定义线程池实现方法分析

Python  /  管理员 发布于 7年前   192

本文实例讲述了Python自定义线程池实现方法。分享给大家供大家参考,具体如下:

关于python的多线程,由与GIL的存在被广大群主所诟病,说python的多线程不是真正的多线程。但多线程处理IO密集的任务效率还是可以杠杠的。

我实现的这个线程池其实是根据银角的思路来实现的。

主要思路:

任务获取和执行:

1、任务加入队列,等待线程来获取并执行。
2、按需生成线程,每个线程循环取任务。

线程销毁:

1、获取任务是终止符时,线程停止。
2、线程池close()时,向任务队列加入和已生成线程等量的终止符。
3、线程池terminate()时,设置线程下次任务取到为终止符。

流程概要设计:

详细代码:

import threadingimport contextlibfrom Queue import Queueimport timeclass ThreadPool(object):  def __init__(self, max_num):    self.StopEvent = 0#线程任务终止符,当线程从队列获取到StopEvent时,代表此线程可以销毁。可设置为任意与任务有区别的值。    self.q = Queue()    self.max_num = max_num #最大线程数    self.terminal = False  #是否设置线程池强制终止    self.created_list = [] #已创建线程的线程列表    self.free_list = [] #空闲线程的线程列表    self.Deamon=False #线程是否是后台线程  def run(self, func, args, callback=None):    """    线程池执行一个任务    :param func: 任务函数    :param args: 任务函数所需参数    :param callback:    :return: 如果线程池已经终止,则返回True否则None    """    if len(self.free_list) == 0 and len(self.created_list) < self.max_num:      self.create_thread()    task = (func, args, callback,)    self.q.put(task)  def create_thread(self):    """    创建一个线程    """    t = threading.Thread(target=self.call)    t.setDaemon(self.Deamon)    t.start()    self.created_list.append(t)#将当前线程加入已创建线程列表created_list  def call(self):    """    循环去获取任务函数并执行任务函数    """    current_thread = threading.current_thread()  #获取当前线程对象・    event = self.q.get()  #从任务队列获取任务    while event != self.StopEvent:  #判断获取到的任务是否是终止符      func, arguments, callback = event#从任务中获取函数名、参数、和回调函数名      try:        result = func(*arguments)        func_excute_status =True#func执行成功状态      except Exception as e:        func_excute_status = False        result =None        print '函数执行产生错误', e#打印错误信息      if func_excute_status:#func执行成功后才能执行回调函数        if callback is not None:#判断回调函数是否是空的          try:callback(result)          except Exception as e:print '回调函数执行产生错误', e # 打印错误信息      with self.worker_state(self.free_list,current_thread):        #执行完一次任务后,将线程加入空闲列表。然后继续去取任务,如果取到任务就将线程从空闲列表移除        if self.terminal:#判断线程池终止命令,如果需要终止,则使下次取到的任务为StopEvent。          event = self.StopEvent        else: #否则继续获取任务          event = self.q.get() # 当线程等待任务时,q.get()方法阻塞住线程,使其持续等待    else:#若线程取到的任务是终止符,就销毁线程      #将当前线程从已创建线程列表created_list移除      self.created_list.remove(current_thread)  def close(self):    """    执行完所有的任务后,所有线程停止    """    full_size = len(self.created_list)#按已创建的线程数量往线程队列加入终止符。    while full_size:      self.q.put(self.StopEvent)      full_size -= 1  def terminate(self):    """    无论是否还有任务,终止线程    """    self.terminal = True    while self.created_list:      self.q.put(self.StopEvent)    self.q.queue.clear()#清空任务队列  def join(self):    """    阻塞线程池上下文,使所有线程执行完后才能继续    """    for t in self.created_list:      t.join()  @contextlib.contextmanager#上下文处理器,使其可以使用with语句修饰  def worker_state(self, state_list, worker_thread):    """    用于记录线程中正在等待的线程数    """    state_list.append(worker_thread)    try:      yield    finally:      state_list.remove(worker_thread)if __name__ == '__main__':  def Foo(arg):    return arg    # time.sleep(0.1)  def Bar(res):    print res  pool=ThreadPool(5)  # pool.Deamon=True#需在pool.run之前设置  for i in range(1000):    pool.run(func=Foo,args=(i,),callback=Bar)  pool.close()  pool.join()  # pool.terminate()  print "任务队列里任务数%s" %pool.q.qsize()  print "当前存活子线程数量:%d" % threading.activeCount()  print "当前线程创建列表:%s" %pool.created_list  print "当前线程创建列表:%s" %pool.free_list

关于上下文处理:

来个简单例子说明:

下面的代码手动自定义了一个myopen方法,模拟我们常见的with open() as f:语句。具体的contextlib模块使用,会单独开章来将。

# coding:utf-8import [email protected]#定义该函数支持上下文with语句def myopen(filename,mode):  f=open(filename,mode)  try:    yield f.readlines()#正常执行返回f.readlines()  except Exception as e:    print e  finally:    f.close()#最后在with代码快执行完毕后返回执行finally下的f.close()实现关闭文件if __name__ == '__main__':  with myopen(r'c:\ip1.txt','r') as f:    for line in f:      print line

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《Python Socket编程技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。


  • 上一条:
    Python Flask基础教程示例代码
    下一条:
    Python列表推导式、字典推导式与集合推导式用法实例分析
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在windows10中升级go版本至1.24后LiteIDE的Ctrl+左击无法跳转问题解决方案(0个评论)
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(95个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客