龙空技术网

Flask集成Celery异步任务

程序人生 45

前言:

眼前大家对“flask异步刷新”大约比较重视,看官们都需要了解一些“flask异步刷新”的相关知识。那么小编也在网络上搜集了一些有关“flask异步刷新””的相关文章,希望咱们能喜欢,朋友们一起来学习一下吧!

Celery 是一个异步任务队列/作业队列,基于分布式消息传递实现。Flask 可以轻松集成 Celery 来处理后台任务,以便于不阻塞主线程,保持应用的响应性。以下是如何在 Flask 应用中集成 Celery 的详细步骤和示例。

安装 Celery

首先,你需要安装 Celery。这可以通过 pip 完成:

pip install celery

创建 Celery 实例

在你的 Flask 应用中,你需要创建一个 Celery 实例。通常,你会在项目的 __init__.py 文件中这么做:

from flask import Flask

from celery import Celery

app = Flask(__name__)

# 创建 Celery 实例

celery = Celery(app)

# 如果你使用的是 virtualenv,确保 CELERY_IMPORTS 指向你的虚拟环境中的模块。

# 例如: CELERY_IMPORTS = ('.tasks',)

定义任务

在 Flask 应用的一个模块中(例如 tasks.py),你需要定义一个或多个 Celery 任务。每个任务都是一个装饰器,通常是 @app.task。

from celery import shared_task

@shared_task

def add(x, y):

return x + y

执行任务

现在,你可以从 Flask 视图中调用这个任务:

from flask import Flask, render_template, url_for

from app import celery # 导入 Celery 实例

from app.tasks import add # 导入定义的任务

@app.route('/')

def index():

return render_template('index.html')

@app.route('/add')

def add_numbers():

# 调用任务并将结果返回给客户端

# 注意:结果不会立即返回,而是通过任务队列异步处理

result = add.delay(4, 5)

return render_template('sum.html', task_id=result.task_id)

@app.route('/result/')

def get_result(task_id):

# 获取任务结果

try:

result = add.AsyncResult(task_id)

if result.ready():

return str(result.get())

else:

return 'Please wait a moment...'

except Exception as e:

return str(e)

运行 Celery worker

要执行异步任务,你需要运行 Celery worker。这可以在另一台机器上作为服务运行,或者在开发模式下在同一台机器上本地运行。

celery -A your_project_name worker --loglevel=info

示例:发送电子邮件

以下是一个更实际的例子,展示如何使用 Celery 来异步发送电子邮件:

from celery import shared_task

from flask_mail import Message

@shared_task

def send_email(subject, body, recipient):

def send(subject, body, recipient):

with app.app_context():

msg = Message(subject, recipients=[recipient], body=body)

mail.send(msg)

return send(subject, body, recipient)

在 Flask 视图中调用这个任务:

from flask import render_template, request, redirect, url_for

from app.tasks import send_email

@app.route('/send-email', methods=['POST'])

def send_email_view():

subject = request.form['subject']

body = request.form['body']

recipient = request.form['recipient']

# 异步发送电子邮件

send_email.delay(subject, body, recipient)

return redirect(url_for('success'))

在这个例子中,当用户提交一个包含收件人地址、主题和消息正文的表单后,发送电子邮件的任务会被添加到 Celery 队列中,由 worker 异步处理。这样,用户的等待时间就会减少,因为主线程会立即继续处理。

配置 Celery

你可能需要配置 Celery,例如设置消息队列。Celery 支持多种后端,如 Redis、RabbitMQ、AWS SQS 等。配置通常放置在 celery.py 文件中:

from celery import Celery

app = Celery('your_project_name', broker='amqp://guest@localhost//')

app.config_from_object('celeryconfig.py')

# 自动从 Flask 应用中加载 tasks.py 等模块

app.conf.update(

CELERY_IMPORTS=('app.tasks',)

)

# 初始化 Celery 实例

celery = Celery(app)

在 celeryconfig.py 中,你可以设置各种 Celery 参数,例如消息队列、任务序列化方法、结果存储等。

通过以上步骤,你可以成功地在 Flask 应用中集成 Celery,从而实现异步任务处理。记得根据实际情况调整配置和任务处理逻辑。

标签: #flask异步刷新