侯体宗的博客
  • 首页
  • Hyperf版
  • beego仿版
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

python基于celery实现异步任务周期任务定时任务

Python  /  管理员 发布于 7年前   255

这篇文章主要介绍了python基于celery实现异步任务周期任务定时任务,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

hello, 小伙伴们, 好久不更新了,这一次带来的是celery在python中的应用以及设置异步任务周期任务和定时任务的步骤,希望能给入坑的你带来些许帮助.

首先是对celery的介绍,Celery其实是一个专注于实时处理和调度任务的分布式任务队列,同时提供操作和维护分布式系统所需要的全部数据, 因此可以用它提供的接口快速实现并管理一个分布式的任务队列,它本身不是任务队列,它是封装了操作常见任务队列的各种操作, 可以使用它快速进行任务队列的使用与管理.在Python中的组成部分是 1.用户任务 app 2.管道 broker 用于存储任务 官方推荐的是 redis rabbitMQ / backend 用于存储任务执行结果的 3, 员工 worker 大致流程入下:

最左边的是用户, 用户发起1个请求给服务器, 要服务器执行10个任务,将这10个任务分给10个调度器,即开启10个线程进行任务处理,worker会一直监听调度器是否有任务, 一旦发现有新的任务, 就会立即执行新任务,一旦执行完就会返回给调度器, 即backend, backend会将请求发送给服务器, 服务器将结果返回给用户, 表现的结果就是,这10个任务同时完成,同时返回,,这就是Celery的整个工作流程, 其中的角色分别为,任务(app_work), 调度器(broker + backend), 将任务缓存的部分, 即将所有任务暂时存在的地方,相当于生产者, 消费者(worker 可以指定数量, 即在创建worker命令的时候可以指定数量), 在worker拿到任务后,人就控制不了了, 除非把worker杀死, 不然肯定会执行完.

也即 任务来了以后, 调度器(broker)去缓存任务, worker去执行任务, 完成后返回backend,接着返回,

还有就是关于定时任务和周期任务在linux上为什么不用自身所带着的去做,是因为linux周期定时任务是不可控的, 不好管理, 返回值保存也是个麻烦事, 而celery只要开启着调度器, 就可以随时把人物结果获取到,即使用celery控制起来是非常方便的.

接下来就是实例代码:

workers.py

from celery import Celeryimport time# 创建一个Celery实例, 就是用户的应用app 第一个参数是任务名称, 可以随意起 后面的就是配置的broker和backenddiaoduqi= Celery("mytask", broker="redis://127.0.0.1:6379", backend="redis:127.0.0.1:6379")# 接下来是为应用创建任务 [email protected] ab(a,b):  time.sleep(15)  return a+b

brokers.py

from worker import ab# 将任务交给Celery的Worker执行res = ab.delay(2,4)#返回任务IDprint(res.id)

backends.py

from celery.result import AsyncResultfrom worker import diaoduqi# 异步获取任务返回值async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=diaoduqi)# 判断异步任务是否执行成功if async_task.successful():  #获取异步任务的返回值  result = async_task.get()  print(result)else:  print("任务还未执行完成")

为了方便,现在直接将三个文件代表的部分命名在文件名称中.首先是启动workers.py

启动方式是依据系统的不同来启动的, 对于linux下 celery worker -A workers -l INFO 也可以指定开启的worker数量 即在后面添加的参数是 -c 5 表示指定5个worker 理论上指定的worker是无上限的,

在windows下需要安装一个eventlet模块进行运行, 不然不会运行成功 pip install eventlet 可以开启线程 不指定数量是默认6个worker, 理论上worker的数量可以开启无限个,但是celery worker -A s1 -l INFO -P eventlet -c 5 使用eventlet 开启5个worker 执行

该命令后 处于就绪状态, 需要发布任务, 即brokers.py进行任务发布, 方法是使用delay的方式执行异步任务, 返回了一个任务id, 接着去backends.py中取这个任务id, 去查询任务是否完成,判定条件即任务.successful 判断是否执行完, 上面就是celery异步执行任务的用法与解释

接下来就是celery在项目中的应用

在实际项目中应用celery是有一定规则的, 即目录结构应该如下.

结构说明 首先是创建一个CeleryTask的包,接着是在里面创建一个celery.py,必须是这个文件 关于重名的问题, 找寻模块的顺序是先从当前目录中去寻找, 根本找不到,接着是从内置模块中去找, 根本就找不到写的这个celery这个文件,

celery.py

from celery import CeleryDDQ = Celery("DDQ",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379",       include=["CeleryTask.TaskOne","CeleryTask.TaskTwo"])

TaskOne.py

import timefrom CeleryTask.celery import [email protected] one1(a,b):  # time.sleep(3)  return [email protected] one2():  time.sleep(2)  return "one2"

taskTwo.py

import timefrom CeleryTask.celery import [email protected] two1():  time.sleep(2)  return "two1"@DDQ.taskdef two2():  time.sleep(3)  return "two2"

getR.py

from CeleryTask.TaskOne import one1 as one# one.delay(10,10)# two.delay(20,20)# 定时任务我们不在使用delay这个方法了,delay是立即交给task 去执行# 现在我们使用apply_async定时执行# 首先我们要先给task一个执行任务的时间import datetime, time# 获取当前时间 此时间为东八区时间ctime = time.time()# 将当前的东八区时间改为 UTC时间 注意这里一定是UTC时间,没有其他说法utc_time = datetime.datetime.utcfromtimestamp(ctime)# 为当前时间增加 10 秒add_time = datetime.timedelta(seconds=10)action_time = utc_time + add_time# action_time 就是当前时间未来10秒之后的时间# 现在我们使用apply_async定时执行res = one.apply_async(args=(6, 10), eta=action_time)res = one.apply_async(args=(6, 10), eta=action_time)res = one.apply_async(args=(6, 10), eta=action_time)res = one.apply_async(args=(6, 10), eta=action_time)res = one.apply_async(args=(6, 10), eta=action_time)res = one.apply_async(args=(6, 10), eta=action_time)print(res.id)# 这样原本延迟5秒执行的One函数现在就要在10秒钟以后执行了

接着是在命令行cd到与CeleryTask同级目录下, 使用命令 celery worker -A CeleryTask -l INFO -P eventlet -c 50 这样 就开启了worker 接着去 发布任务, 在定时任务中不再使用delay这个方法了,

delay是立即交给ttask去执行, 在这里使用 apply_async定时执行 指的是调度的时候去定时执行

需要设置的是UTC时间, 以及定时的时间(多长时间以后执行) 之后使用 celery worker -A CeleryTask -l INFO -P eventlet -c 50 命令开启worker, 之后运行 getR.py文件发布任务, 可以看到在定义的时间以后执行该任务

周期任务

周期任务 指的是在指定时间去执行任务 需要导入的一个模块有 crontab

文件结构如下

结构同定时任务差不多,只不过需要变动一下文件内容 GetR文件已经不需要了,可以删除.

celery.py

from celery import Celeryfrom celery.schedules import crontabDDQ = Celery("DDQ", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379",       include=["CeleryTask.TaskOne", "CeleryTask.TaskTwo"])# 我要要对beat任务生产做一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10)DDQ.conf.beat_schedule = {  "each10s_task": {    "task": "CeleryTask.TaskOne.one1",    "schedule": 10, # 每10秒钟执行一次    "args": (10, 10)  },  "each1m_task": {    "task": "CeleryTask.TaskOne.one2",    "schedule": crontab(minute=1) # 每1分钟执行一次 也可以替换成 60 即 "schedule": 60  }}

TaskOne.py

import timefrom CeleryTask.celery import [email protected] one1(a,b):  # time.sleep(3)  return [email protected] one2():  time.sleep(2)  return "one2"

taskTwo.py

import timefrom CeleryTask.celery import [email protected] two1():  time.sleep(2)  return "two1"@DDQ.taskdef two2():  time.sleep(3)  return "two2"

以上配置完成以后,这时候就不能直接创建worker了,因为要执行周期任务,需要首先有一个任务的生产方, 即 celery beat -A CeleryTask, 用来产生创建者, 接着是创建worker worker的创建命令还是原来的命令, 即 celery worker -A CeleryTask -l INFO -P eventlet -c 50 , 创建完worker之后, 每10秒就会由beat创建一个任务给 worker去执行.至此, celery创建异步任务, 周期任务,定时任务完毕, 伙伴们自己拿去测试吧.

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


  • 上一条:
    python实现修改固定模式的字符串内容操作示例
    下一条:
    python解析多层json操作示例
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • 在python语言中Flask框架的学习及简单功能示例(0个评论)
    • 在Python语言中实现GUI全屏倒计时代码示例(0个评论)
    • Python + zipfile库实现zip文件解压自动化脚本示例(0个评论)
    • python爬虫BeautifulSoup快速抓取网站图片(1个评论)
    • vscode 配置 python3开发环境的方法(0个评论)
    • 近期文章
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(0个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 欧盟关于强迫劳动的规定的官方举报渠道及官方举报网站(0个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf文件功能(0个评论)
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • 在go + gin中gorm实现指定搜索/区间搜索分页列表功能接口实例(0个评论)
    • 在go语言中实现IP/CIDR的ip和netmask互转及IP段形式互转及ip是否存在IP/CIDR(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2018-04
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2022-01
    • 2023-07
    • 2023-10
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客