1. ホーム
  2. python

[解決済み] Celeryタスクのユニットテストはどのように行うのですか?

2022-07-19 20:57:49

質問

Celeryのドキュメント は Django 内で Celery をテストすることに言及しています。 が、Django を使っていない場合に Celery タスクをテストする方法は説明されていません。どのようにするのでしょうか?

どのように解決するのですか?

どのunittestライブラリでも、タスクを同期的にテストすることは可能です。私は通常、celery のタスクで作業するとき、2 つの異なるテスト セッションを行います。最初のセッションは(私がここで提案しているように)完全に同期的であり、アルゴリズムがすべきことを行うことを確認するものであるべきです。2番目のセッションはシステム全体(ブローカーを含む)を使用し、シリアライズの問題やその他の配布、通信の問題がないことを確認します。

というわけで。

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

そしてあなたのテスト。

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

お役に立てれば幸いです。