[解決済み] 気流埋め戻し明確化
質問事項
これからAirbnbの エアフロー バックフィルがいつどのように行われるのか、まだよく分かっていません。
具体的には、2つのユースケースがあり、混乱しています。
-
もし私が
airflow scheduler
を数分間実行し、1分間停止した後、再び再起動すると、私のDAGは最初の30秒ほど余分なタスクを実行し、その後は通常通り(10秒ごとに実行)実行されるようです。 これらの余分なタスクは、以前の実行で完了できなかったタスクの埋め戻しなのでしょうか? もしそうなら、これらのタスクをバックフィルしないように airflow にどのように指示すればよいのでしょうか? -
もし私が
airflow scheduler
を数分間実行し、その後airflow clear MY_tutorial
を起動し、その後airflow scheduler
を実行すると、余分なタスクが大量に実行されるようです。 これらのタスクは、何らかの形で "backfilled" タスクでもあるのでしょうか? それとも、私は何かを見逃しているのでしょうか?
現在、私は非常にシンプルなダグを持っています。
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2016, 10, 4),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 8)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
second_template = """
touch ~/airflow/logs/test
echo $(date) >> ~/airflow/logs/test
"""
t4 = BashOperator(
task_id='write_test',
bash_command=second_template,
dag=dag)
t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)
エアフローの設定で変更したのは、以下の2点だけです。
- sqlite dbからpostgres dbに変更しました。
-
を使っています。
CeleryExecutor
の代わりにSequentialExecutor
本当にありがとうございました。
解決方法は?
DAGのスケジューラーのトグルを"on"に変更すると、スケジューラーは、"default_args"で指定したstart_dateから、状態を記録していないすべてのdag runインスタンスのバックフィルをトリガーします。
例えば 開始日が "2017-01-21" で、スケジューリングのトグルを "2017-01-22T00:00:00" でオンにした場合、ダグが1時間ごとに実行するように設定されていると、スケジューラーは24のダグランをバックフィルして、スケジュールされた間隔で実行を開始します。
これは、本質的には、両方の質問で起こっていることです。1では、スケジューラーをオフにした30秒間のうち、3回分の不足分を補っています。2では、start_dateから"now"までのすべてのDAGランを埋め込んでいます。
これを回避する方法は2つあります。
- start_dateを将来の日付に設定し、その日付に達したときにのみdagランのスケジューリングを開始するようにします。DAGのstart_dateを変更する場合、開始日がairflowのDBに保存される方法のため、DAGの名前も変更する必要があることに注意してください。
- 手動でバックフィルを実行する コマンドラインから このフラグは、airflow に実際に DAG を実行するのではなく、DB で成功したものとしてマークするように指示します。
例
airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30
関連
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン