侯体宗的博客
  • 首页
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

Python利用multiprocessing实现最简单的分布式作业调度系统实例

Python  /  管理员 发布于 7年前   166

介绍

Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个机器的多个进程中,依靠网络通信。想到这,就在想是不是可以使用此模块来实现一个简单的作业调度系统。在这之前,我们先来详细了解下python中的多进程管理包multiprocessing。

multiprocessing.Process

multiprocessing包是Python中的多进程管理包。它与 threading.Thread类似,可以利用multiprocessing.Process对象来创建一个进程。该进程可以允许放在Python程序内部编写的函数中。该Process对象与Thread对象的用法相同,拥有is_alive()、join([timeout])、run()、start()、terminate()等方法。属性有:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为CN,表示被信号N结束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类,用来同步进程,其用法也与threading包中的同名类一样。multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

这个模块表示像线程一样管理进程,这个是multiprocessing的核心,它与threading很相似,对多核CPU的利用率会比threading好的多。

看一下Process类的构造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

参数说明:

  • group:进程所属组。基本不用
  • target:表示调用对象。
  • args:表示调用对象的位置参数元组。
  • name:别名
  • kwargs:表示调用对象的字典。

创建进程的简单实例:

#coding=utf-8import multiprocessingdef do(n) : #获取当前线程的名字 name = multiprocessing.current_process().name print name,'starting' print "worker ", n return if __name__ == '__main__' : numList = [] for i in xrange(5) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start() p.join() print "Process end."

执行结果:

Process-1 startingworker 0Process end.Process-2 startingworker 1Process end.Process-3 startingworker 2Process end.Process-4 startingworker 3Process end.Process-5 startingworker 4Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,并用其start()方法启动,join()方法表示等待子进程结束以后再继续往下运行,通常用于进程间的同步。

注意:

在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if __name__ == ‘__main__' :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。

multiprocess.Pool

当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

apply_async和apply

函数原型:

apply_async(func[, args=()[, kwds={}[, callback=None]]])

二者都是向进程池中添加新的进程,不同的时,apply每次添加新的进程时,主进程和新的进程会并行执行,但是主进程会阻塞,直到新进程的函数执行结束。 这是很低效的,所以python3.x之后不再使用

apply_async和apply功能相同,但是主进程不会阻塞。

# -*- coding:utf-8 -*-import multiprocessingimport timedef func(msg): print "*msg: ", msg time.sleep(3) print "*end"if __name__ == "__main__": # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 pool = multiprocessing.Pool(processes=3) for i in range(10): msg = "hello [{}]".format(i) # pool.apply(func, (msg,)) pool.apply_async(func, (msg,)) # 异步开启进程, 非阻塞型, 能够向池中添加进程而不等待其执行完毕就能再次执行循环 print "--" * 10 pool.close() # 关闭pool, 则不会有新的进程添加进去 pool.join() # 必须在join之前close, 然后join等待pool中所有的线程执行完毕 print "All process done."

运行结果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py--------------------*msg: hello [0]*msg: hello [1]*msg: hello [2]*end*msg: hello [3]*end*end*msg: hello [4]*msg: hello [5]*end*msg: hello [6]*end*end*msg: hello [7]*msg: hello [8]*end*msg: hello [9]*end*end*endAll process done.Process finished with exit code 0

获得进程的执行结果

# -*- coding:utf-8 -*-import multiprocessingimport timedef func_with_return(msg): print "*msg: ", msg time.sleep(3) print "*end" return "{} return".format(msg)if __name__ == "__main__": # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 pool = multiprocessing.Pool(processes=3) results = [] for i in range(10): msg = "hello [{}]".format(i) res = pool.apply_async(func_with_return, (msg,)) # 异步开启进程, 非阻塞型, 能够向池中添加进程而不等待其执行完毕就能再次执行循环 results.append(res) print "--" * 10 pool.close() # 关闭pool, 则不会有新的进程添加进去 pool.join() # 必须在join之前close, 然后join等待pool中所有的线程执行完毕 print "All process done." print "Return results: " for i in results: print i.get() # 获得进程的执行结果

结果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v1.py--------------------*msg: hello [0]*msg: hello [1]*msg: hello [2]*end*end*msg: hello [3]*msg: hello [4]*end*msg: hello [5]*end*end*msg: hello [6]*msg: hello [7]*end*msg: hello [8]*end*end*msg: hello [9]*end*endAll process done.Return results: hello [0] returnhello [1] returnhello [2] returnhello [3] returnhello [4] returnhello [5] returnhello [6] returnhello [7] returnhello [8] returnhello [9] returnProcess finished with exit code 0

map

函数原型:

map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。

注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

# -*- coding:utf-8 -*-import multiprocessingimport timedef func_with_return(msg): print "*msg: ", msg time.sleep(3) print "*end" return "{} return".format(msg)if __name__ == "__main__": # 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去 pool = multiprocessing.Pool(processes=3) results = [] msgs = [] for i in range(10): msg = "hello [{}]".format(i) msgs.append(msg) results = pool.map(func_with_return, msgs) print "--" * 10 pool.close() # 关闭pool, 则不会有新的进程添加进去 pool.join() # 必须在join之前close, 然后join等待pool中所有的线程执行完毕 print "All process done." print "Return results: " for i in results: print i # 获得进程的执行结果

执行结果:

"D:\Program Files\Anaconda2\python.exe" E:/pycharm/test/multiprocessing/v2.py*msg: hello [0]*msg: hello [1]*msg: hello [2]*end*end*msg: hello [3]*msg: hello [4]*end*msg: hello [5]*end*end*msg: hello [6]*msg: hello [7]*end*msg: hello [8]*end*end*msg: hello [9]*end*end--------------------All process done.Return results: hello [0] returnhello [1] returnhello [2] returnhello [3] returnhello [4] returnhello [5] returnhello [6] returnhello [7] returnhello [8] returnhello [9] returnProcess finished with exit code 0

注意:执行结果中“―-”的位置,可以看到,map之后,主进程是阻塞的,等待map的结果返回

close()

关闭进程池(pool),使其不在接受新的任务。

terminate()

结束工作进程,不在处理未处理的任务。

join()

主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用。

进程间通信

多进程最麻烦的地方就是进程间通信,IPC比线程通信要难处理的多,所以留作单独一篇来记录

利用multiprocessing实现一个最简单的分布式作业调度系统

Job

首先创建一个Job类,为了测试简单,只包含一个job id属性,将来可以封装一些作业状态,作业命令,执行用户等属性。

job.py

#!/usr/bin/env python# -*- coding: utf-8 -*-class Job: def __init__(self, job_id): self.job_id = job_id

Master

Master用来派发作业和显示运行完成的作业信息

master.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from Queue import Queuefrom multiprocessing.managers import BaseManagerfrom job import Jobclass Master: def __init__(self): # 派发出去的作业队列 self.dispatched_job_queue = Queue() # 完成的作业队列 self.finished_job_queue = Queue() def get_dispatched_job_queue(self): return self.dispatched_job_queue def get_finished_job_queue(self): return self.finished_job_queue def start(self): # 把派发作业队列和完成作业队列注册到网络上 BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue) BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue) # 监听端口和启动服务 manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs') manager.start() # 使用上面注册的方法获取队列 dispatched_jobs = manager.get_dispatched_job_queue() finished_jobs = manager.get_finished_job_queue() # 这里一次派发10个作业,等到10个作业都运行完后,继续再派发10个作业 job_id = 0 while True:  for i in range(0, 10):  job_id = job_id + 1  job = Job(job_id)  print('Dispatch job: %s' % job.job_id)  dispatched_jobs.put(job)  while not dispatched_jobs.empty():  job = finished_jobs.get(60)  print('Finished Job: %s' % job.job_id) manager.shutdown()if __name__ == "__main__": master = Master() master.start()

Slave

Slave用来运行master派发的作业并将结果返回

slave.py

#!/usr/bin/env python# -*- coding: utf-8 -*-import timefrom Queue import Queuefrom multiprocessing.managers import BaseManagerfrom job import Jobclass Slave: def __init__(self): # 派发出去的作业队列 self.dispatched_job_queue = Queue() # 完成的作业队列 self.finished_job_queue = Queue() def start(self): # 把派发作业队列和完成作业队列注册到网络上 BaseManager.register('get_dispatched_job_queue') BaseManager.register('get_finished_job_queue') # 连接master server = '127.0.0.1' print('Connect to server %s...' % server) manager = BaseManager(address=(server, 8888), authkey='jobs') manager.connect() # 使用上面注册的方法获取队列 dispatched_jobs = manager.get_dispatched_job_queue() finished_jobs = manager.get_finished_job_queue() # 运行作业并返回结果,这里只是模拟作业运行,所以返回的是接收到的作业 while True:  job = dispatched_jobs.get(timeout=1)  print('Run job: %s ' % job.job_id)  time.sleep(1)  finished_jobs.put(job)if __name__ == "__main__": slave = Slave() slave.start()

测试

分别打开三个linux终端,第一个终端运行master,第二个和第三个终端用了运行slave,运行结果如下

master

$ python master.py Dispatch job: 1Dispatch job: 2Dispatch job: 3Dispatch job: 4Dispatch job: 5Dispatch job: 6Dispatch job: 7Dispatch job: 8Dispatch job: 9Dispatch job: 10Finished Job: 1Finished Job: 2Finished Job: 3Finished Job: 4Finished Job: 5Finished Job: 6Finished Job: 7Finished Job: 8Finished Job: 9Dispatch job: 11Dispatch job: 12Dispatch job: 13Dispatch job: 14Dispatch job: 15Dispatch job: 16Dispatch job: 17Dispatch job: 18Dispatch job: 19Dispatch job: 20Finished Job: 10Finished Job: 11Finished Job: 12Finished Job: 13Finished Job: 14Finished Job: 15Finished Job: 16Finished Job: 17Finished Job: 18Dispatch job: 21Dispatch job: 22Dispatch job: 23Dispatch job: 24Dispatch job: 25Dispatch job: 26Dispatch job: 27Dispatch job: 28Dispatch job: 29Dispatch job: 30

slave1

$ python slave.py Connect to server 127.0.0.1...Run job: 1 Run job: 2 Run job: 3 Run job: 5 Run job: 7 Run job: 9 Run job: 11 Run job: 13 Run job: 15 Run job: 17 Run job: 19 Run job: 21 Run job: 23 

slave2

$ python slave.py Connect to server 127.0.0.1...Run job: 4 Run job: 6 Run job: 8 Run job: 10 Run job: 12 Run job: 14 Run job: 16 Run job: 18 Run job: 20 Run job: 22 Run job: 24 

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家的支持。


  • 上一条:
    Python基础语言学习笔记总结(精华)
    下一条:
    python中os和sys模块的区别与常用方法总结
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在windows10中升级go版本至1.24后LiteIDE的Ctrl+左击无法跳转问题解决方案(0个评论)
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(95个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客