エアフロー実行期間設定 schedule_interval
2022-02-16 03:46:14
エアフロー実行時の問題
最近airflowを正式に使い始めたのですが、schedule_intervalとページに表示されるlast runについて混乱があり、毎週実行するタスクを設定したところ、タスクがスケジュール通りに実行されないという問題についに遭遇してしまいました。
ググりまくった結果、airflowのschedule_intervalはcron式を使えることがわかりましたが、crontabとはまだ違いがあるようです。
バックフィルについて
backfillコマンドは、データのバックフィル(前の日付でタスクを実行すること)を行うために使用します。
タスクを毎日実行する場合は、開始日を追加するだけです。
airflow backfill CKD_ALL_REPORT -s 2018-09-04
しかし、このタスクが複数日に1回実行される場合はうまくいかず、次のようなプロンプトが表示されます。
No run dates were found for the given dates and dag interval.
これは、気流が窓という概念を持っているためです
エアフローは、実行日ではなく、カバーするスケジュール期間の左端に基づいて実行日を設定します(これは期間の右端となります)。
stackoverflowで見つけたより合理的な説明は、airflowがschedule_intervalの定義を満たすstart_dateの後の最初の時点を実行日として記録するが、次の時点が来てから実行を開始するということである。つまり、このウィンドウによって、最後の実行は1サイクル遅れることになる。
そこで、jinjaでexecution_dateを確認する方法で、問題が明らかになります
Jinjaテンプレート
<テーブル JINJA式 意味 { <未定義 {ds }}。 実行日(YYYY-MM-DD)。 { <未定義 {ds_nodash }}。 実行日(YYYYMMDD) { <未定義 { yesterday_ds }} 昨日の日付(YYYY-MM-DD) { <未定義 { yesterday_ds_nodash }}。 昨日の日付(YYYYMMDD) { <未定義 { tomorrow_ds }} 明日の日付(YYYY-MM-DD)。 { <未定義 { tomorrow_ds_nodash }}。 明日の日付(YYYYMMDD) { <未定義 {ts }}。 execution_date.isoformat() と同じです。 { <未定義 {ts_nodash }}。 tsから-と.を除いたものと同じです。 { <未定義 {実行日 }} 実行日、(datetime.datetime) { <未定義 { prev_execution_date }} です。 前回の実行日(あれば)(datetime.datetime) { <未定義 { next_execution_date }}となります。 次の実行日(datetime.datetime) { <未定義 { dag }} DAGオブジェクト { <未定義 {タスク }} タスクオブジェクト { <未定義 {マクロ }} 以下に説明する macros パッケージへの参照。 { <未定義 {task_instance }} タスクインスタンスオブジェクト { <未定義 {end_date }}となります。 と同じです。 <未定義 {ds }}。 { <未定義 { latest_date }}となります。 と同じです。 <未定義 {ds }}。 { <未定義 {ti }} と同じです。 <未定義 {task_instance }}です。 { <未定義 {params }} ユーザー定義パラメータ辞書への参照 { <未定義 { var.value.my_var }}となります。 辞書として表現されるグローバル定義変数 { <未定義 { var.json.my_var.path }} となります。 JSONオブジェクトをデシリアライズした辞書として表現されたグローバル定義変数は、JSONオブジェクト内のキーにパスを追加します。 { <未定義 { task_instance_key_str }}となります。 タスクインスタンスへのユニークで人間が読めるキー フォーマット {dag_id} です。 {task_id} {ds} コンフ を表す airflow.configuration.conf にある完全な設定オブジェクト。 ランID 現在のDAGランのrun_id dag_run DagRunオブジェクトへの参照 test_mode タスクインスタンスがCLIのtestサブコマンドを使用して呼び出されたかどうか関連
-
[解決済み】tar: OSXのTFチュートリアル、flower_photos.tgzを解凍しようとするとUnrecognized archive format errorが発生する。
-
[解決済み] ValueError : "Can only tuple-index with a MultiIndex " を取得する。
-
[解決済み】Pandasでデータ(.datファイル)を読み込む
-
ImportError: 必要な依存関係 ['numpy'] がない 解決方法
-
音声認識を短期間でマスターするためのPythonのナレッジベース
-
[解決済み] Node.jsの依存性のためにWindowsでPythonを実行する
-
[解決済み] 空の配列に対するValueErrorを克服する
-
[解決済み] このFlaskのコードにあるgオブジェクトは何ですか?
-
Python read/write file Chinese mess Error TypeError: write() argument must be str, not bytes+.
-
pythonはgoまたはcを呼び出す
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
[Python] TypeError: ハッシュ化できない型: 'numpy.ndarray'
-
[解決済み] 非順序に対するPythonの反復処理
-
[解決済み] asyncioを学習中。「コルーチンが待機していない」警告エラー
-
[解決済み] AttributeError: モジュール 'pandas' には 'computation' という属性がありません。
-
[解決済み] Pythonの「範囲」を使った2桁の数え方
-
[解決済み] python3.2用pycompile。
-
[解決済み] 1つのファイルからランダムな行を読み取るにはどうしたらいいですか?
-
[解決済み] PyTorchでテンソルの値を取得するにはどうしたらいいですか?
-
[解決済み] この単純なベンチマークで、なぜSQLiteはRedisより速いのか?
-
Understanding TypeError: unsupported operand type(s) for ^: Python の 'float' と 'int' (単純)