1. ホーム
  2. airflow

[解決済み] 気流埋め戻し明確化

2022-02-14 02:26:40

質問事項

これからAirbnbの エアフロー バックフィルがいつどのように行われるのか、まだよく分かっていません。

具体的には、2つのユースケースがあり、混乱しています。

  1. もし私が airflow scheduler を数分間実行し、1分間停止した後、再び再起動すると、私のDAGは最初の30秒ほど余分なタスクを実行し、その後は通常通り(10秒ごとに実行)実行されるようです。 これらの余分なタスクは、以前の実行で完了できなかったタスクの埋め戻しなのでしょうか? もしそうなら、これらのタスクをバックフィルしないように airflow にどのように指示すればよいのでしょうか?

  2. もし私が airflow scheduler を数分間実行し、その後 airflow clear MY_tutorial を起動し、その後 airflow scheduler を実行すると、余分なタスクが大量に実行されるようです。 これらのタスクは、何らかの形で "backfilled" タスクでもあるのでしょうか? それとも、私は何かを見逃しているのでしょうか?

現在、私は非常にシンプルなダグを持っています。

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2016, 10, 4),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 8)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

second_template = """
    touch ~/airflow/logs/test
    echo $(date) >> ~/airflow/logs/test
"""

t4 = BashOperator(
    task_id='write_test',
    bash_command=second_template,
    dag=dag)

t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)

エアフローの設定で変更したのは、以下の2点だけです。

  1. sqlite dbからpostgres dbに変更しました。
  2. を使っています。 CeleryExecutor の代わりに SequentialExecutor

本当にありがとうございました。

解決方法は?

DAGのスケジューラーのトグルを"on"に変更すると、スケジューラーは、"default_args"で指定したstart_dateから、状態を記録していないすべてのdag runインスタンスのバックフィルをトリガーします。

例えば 開始日が "2017-01-21" で、スケジューリングのトグルを "2017-01-22T00:00:00" でオンにした場合、ダグが1時間ごとに実行するように設定されていると、スケジューラーは24のダグランをバックフィルして、スケジュールされた間隔で実行を開始します。

これは、本質的には、両方の質問で起こっていることです。1では、スケジューラーをオフにした30秒間のうち、3回分の不足分を補っています。2では、start_dateから"now"までのすべてのDAGランを埋め込んでいます。

これを回避する方法は2つあります。

  1. start_dateを将来の日付に設定し、その日付に達したときにのみdagランのスケジューリングを開始するようにします。DAGのstart_dateを変更する場合、開始日がairflowのDBに保存される方法のため、DAGの名前も変更する必要があることに注意してください。
  2. 手動でバックフィルを実行する コマンドラインから このフラグは、airflow に実際に DAG を実行するのではなく、DB で成功したものとしてマークするように指示します。

airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30