[解決済み] Flaskで非同期タスクを作る
2022-07-16 06:49:48
質問
私はFlaskでアプリケーションを書いており、以下の点を除いてはとてもうまく動作しています。
WSGI
が同期とブロックであることを除いては、とてもうまく動作しています。私はサードパーティのAPIを呼び出す1つのタスクを持っており、そのタスクが完了するまでに数分かかることがあります。私はその呼び出し(実際には一連の呼び出しです)を行い、制御がFlaskに返される間、それを実行させたいと考えています。
私のビューは次のようになります。
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
# do stuff
return Response(
mimetype='application/json',
status=200
)
さて、私がやりたいことは、行の
final_file = audio_class.render_audio()
を実行し、メソッドが戻ったときに実行されるコールバックを提供し、その間Flaskはリクエストの処理を続けることができます。これは、Flaskが非同期で実行する必要がある唯一のタスクであり、私はこれを実装するための最善の方法についてのいくつかのアドバイスをしたいと思います。
私はTwistedとKleinを見ましたが、多分Threadingが十分であるように、私はそれらが過剰であることを確信していません。あるいは、Celeryはこのための良い選択でしょうか?
どのように解決するのですか?
私なら セロリ を使って、非同期タスクを処理することができます。タスクキューとなるブローカーをインストールする必要があります(RabbitMQやRedisがおすすめです)。
app.py
:
from flask import Flask
from celery import Celery
broker_url = 'amqp://guest@localhost' # Broker URL for RabbitMQ task queue
app = Flask(__name__)
celery = Celery(app.name, broker=broker_url)
celery.config_from_object('celeryconfig') # Your celery configurations in a celeryconfig.py
@celery.task(bind=True)
def some_long_task(self, x, y):
# Do some long task
...
@app.route('/render/<id>', methods=['POST'])
def render_script(id=None):
...
data = json.loads(request.data)
text_list = data.get('text_list')
final_file = audio_class.render_audio(data=text_list)
some_long_task.delay(x, y) # Call your async task and pass whatever necessary variables
return Response(
mimetype='application/json',
status=200
)
Flaskアプリを実行し、celery workerを実行するために別プロセスを起動します。
$ celery worker -A app.celery --loglevel=debug
また、Miguel Gringbergの 書き込み を参照してください。
関連
-
[解決済み] どうすればjQueryに非同期ではなく、同期のAjaxリクエストを実行させることができますか?
-
[解決済み] Flaskのリクエストで受信したデータを取得する
-
[解決済み] 関数内で変数を変更した後、変数が変更されないのはなぜですか?- 非同期コードリファレンス
-
[解決済み] Flaskで静的ファイルを提供する方法
-
[解決済み] FlaskのビューからJSONレスポンスを返す
-
[解決済み] Flaskの開発サーバーをネットワーク上で見えるように設定する
-
[解決済み] Flaskのルートでクエリ文字列にアクセスする方法は?
-
[解決済み] PythonでSVGからPNGに変換する
-
[解決済み] if文によるリスト内包
-
[解決済み] Pythonでnumpy.linalg.eigを使用した後の固有値と関連する固有ベクトルのソート
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[解決済み] Pythonのマルチプロセッシングプールimap_unorderedの呼び出しの進捗を表示しますか?
-
[解決済み] なぜ(0-6)は-6=偽なのか?重複
-
[解決済み] python-requests モジュールからのすべてのリクエストをログに記録します。
-
[解決済み] Pythonでマルチプロセッシングキューを使うには?
-
[解決済み] if 節の終了方法
-
[解決済み] pyvenv-3.4 は 0 ではない終了ステータス 1 を返しました。
-
[解決済み] テンプレートファイル変更時にFlaskアプリを再読み込みする
-
[解決済み] Python で、クラスオブジェクトを dict にキャストするにはどうしたらいいですか?
-
[解決済み] Google App EngineのためのFlaskとwebapp2の比較
-
[解決済み] Pythonで文字列が数字で始まるかどうかを判断するには?