[解決済み] エアフロー on_failure_callback
2022-02-26 21:59:50
質問
こんにちは、皆様お元気でしょうか。 最近、Airlfowを試して遊んでいるのですが、以下のような状況です。
- 読み込み_csv
- 処理ファイル
正常に動作する わざわざ pandas Datframeでtypoを作成し、失敗時のコールバックがどのように機能するかを学び、それがトリガーされているかどうかを確認するために、ログからdosentが好きなようです。
''' トレースバック (最も最近の呼び出し): ファイル "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1197, in handle_failure タスク.on_failure_callback(コンテキスト) TypeError: on_failure_callback() は 0 個の位置引数を取りますが、1 個が与えられました。 '''
以下はコードです。
try:
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import pandas as pd
# Setting up Triggers
from airflow.utils.trigger_rule import TriggerRule
# for Getting Variables from airlfow
from airflow.models import Variable
print("All Dag modules are ok ......")
except Exception as e:
print("Error {} ".format(e))
def read_csv(**context):
data = [{"name":"Soumil","title":"Full Stack Software Engineer"}, { "name":"Nitin","title":"Full Stack Software Engineer"},]
df = pd.DataFramee(data=data)
dag_config = Variable.get("VAR1")
print("VAR 1 is : {} ".format(dag_config))
context['ti'].xcom_push(key='mykey', value=df)
def process_file(**context):
instance = context.get("ti").xcom_pull(key='mykey')
print(instance.head(2))
return "Process complete "
def on_failure_callback(**context):
print("Fail works ! ")
with DAG(dag_id="invoices_dag",
schedule_interval="@once",
default_args={
"owner": "airflow",
"start_date": datetime(2020, 11, 1),
"retries": 1,
"retry_delay": timedelta(minutes=1),
'on_failure_callback': on_failure_callback,
},
catchup=False) as dag:
read_csv = PythonOperator(
task_id="read_csv",
python_callable=read_csv,
op_kwargs={'filename': "Soumil.csv"},
provide_context=True
)
process_file = PythonOperator(
task_id="process_file",
python_callable=process_file,
provide_context=True
)
read_csv >> process_file
# ====================================Notes====================================
# all_success -> triggers when all tasks arecomplete
# one_success -> trigger when one task is complete
# all_done -> Trigger when all Tasks are Done
# all_failed -> Trigger when all task Failed
# one_failed -> one task is failed
# none_failed -> No Task Failed
# ==============================================================================
# ============================== Executor====================================
# There are Three main types of executor
# -> Sequential Executor run single task in linear fashion wih no parllelism default Dev
# -> Local Exector run each task in seperate process
# -> Celery Executor Run each worker node within multi node architecture Most scalable
# ===========================================================================
解決方法は?
Airflow がどのようにトリガーするかによって、コンテキストを受け取ることができる関数に1つの引数を指定する必要があります。 オン_失敗_コールバック
def on_failure_callback(context):
print("Fail works ! ")
この実装では、メッセージからどのタスクが失敗したのかが分からないので、エラーメッセージにタスクの詳細を追加するとよいでしょう。
def on_failure_callback(context):
ti = context['task_instance']
print(f"task {ti.task_id } failed in dag { ti.dag_id } ")
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン