• 日常搜索
  • 百度一下
  • Google
  • 在线工具
  • 搜转载

使用 Celery 和 Django 进行后台任务处理

Web 应用程序通常一开始很简单,但可能会变得相当复杂,并且其中大多数很快就超出了仅响应 HTTP 请求的责任。

发生这种情况时,必须区分必须立即发生的事情(通常在 HTTP 请求生命周期中)和最终可能发生的事情。这是为什么?好吧,因为当您的应用程序因流量过载时,像这样的简单事情会有所作为。

Web 应用程序中的操作可以分为关键或请求时间操作和后台任务,即在请求时间之外发生的操作。这些映射到上面描述的那些:

  • 需要立即发生:请求时操作

  • 最终需要发生:后台任务

请求时操作可以在单个请求/响应周期中完成,而不必担心操作会超时或用户可能会有不好的体验。常见示例包括 CRUD(创建、读取、更新、删除)数据库操作和用户管理(登录/注销例程)。

后台任务不同,因为它们通常非常耗时并且容易失败,主要是由于外部依赖。复杂 Web 应用程序中的一些常见场景包括:

  • 发送确认或活动电子邮件

  • 每天从各种来源爬取和抓取一些信息并存储它们

  • 进行数据分析

  • 删除不需要的资源

  • 以各种格式导出文件/照片

后台任务是本教程的重点。用于此场景的最常见的编程模式是生产者消费者架构。

简单来说,这个架构可以这样描述:

  • 生产者创建数据或任务。

  • 任务被放入称为任务队列的队列中。

  • 消费者负责消费数据或运行任务。

通常,消费者以先进先出 (FIFO) 方式或根据其优先级从队列中检索任务。消费者也被称为工人,这是我们将始终使用的术语,因为它与所讨论的技术使用的术语一致。

后台可以处理哪些任务?任务:

  • 对于 Web 应用程序的基本功能不是必需的

  • 不能在请求/响应周期中运行,因为它们很慢(I/O 密集型等)

  • 依赖于可能不可用或行为不符合预期的外部资源

  • 可能需要至少重试一次

  • 必须按计划执行

Celery 是在 python/Django 生态系统中进行后台任务处理的事实上的选择。它具有简单明了的 api,并且与 Django 完美集成。它支持任务队列的各种技术和工作人员的各种范例。

在本教程中,我们将创建一个使用后台任务处理的 Django 玩具 Web 应用程序(处理真实场景)。

  • 设置东西

  • 发送确认电子邮件

  • 异步发送电子邮件

  • 芹菜的定期任务

  • 使任务更可靠

设置东西

假设您已经熟悉 Python 包管理和虚拟环境,让我们安装 Django:

$ pip install Django

我决定构建另一个博客应用程序。该应用程序的重点将放在简单性上。用户可以简单地创建一个帐户,无需大惊小怪就可以创建一个帖子并将其发布到平台上。

设置quick_publisherDjango 项目:

$ django-admin startproject quick_publisher

让我们启动应用程序:

$ cd quick_publisher 
$ ./manage.py startapp main

在开始一个新的 Django 项目时,我喜欢创建一个main包含自定义用户模型等内容的应用程序。我经常遇到默认 DjangoUser模型的限制。拥有自定义User模型给我们带来了灵活性的好处。

# main/models.py
 
from django.db import models
from django.contrib.auth.models import AbstractBaseUser,PermissionsMixin,BaseUserManager
 
# Create your models here.
 
class UserAccountManager(BaseUserManager):
    def create_user(self, email, password=None, **extra_fields):
        if not email:
            raise ValueError('Users must have an email address')
 
        user = self.model(
            email=self.normalize_email(email), **extra_fields
        )
 
        user.set_password(password)
        user.save(using=self._db)
        return user
  
    def create_superuser(self, email, password=None,**extra_fields):
        user = self.create_user(
            email,
            password=password,**extra_fields
             
        )
        user.is_admin = True
        user.save(using=self._db)
        return user
 
     
class MyUser(AbstractBaseUser):
    email = models.EmailField(
        verbose_name='email address',
        max_length=255,
        unique=True,
    )
    first_name = models.CharField(verbose_name='first name', max_length=30, blank=True)
    last_name = models.CharField(verbose_name='first name', max_length=30, blank=True)
    is_active = models.BooleanField(default=True)
    is_admin = models.BooleanField(default=False)
 
    objects = UserAccountManager()
 
    USERNAME_FIELD = 'email'
    requireD_FIELDS = ['first_name','last_name']
 
    def __str__(self):
        return self.email
 
    def has_perm(self, perm, obj=None):
        "Does the user have a specific permission?"
        # Simplest possible answer: Yes, always
        return True
 
    def has_module_perms(self, app_label):
        "Does the user have permissions to view the app `app_label`?"
        # Simplest possible answer: Yes, always
        return True
 
    @property
    def is_staff(self):
        "Is the user a member of staff?"
        # Simplest possible answer: All admins are staff
        return self.is_admin

如果您不熟悉自定义用户模型的工作原理,请务必查看 Django文档。

现在我们需要告诉 Django 使用这个 User 模型而不是默认的。将此行添加到quick_publisher/settings.py文件中:

AUTH_USER_MODEL ='main.User'

我们还需要将main应用程序添加到文件中的INSTALLED_APPS列表中quick_publisher/settings.py。

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'main',
]

我们现在可以创建迁移,应用它们,并创建一个超级用户以便能够登录到 Django 管理面板:

$ ./manage
.py makemigrations main 
$ ./manage
.py migrate 
$ ./manage.py createsuperuser

现在让我们创建一个单独的 Django 应用程序来负责发布:

$ ./manage.py startapp publish

让我们定义一个简单的 post 模型publish/models.py:

from django.db import models
from django.utils import timezone
from django.contrib.auth import get_user_model
 
 
class Post(models.Model):
    author = models.ForeignKey(get_user_model())
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')
 
    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)

将模型与 Django 管理员挂钩Post是在publish/admin.py文件中完成的,如下所示:

from django.contrib import admin
from .models import Post
 
 
@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
    pass

publish最后,让我们通过将应用程序添加到INSTALLED_APPS列表中来将应用程序与我们的项目挂钩。

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'main',
    'publish',
]

我们现在可以运行服务器并前往https://localhost:8000/admin/并创建我们的第一个帖子,以便我们可以玩一些东西:

$ ./manage.py runserver

我相信你已经完成了你的作业并且你已经创建了这些帖子。

让我们继续。下一个明显的步骤是创建一种查看已发布帖子的方法。

# publish/views.py
 
from django.http import Http404
from django.shortcuts import render
from .models import Post
 
 
def view_post(request, slug):
    try:
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Poll does not exist")
 
    return render(request, 'post.html', context={'post': post})

让我们将新视图与以下 URL 相关联:quick_publish/urls.py

from django.contrib import admin
from django.urls import path,include
from publish.views import view_post
 
 
 
urlpatterns = [
    path('admin/', admin.site.urls),
    path('<slug:slug>',view_post,name='view_post'),
 
]

最后,让我们创建呈现帖子的模板:publish/templates/publish/post.html

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <title></title>
</head>
<body>
    <h1>{{ post.title }}</h1>
    <p>{{ post.content }}</p>
    <p>Published by {{ post.author.first_name }} on {{ post.created }}</p>
</body>
</html>

我们现在可以在浏览器中访问http://localhost:8000/the-slug-of-the-post-you-created /。

使用 Celery 和 Django 进行后台任务处理  第1张

这并不完全是网页设计的奇迹,但制作好看的帖子超出了本教程的范围。

发送确认电子邮件

这是经典场景:

  • 您在平台上创建一个帐户。

  • 您提供在平台上唯一标识的电子邮件地址

  • 该平台通过发送带有确认链接的电子邮件来检查您确实是电子邮件地址的所有者。

  • 在您执行验证之前,您无法(完全)使用该平台。

让我们在模型上添加一个is_verified标志和:verification_uuidUser

# main/models.py
import uuid
 
 
class MyUser(AbstractBaseUser):
    email = models.EmailField(verbose_name='email address',max_length=255,unique=True, )
    first_name = models.CharField(verbose_name='first name', max_length=30, blank=True)
    last_name = models.CharField(verbose_name='first name', max_length=30, blank=True)
    is_verified = models.BooleanField(verbose_name = 'verified', default=False) 
    is_active = models.BooleanField(default=True)
    is_admin = models.BooleanField(default=False)
    verification_uuid = models.UUIDField(verbose_name ='Unique Verification UUID', default=uuid.uuid4)

让我们利用这个机会将 User 模型添加到管理员:

from django.contrib import admin
from .models import User
 
 
@admin.register(User)
class UserAdmin(admin.ModelAdmin):
    pass

让我们将更改反映在数据库中:

$ ./manage.py makemigrations 
$ ./manage.py migrate

我们现在需要编写一段代码,在创建用户实例时发送电子邮件。这就是 Django 信号的用途,这是接触这个主题的绝佳机会。

在应用程序中发生某些事件之前/之后触发信号。我们可以定义触发信号时自动触发的回调函数。要进行回调触发,我们必须首先将其连接到信号。

我们将创建一个回调,该回调将在创建用户模型后触发。我们将在User模型定义之后添加此代码:main/models.py

from django.db.models import signals
from django.core.mail import send_mail
from django.urls import reverse
 
 
def user_post_save(sender, instance, signal, *args, **kwargs):
    if not instance.is_verified:
        # Send verification email
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
                'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(instance.verification_uuid)}),
            'from@quickpublisher.dev',
            [instance.email],
            fail_silently=False,
        )
 
signals.post_save.connect(user_post_save, sender=User)

我们在这里所做的是我们定义了一个user_post_save函数并将其连接到post_save模型发送的信号(在保存模型后触发的信号)User。

Django 不只是自己发送电子邮件。它需要绑定到电子邮件服务。为简单起见,您可以在 中添加您的 gmail 凭据quick_publisher/settings.py,也可以添加您最喜欢的电子邮件提供商。

Gmail 配置如下所示:

EMAIL_USE_TLS = True
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER = '<YOUR_GMAIL_USERNAME>@gmail.com'
EMAIL_HOST_PASSWORD = '<YOUR_GMAIL_PASSWORD>'
EMAIL_PORT = 587

要进行测试,请进入管理面板并使用您可以快速检查的有效电子邮件地址创建一个新用户。如果一切顺利,您将收到一封包含验证链接的电子邮件。

验证例程尚未准备好。

以下是验证帐户的方法:

from django.shortcuts import render,redirect
from django.http import Http404
from .models import MyUser
 
# Create your views here.
 
  
def home(request):
    return render(request, 'main/home.html')
  
  
def verify(request, uuid):
    try:
        user = MyUser.objects.get(verification_uuid=uuid, is_verified=False)
    except MyUser.DoesNotExist:
        raise Http404("User does not exist or is already verified")
  
    user.is_verified = True
    user.save()
  
    return redirect('home')

将视图连接到:quick_publish/urls.py

# quick_publish/urls.py
 
from django.contrib import admin
from django.urls import path,include
 
from publish.views import view_post
from main.views import home,verify
 
 
 
urlpatterns = [
    path('admin/', admin.site.urls),
    path('',home, name = 'home'),
    path('<slug:slug>',view_post),
    path('verify/<uuid>',verify, name ='verify'),
 
]

另外,记得home.html在main/templates/main/home.html. 它将由home视图呈现。

尝试重新运行整个场景。如果一切顺利,您将收到一封包含有效验证 URL 的电子邮件。

如果您遵循 URL,然后在管理员中签入,您可以看到该帐户是如何通过验证的。

异步发送电子邮件

这是我们到目前为止所做的事情的问题。您可能已经注意到创建用户有点慢。那是因为 Django 在请求时间内发送了验证邮件。

它是这样工作的:我们将用户数据发送到 Django 应用程序。该应用程序创建一个User模型,然后创建与 Gmail(或您选择的其他服务)的连接。Django 等待响应,然后才向我们的浏览器返回响应。

这就是 Celery 的用武之地。首先,确保它已安装:

$ pip install Celery

我们现在需要在 Django 应用程序中创建一个 Celery 应用程序:

# quick_publish/celery.py
 
import os
from celery import Celery
 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')
 
app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')
 
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

Celery 是一个任务队列。它从我们的 Django 应用程序接收任务,并将在后台运行它们。Celery 需要与充当代理的其他服务配对。

代理在 Web 应用程序和 Celery 之间发送消息。在本教程中,我们将使用 redis。Redis 易于安装,我们可以轻松上手,无需大惊小怪。

您可以按照 Redis 快速入门页面上的说明安装 Redis 。您需要安装 Redis Python 库,pip install redis以及使用 Redis 和 Celery 所需的包:pip install celery[redis].

在单独的控制台中启动 Redis 服务器,如下所示:$ redis-server

让我们将 Celery/Redis 相关的配置添加到quick_publisher/settings.py:

# REDIS related settings 
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600} 
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'

在 Celery 中运行任何东西之前,必须将其声明为任务。

以下是如何执行此操作:

# main/tasks.py
 
import logging
 
from django.urls import reverse
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app
 
 
@app.task
def send_verification_email(user_id):
    UserModel = get_user_model()
    try:
        user = UserModel.objects.get(pk=user_id)
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
                'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(user.verification_uuid)}),
            'from@quickpublisher.dev',
            [user.email],
            fail_silently=False,
        )
    except UserModel.DoesNotExist:
        logging.warning("Tried to send verification email to non-existing user '%s'" % user_id)

我们在这里所做的是:我们将发送验证电子邮件功能移到另一个名为tasks.py.

几点注意事项:

  • 文件名很重要。Celery 遍历所有应用程序INSTALLED_APPS并将任务注册到tasks.py文件中。

  • 注意我们是如何send_verification_email用@app.task. 这告诉 Celery 这是一个将在任务队列中运行的任务。

  • 请注意我们如何期望作为参数user_id而不是User对象。这是因为在将任务发送到 Celery 时,我们可能无法序列化复杂的对象。最好让它们保持简单。

回到main/models.py,导入send_verification_email函数并在创建新用户后运行它。信号代码变为:

from django.db.models import signals
from main.tasks import send_verification_email
 
 
def user_post_save(sender, instance, signal, *args, **kwargs):
    if not instance.is_verified:
        # Send verification email
        send_verification_email.delay(instance.pk)
 
signals.post_save.connect(user_post_save, sender=User)

注意我们如何调用.delay任务对象的方法。这意味着我们将任务发送到 Celery 并且我们不等待结果。如果我们send_verification_email(instance.pk)改用它,我们仍然会将它发送到 Celery,但会等待任务完成,这不是我们想要的。

在开始创建新用户之前,有一个问题。Celery 是一项服务,我们需要启动它。打开一个新控制台,确保激活了相应的virtualenv,然后导航到项目文件夹。

$ celery -A quick_publisher.celery worker --loglevel=debug --concurrency=4

这将启动四个 Celery 进程工作人员。您应该看到如下内容:

[2022-09-23 14:56:17,565: INFO /MainProcess] celery@vaati-Yoga-9-14ITL5 ready.

是的,现在你终于可以去创建另一个用户了。注意没有延迟,并确保查看 Celery 控制台中的日志并查看任务是否正确执行。这应该看起来像这样:

[2022-09-23 14:58:38,165: INFO/MainProcess] Task main.tasks.send_verification_email[4f8f8455-3a61-48d2-b02f-ad6786b362e1] received
[2022-09-23 14:58:42,228: INFO/ForkPoolWorker-4] Task main.tasks.send_verification_email[4f8f8455-3a61-48d2-b02f-ad6786b362e1] succeeded in 4.0618907359967125s: None

芹菜的定期任务

这是另一个常见的场景。大多数成熟的 Web 应用程序都会向其用户发送生命周期电子邮件,以保持他们的参与度。生命周期电子邮件的一些常见示例:

  • 月度报告

  • 活动通知(点赞、好友请求等)

  • 完成某些操作的提醒(“不要忘记激活您的帐户”)

这是我们将在我们的应用程序中执行的操作。我们将计算每个帖子被查看的次数,并将每日报告发送给作者。每天一次,我们将遍历所有用户,获取他们的帖子,并发送一封电子邮件,其中包含一个包含帖子和查看次数的表格。

让我们更改Post模型,以便我们可以适应视图计数场景。

class Post(models.Model):
    author = models.ForeignKey(get_user_model(),on_delete= models.CASCADE)
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')
    view_count = models.IntegerField("View Count", default=0)
 
    def get_absolute_url(self):
        return reverse("home", args=[str(self.id)])
  
    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)

与往常一样,当我们更改模型时,我们需要迁移数据库:

$ ./manage.py makemigrations 
 
$ ./manage.py migrate

让我们也修改view_postDjango 视图以计算视图:

def view_post(request, slug):
    try:
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Poll does not exist")
     
    post.view_count += 1
    post.save()
 
    return render(request, 'publish/post.html', context={'post': post})

view_count在模板中显示 会很有用。<p>Viewed {{ post.view_count }} times</p>在publisher/templates/post.html文件中的某处添加它。现在对帖子进行一些查看,看看计数器如何增加。

使用 Celery 和 Django 进行后台任务处理  第2张

让我们创建一个 Celery 任务。由于它是关于帖子的,我将把它放在publish/tasks.py:

from django.template import Template, Context
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app
from publish.models import Post
 
 
REPORT_TEMPLATE = """
Here's how you did till now:
 
{% for post in posts %}
        "{{ post.title }}": viewed {{ post.view_count }} times |
 
{% endfor %}
"""
 
 
@app.task
def send_view_count_report():
    for user in get_user_model().objects.all():
        posts = Post.objects.filter(author=user)
        if not posts:
            continue
 
        template = Template(REPORT_TEMPLATE)
 
        send_mail(
            'Your QuickPublisher Activity',
            template.render(context=Context({'posts': posts})),
            'from@quickpublisher.dev',
            [user.email],
            fail_silently=False,
        )

每次更改 Celery 任务时,请记住重新启动 Celery 进程。Celery 需要发现和重新加载任务。在创建周期性任务之前,我们应该在 Django shell 中对其进行测试,以确保一切都按预期工作:

$ ./manage.py shell 
 
In [1]: from publish.tasks import send_view_count_report 
 
In [2]: send_view_count_report.delay()

希望您在电子邮件中收到一份漂亮的小报告。

现在让我们创建一个周期性任务。打开quick_publisher/celery.py并注册周期性任务:

# quick_publisher/celery.py
 
import os
from celery import Celery
from celery.schedules import crontab
 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')
 
app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')
 
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
 
app.conf.beat_schedule = {
    'send-report-every-single-minute': {
        'task': 'publish.tasks.send_view_count_report',
        'schedule': crontab(),  # change to `crontab(minute=0, hour=0)` if you want it to run daily at midnight
    },
}

到目前为止,我们创建了一个计划,该计划将publish.tasks.send_view_count_report按照符号指示每分钟运行一次任务crontab()。您还可以指定各种Celery Crontab 时间表。

打开另一个控制台,激活相应的环境,然后启动 Celery Beat 服务。

$ celery -A quick_publisher beat

Beat 服务的工作是按计划推送 Celery 中的任务。考虑到计划使send_view_count_report任务根据设置每分钟运行一次。它适用于测试,但不推荐用于真实的 Web 应用程序。

使用 Celery 和 Django 进行后台任务处理  第3张

5.让任务更可靠

任务通常用于执行不可靠的操作、依赖外部资源或由于各种原因而容易失败的操作。以下是使它们更可靠的指南:

  • 使任务具有幂等性。幂等任务是如果中途停止,不会以任何方式改变系统状态的任务。该任务要么对系统进行完全更改,要么根本不更改。

  • 重试任务。如果任务失败,最好一次又一次地尝试,直到它成功执行。您可以使用Celery Retry在 Celery 中执行此操作。另一件有趣的事情是指数退避算法。当考虑限制重试任务对服务器的不必要负载时,这可能会派上用场。

结论

我希望这对你来说是一个有趣的教程,也是一个很好的介绍如何在 Django 中使用 Celery。

以下是我们可以得出的一些结论:

  • 将不可靠且耗时的任务保留在请求时间之外是一种很好的做法。

  • 长时间运行的任务应该由工作进程(或其他范例)在后台执行。

  • 后台任务可用于对应用程序的基本功能不重要的各种任务。

  • Celery 还可以使用该celery beat服务处理周期性任务。

  • 如果使任务具有幂等性并重试(可能使用指数退避),则任务会更可靠。


文章目录
  • 设置东西
  • 发送确认电子邮件
  • 异步发送电子邮件
  • 芹菜的定期任务
  • 5.让任务更可靠
  • 结论
  • 发表评论