1. ホーム
  2. airflow

airflow の execution_date: 変数としてアクセスする必要がある。

2023-10-02 13:31:02

質問

私はこのフォーラムでは本当に初心者です。しかし、私はいつからか、私たちの会社のために、気流で遊んでいます。 この質問が本当に馬鹿に聞こえたらすみません。

私はBashOperatorsの束を使用してパイプラインを書いています。 基本的に、各タスクについて、私は単に'curl'を使用してREST apiを呼び出したいと思っています。

私のパイプラインはこんな感じです(非常に単純化されたバージョンです)。

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

気がつけば、私は current_datetime= datetime_obj.now(tz=tz.tzlocal()) 代わりに、私がここで欲しいのは '実行日'

どのようにすれば '実行日' を直接使用して、それを私の python ファイルの変数に代入するにはどうすればよいですか?

私は、引数へのアクセスに関するこの一般的な問題を抱えています。 どんな助けでも純粋に感謝されます。

ありがとうございます。

どのように解決するのですか?

この BashOperator 's bash_command テンプレート . にアクセスすることができます。 execution_date を任意のテンプレートで datetime オブジェクト を使用して execution_date 変数を使っています。テンプレート内では、任意の jinja2 メソッドを使って操作することができます。

として以下を使用します。 BashOperator bash_command 文字列 :

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

実行日相当の文字列が欲しいだけの場合。 ds は日付スタンプ(YYYY-MM-DD)を返します。 ds_nodash は同じものをダッシュなしで返す (YYYYMMDD)、など。 詳しくは macros API ドキュメント .


最終的な演算子は次のようになります。

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)