设为首页收藏本站

嵌入式天空

 找回密码
 我要注册

扫一扫,访问微社区

最近看过此主题的会员

查看: 426|回复: 0

django+redis+celery

[复制链接]

1

主题

1

帖子

5

积分

新手上路

Rank: 1

积分
5
发表于 2019-1-19 14:20:31 | 显示全部楼层 |阅读模式
Celery+Redis
基本概念
Brokers
brokers 中文意思为中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的⻆色,brokers 就是生产者和消费者存放/拿取产品的地方(队列),常见的 brokers 有 rabbitmq、redis、Zookeeper 等。


Result Stores / backend
顾名思义就是结果储存的地⽅方,队列列中的任务运⾏行行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果,就是 Result Stores 了,常见的 backend 有 redis、Memcached 甚至常用的数据都可以。


Workers
就是 Celery 中的⼯工作者,类似与⽣生产/消费模型中的消费者,其从队列列中取出任务并执行。


Tasks
就是我们想在队列列中进⾏行行的任务,一般由用户、触发器器或其他操作将任务入队,然后交由 workers 进行处理


Redis
安装Redis(Centos 7)
yum  install redis
systemctl start redis
celery
安装celery
pip install redis
pip install celery


创建简单例子
创建tasks.py文件


from celery import Celery


app = Celery('tasks',  backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')


@app.task
def add(no):
        for i in range(no) :
                print('hello celery')




启动celery工作者
(venv) [embsky@localhost Test]$ celery -A tasks worker --loglevel=info
创建tast.py
from tasks import addimport time#调度任务result = add.delay(2)#等待任务完成while not result.ready():        time.sleep(1)#获取任务结果print('task done: {0}'.format(result.get()))
调度任务(venv) [embsky@localhost Test]$ python test.py
任务继承利用任务继承可以实现任务结束回调或者任务失败回调
构建任务类(在tasks.py中)from celery import Celeryapp = Celery('tasks',  backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')from celery import Taskclass MyTask(Task):    def on_success(self, retval, task_id, args, kwargs):        print ('=================task done: {0}'.format(retval))        return super(MyTask, self).on_success(retval, task_id, args, kwargs)        def on_failure(self, exc, task_id, args, kwargs, einfo):        print ('=================task fail, reason: {0}'.format(exc))        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)@app.task(base=MyTask)def taskfunc(x, y):    return x + y
启动celery工作者(venv) [embsky@localhost Test]$ celery -A tasks worker --loglevel=NOTSET
任务调度(tast.py)from tasks import addimport timefrom tasks import taskfuncresult = taskfunc.delay(1,2)while not result.ready() :        time.sleep(1)print('task done: {}'.format(result.get()))
(venv) [embsky@localhost Test]$ python test.py
任务状态回调利用任务状态回调可以实现监测一个后台的进度
构建任务(tasts.py)from celery import Celeryimport time
app = Celery('tasks',  backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
@app.task(bind=True)   #bind=True代表绑定任务为实例方法
def test_mes(self):    for i in range(10):        time.sleep(1)        #更新状态,状态为PROGRESS,状态的值为{'p':i}        self.update_state(state="PROGRESS", meta={'p': i})    return 'finish'
启动celery工作者(venv) [embsky@localhost Test]$ celery -A tasks worker --loglevel=NOTSET
任务调度(test.py)
from tasks import test_mesimport sys
def pm(body):    #获取状态值    res = body.get('result')    #获得状态,如果任务结束的话或得的状态是SUCCESS    if body.get('status') == 'PROGRESS':        sys.stdout.write('\r任务进度: {0}%'.format(res.get('p')))        sys.stdout.flush()    else:        print('\r')        print(res)r = test_mes.delay()#任务每一次更新状态(任务结束也会更新状态)pm方法都会被调用,body参数为任务的信息print(r.get(on_message=pm, propagate=True))
任务的其他状态
参数
说明

PENDING任务等待中
STARTED任务已开始
SUCCESS任务执行成功
FAILURE任务执行失败
RETRY任务将被重试
REVOKED任务取消














回复

使用道具 举报

您需要登录后才可以回帖 登录 | 我要注册

本版积分规则

QQ|Archiver|手机版|小黑屋|EBMSKY Inc. ( 冀ICP备17022971号-1  

GMT+8, 2019-4-19 00:18 , Processed in 0.077366 second(s), 37 queries .

Powered by Discuz! X3.2

© 2014-2018 Comsenz Inc. 【嵌入式天空】设计

快速回复 返回顶部 返回列表