[解決済み] Airflowで条件付きタスクを作成する方法
2022-02-17 14:45:20
質問
Airflowで下記のスキーマのような条件付きタスクを作りたいのですが、どうすればいいですか?想定されるシナリオは以下の通りです。
- タスク1が実行される
- タスク1が成功したら、タスク2aを実行する。
- タスク1が失敗した場合、タスク2bを実行します。
- 最後にタスク3を実行する
上記のタスクは全てSSHExecuteOperatorです。 ShortCircuitOperatorと/またはXComを使って条件を管理する必要があると思うのですが、どのように実装すればよいのかがよくわかりません。解決策を説明していただけませんか?
解決方法を教えてください。
を使用する必要があります。 エアフロールトリガールール
すべてのオペレータは、生成されたタスクがトリガーされるルールを定義するtrigger_rule引数を持っています。
トリガールールの可能性。
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
ここで、あなたの問題を解決するためのアイデアを紹介します。
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
task_1 = SSHExecuteOperator(
task_id='task_1',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2 = SSHExecuteOperator(
task_id='conditional_task',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2a = SSHExecuteOperator(
task_id='task_2a',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2b = SSHExecuteOperator(
task_id='task_2b',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_FAILED,
ssh_hook=sshHook,
dag=dag)
task_3 = SSHExecuteOperator(
task_id='task_3',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ONE_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
関連
-
[解決済み】numpy: true_divide で無効な値に遭遇
-
[解決済み】終了コード -1073741515 (0xC0000135)でプロセス終了)
-
[解決済み】Python - "ValueError: not enough values to unpack (expected 2, got 1)" の修正方法 [閉店].
-
[解決済み] プログラムの実行やシステムコマンドの呼び出しはどのように行うのですか?
-
[解決済み] リストのリストからフラットなリストを作るには?
-
[解決済み] 辞書を値で並べ替えるにはどうしたらいいですか?
-
[解決済み] Rubyのswitch文の書き方
-
[解決済み】ネストされたディレクトリを安全に作成するには?
-
[解決済み】Pythonに三項条件演算子はありますか?
-
[解決済み】2つの辞書を1つの式でマージする(辞書の和をとる)には?)
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
ピロウズ画像色処理の具体的な活用方法
-
python call matlab メソッドの詳細
-
Python百行で韓服サークルの画像クロールを実現する
-
Pythonの学習とデータマイニングのために知っておくべきターミナルコマンドのトップ10
-
[解決済み】TypeError: unhashable type: 'numpy.ndarray'.
-
[解決済み] _tkinter.TclError: 表示名がなく、$DISPLAY環境変数もない。
-
[解決済み] 'int'オブジェクトに'__getitem__'属性がない。
-
[解決済み】LogisticRegression: Pythonでsklearnを使用して、未知のラベルタイプ: '連続'を使用しています。
-
[解決済み】NameError: 名前 'self' が定義されていません。
-
[解決済み】cアンダースコア式`c_`は、具体的に何をするのですか?