侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Python concurrent.futures模块使用实例

Python  /  管理员 发布于 7年前   310

这篇文章主要介绍了Python concurrent.futures模块使用实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

concurrent.futures的作用:

管理并发任务池。concurrent.futures模块提供了使用工作线程或进程池运行任务的接口。线程和进程池API都是一样,所以应用只做最小的修改就可以在线程和进程之间地切换

1、基于线程池使用map()

futures_thread_pool_map.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from concurrent import futuresimport threadingimport timedef task(n):  print('{}: 睡眠 {}'.format(threading.current_thread().name,n))  time.sleep(n / 10)  print('{}: 执行完成 {}'.format(threading.current_thread().name,n))  return n / 10ex = futures.ThreadPoolExecutor(max_workers=2)print('main: 开始运行')results = ex.map(task, range(5, 0, -1)) #返回值是generator 生成器print('main: 未处理的结果 {}'.format(results))print('main: 等待真实结果')real_results = list(results)print('main: 最终结果: {}'.format(real_results))

运行效果

[root@ mnt]# python3 futures_thread_pool_map.py main: 开始运行ThreadPoolExecutor-0_0: 睡眠 5ThreadPoolExecutor-0_1: 睡眠 4main: 未处理的结果 <generator object Executor.map.<locals>.result_iterator at 0x7f1c97484678>main: 等待真实结果ThreadPoolExecutor-0_1: 执行完成 4ThreadPoolExecutor-0_1: 睡眠 3ThreadPoolExecutor-0_0: 执行完成 5ThreadPoolExecutor-0_0: 睡眠 2ThreadPoolExecutor-0_0: 执行完成 2ThreadPoolExecutor-0_0: 睡眠 1ThreadPoolExecutor-0_1: 执行完成 3ThreadPoolExecutor-0_0: 执行完成 1main: 最终结果: [0.5, 0.4, 0.3, 0.2, 0.1]

2、futures执行单个任务

futures_thread_pool_submit.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from concurrent import futuresimport threadingimport timedef task(n):  print('{}: 睡眠 {}'.format(threading.current_thread().name, n))  time.sleep(n / 10)  print('{}: 执行完成 {}'.format(threading.current_thread().name, n))  return n / 10ex = futures.ThreadPoolExecutor(max_workers=2)print('main :开始')f = ex.submit(task, 5)print('main: future: {}'.format(f))print('等待运行结果')results = f.result()print('main: result:{}'.format(results))print('main: future 之后的结果:{}'.format(f))

运行效果

[root@ mnt]# python3 futures_thread_pool_submit.py main :开始ThreadPoolExecutor-0_0: 睡眠 5main: future: <Future at 0x7f40c0a6a400 state=running>等待运行结果ThreadPoolExecutor-0_0: 执行完成 5main: result:0.5main: future 之后的结果:<Future at 0x7f40c0a6a400 state=finished returned float>

3、futures.as_completed()按任意顺序运行结果

futures_as_completed.py

#!/usr/bin/env python# -*- coding: utf-8 -*-import randomimport timefrom concurrent import futuresdef task(n):  time.sleep(random.random())  return (n, n / 10)ex = futures.ThreadPoolExecutor(max_workers=2)print('main: 开始')wait_for = [  ex.submit(task, i) for i in range(5, 0, -1)]for f in futures.as_completed(wait_for):  print('main: result:{}'.format(f.result()))

运行效果

[root@ mnt]# python3 futures_as_completed.py main: 开始main: result:(5, 0.5)main: result:(4, 0.4)main: result:(3, 0.3)main: result:(1, 0.1)main: result:(2, 0.2)

4、Future回调之futures.add_done_callback()

futures_future_callback.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from concurrent import futuresimport timedef task(n):  print('task {} : 睡眠'.format(n))  time.sleep(0.5)  print('task {} : 完成'.format(n))  return n / 10def done(fn):  if fn.cancelled():    print('done {}:取消'.format(fn.arg))  elif fn.done():    error = fn.exception()    if error:      print('done {} : 错误返回 : {}'.format(fn.arg, error))    else:      result = fn.result()      print('done {} : 正常返回 : {}'.format(fn.arg, result))if __name__ == '__main__':  ex = futures.ThreadPoolExecutor(max_workers=2)  print('main : 开始')  f = ex.submit(task, 5)  f.arg = 5  f.add_done_callback(done)  result = f.result()

运行效果

[root@ mnt]# python3 futures_future_callback.py main : 开始task 5 : 睡眠task 5 : 完成done 5 : 正常返回 : 0.5

5、Future任务取消之futures.cancel()

futures_future_callback_cancel.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from concurrent import futuresimport timedef task(n):  print('task {} : 睡眠'.format(n))  time.sleep(0.5)  print('task {} : 完成'.format(n))  return n / 10def done(fn):  if fn.cancelled():    print('done {}:取消'.format(fn.arg))  elif fn.done():    error = fn.exception()    if error:      print('done {} : 错误返回 : {}'.format(fn.arg, error))    else:      result = fn.result()      print('done {} : 正常返回 : {}'.format(fn.arg, result))if __name__ == '__main__':  ex = futures.ThreadPoolExecutor(max_workers=2)  print('main : 开始')  tasks = []  for i in range(10, 0, -1):    print('main: submitting {}'.format(i))    f = ex.submit(task, i)    f.arg = i    f.add_done_callback(done)    tasks.append((i, f))  for i, task_obj in reversed(tasks):    if not task_obj.cancel():      print('main: 不能取消{}'.format(i))  ex.shutdown()

运行效果

[root@mnt]# python3 futures_future_callback_cancel.py main : 开始main: submitting 10task 10 : 睡眠main: submitting 9task 9 : 睡眠main: submitting 8main: submitting 7main: submitting 6main: submitting 5main: submitting 4main: submitting 3main: submitting 2main: submitting 1done 1:取消done 2:取消done 3:取消done 4:取消done 5:取消done 6:取消done 7:取消done 8:取消main: 不能取消9main: 不能取消10task 10 : 完成done 10 : 正常返回 : 1.0task 9 : 完成done 9 : 正常返回 : 0.9

6、Future异常的处理

futures_future_exception

#!/usr/bin/env python# -*- coding: utf-8 -*-from concurrent import futuresdef task(n):  print('{} : 开始'.format(n))  raise ValueError('这个值不太好 {}'.format(n))ex = futures.ThreadPoolExecutor(max_workers=2)print('main: 开始...')f = ex.submit(task, 5)error = f.exception()print('main: error:{}'.format(error))try:  result = f.result()except ValueError as e:  print('访问结果值的异常 {}'.format(e))

运行效果

[root@mnt]# python3 futures_future_exception.py main: 开始...5 : 开始main: error:这个值不太好 5访问结果值的异常 这个值不太好 5

7、Future上下文管理即利用with打开futures.ThreadPoolExecutor()

futures_context_manager.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from concurrent import futuresdef task(n):  print(n)with futures.ThreadPoolExecutor(max_workers=2) as ex:  print('main: 开始')  ex.submit(task, 1)  ex.submit(task, 2)  ex.submit(task, 3)  ex.submit(task, 4)print('main: 结束')

运行效果

[root@ mnt]# python3 futures_context_manager.py main: 开始24main: 结束

8、基于进程池使用map()

futures_process_pool_map.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from concurrent import futuresimport osdef task(n):  return (n, os.getpid())if __name__ == '__main__':  ex = futures.ProcessPoolExecutor(max_workers=2)  results = ex.map(task, range(50, 0, -1))  for n, pid in results:    print('task {} in 进程id {}'.format(n, pid))

运行效果

[root@ mnt]# python3 futures_process_pool_map.py task 5 in 进程id 9192task 4 in 进程id 8668task 3 in 进程id 9192task 2 in 进程id 8668task 1 in 进程id 9192

9、基于进程池异常处理

futures_process_pool_broken.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from concurrent import futuresimport osimport signaldef task(n):  return (n, os.getpid())if __name__ == '__main__':  with futures.ProcessPoolExecutor(max_workers=2) as ex:    print('获取工作进程的id')    f1 = ex.submit(os.getpid)    pid1 = f1.result()    print('结束进程 {}'.format(pid1))    os.kill(pid1, signal.SIGHUP)    print('提交其它进程')    f2 = ex.submit(os.getpid)    try:      pid2 = f2.result()    except futures.process.BrokenProcessPool as e:      print('不能开始新的任务:{}'.format(e))

运行效果

[root@ mnt]# python3 futures_process_pool_broken.py 获取工作进程的id结束进程 104623提交其它进程不能开始新的任务:A process in the process pool was terminated abruptly while the future was running or pending.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


  • 上一条:
    Python 音频生成器的实现示例
    下一条:
    Python hmac模块使用实例解析
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客