1. ホーム
  2. airflow

[解決済み] Airflowでタスクをスキップするには?

2022-02-15 04:51:18

質問

Airflowがアドホックな実行のためにDAG内のタスクのスキップをサポートしているかどうかを理解しようとしているのですが?

例えば、私のDAGのグラフが次のようなものだとします。 タスク1 > タスク2 > タスク3 > タスク4

そして、タスク3から手動でDAGを開始したいのですが、どのような方法が良いでしょうか?

について読んだことがあります。 ShortCircuitOperator しかし、私は、実行がトリガーされたときに適用できる、よりアドホックな解決策を探しています。

ありがとうございます。

解決方法は?

を組み込むことができます。 スキップミキシン ShortCircuitOperatorが はボンネットの中で使用します。 を使用して下流のタスクをスキップします。

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin)
    
    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition
    
    def execute(self, context):

        if self.condition:
           self.log.info('Proceeding with downstream tasks...')
           return

        self.log.info('Skipping downstream tasks...')

        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
       
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")