[解決済み] Celery - 現在のタスクのタスクIDを取得する
2023-08-18 06:19:38
質問
タスクの中からtask_idの値を取得するにはどうしたらよいでしょうか。以下は私のコードです。
from celery.decorators import task
from django.core.cache import cache
@task
def do_job(path):
"Performs an operation on a file"
# ... Code to perform the operation ...
cache.set(current_task_id, operation_results)
このアイデアは、タスクの新しいインスタンスを作るときに、そのインスタンスから
task_id
をタスクオブジェクトから取得します。そして、タスクが完了したかどうかを判断するために、タスクIDを使用します。I
はしません。
によってタスクを追跡したいのです。
path
の値によってタスクを追跡することを望みません。なぜなら、ファイルはタスクが完了した後に "clean up" され、存在するかもしれないし、存在しないかもしれないからです。
上記の例では、どのようにして
current_task_id
?
どのように解決するのですか?
Celeryはタスクが受け入れる場合、いくつかのデフォルトのキーワード引数を設定します。 (**kwargs を使用するか、それらを具体的にリストすることによって、それらを受け入れることができます)
@task
def do_job(path, task_id=None):
cache.set(task_id, operation_results)
デフォルトのキーワード引数の一覧は、こちらに記載されています。 http://ask.github.com/celery/userguide/tasks.html#default-keyword-arguments
関連
-
[解決済み] for'ループでインデックスにアクセスする?
-
[解決済み] Pythonで現在時刻を取得する方法
-
[解決済み] __init__.py は何のためにあるのですか?
-
[解決済み] パラメータに**(ダブルスター/アスタリスク)、*(スター/アスタリスク)がありますが、これはどういう意味ですか?
-
[解決済み] リストの最後の要素を取得する方法
-
[解決済み] Pythonで文字列の部分文字列を取得するにはどうすればよいですか?
-
[解決済み] 億の相対的輸入
-
[解決済み] モジュールのパスを取得する方法は?
-
[解決済み】forループを使った辞書の反復処理
-
[解決済み] googletransがエラー 'NoneType' オブジェクトに 'group' 属性がない、と言って動かなくなった。
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] PythonでSVGからPNGに変換する
-
[解決済み] Djangoで2つの日付の間を選択する
-
[解決済み] dict を txt ファイルに書き、それを読み取る?
-
[解決済み] 文字列から先頭と末尾のスペースを削除するには?
-
[解決済み] tensorflowのCPUのみのインストールでダイナミックライブラリ 'cudart64_101.dll' を読み込めなかった
-
[解決済み] Pythonによる一対のクロスプロダクト [重複] (英語)
-
[解決済み] Python Empty Generator 関数
-
[解決済み] 単純な文字列からtimedeltaオブジェクトを作成する方法
-
[解決済み] Pythonでリストが空かどうかをチェックする方法は?重複
-
[解決済み] Alembicアップグレードスクリプトでインサートやアップデートを実行するにはどうすればよいですか?