気流を利用したオペレーターの紹介
メインコンテンツ
空気の流れを作ることが目的であり、そのためには今回紹介する様々なOperatorがなければ利用できません
1. 演算子の紹介
オペレータは、インスタンス化されるとDAGのタスク・ノードとなる特定のタイプのタスクを生成することができる。すべてのオペレーターは BaseOperator から派生し、多くのプロパティとメソッドをこの方法で継承します。
Operatorには、大きく分けて3つのタイプがあります。
- リモートマシン上で操作を実行したり、操作を実行したりします。
- あるシステムから別のシステムへデータを移動させる
- センサーと同様に、特定の条件が満たされるまで実行し続ける特定のタイプのオペレータです。例えば、HDFSやS3に特定のファイルが到着するのを待つ、Hiveに特定のパーティションが現れるのを待つ、特定の時間帯に実行する、などです。
2. BaseOperatorの紹介
すべてのOperatorはBaseOperatorから派生し、継承することでより多くの機能を持つようになります。これはエンジンの中核となるものです。したがって、Operatorの基本機能を理解するためには、BaseOperatorのパラメータを理解することに時間を費やすことが重要です。
まず、コンストラクタのプロトタイプを見てみましょう。
class airflow.models.BaseOperator(task_id, owner='Airflow', email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_ delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_ interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1 , weight_rule=u'downstream', queue='default', pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback= None, on_retry_callback=None, trigger_rule=u'all_success', resources=None, run_as_user=None, task_concurrency=None, executor_config=None , inlets=None, outlets=None, *args, **kwargs)
ここにはいくつものパラメータがありますが、文字通り大筋で理解できるものもあり、詳しい説明は公式ドキュメントをご覧いただければと思いますので、ここでは割愛します。 schedule_intervalの近くです。例えば、日次実行の開始日を「2018-09-21 00:00:00」、時間単位実行を「2018-09-21 05:00:00」と設定すると、airflowは開始日_dateにschedule_intervalを加えたものを実行日としてセットします。注意すべきは、タスクの依存関係を適時に解消する必要があることです。例えば、タスクAがタスクBに依存していても、start_dateが異なるために実行日が異なれば、タスクAの依存関係は決して満たされることはない。毎日午後2時に開始するなど、毎日タスクを実行する必要がある場合、DAG内のcron式で
schedule_interval="0 14 * * *"
すべてのOperatorはBaseOperatorを継承しているので、BaseOperatorの引数は他のOperatorの引数にもなります。
3. BashOperator
DAGの公式サンプルチュートリアルは、典型的なBashOperatorで、bashコマンドやスクリプトを呼び出し、テンプレート引数をチュートリアルに渡します。
"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](http://pythonhosted.org/airflow/tutorial.html)
"""
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc':False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date', # This can also be a bash script file
bash_command='date',
dag=dag)
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
! [img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
dag.doc_md = __doc__
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
ここでt1とt2はわかりやすく、bashコマンドを直接呼び出しています。実際にはbashスクリプトをパスで渡すこともできます。t3はJinjaのテンプレートを使っています。 "{% %}" 内部ではforタグを使ってループ処理を行います。 <未定義 { }}" 中身は変数で、dsは実行日、airflowのマクロ変数、params.my_paramはカスタム変数です。提供されている公式のテンプレートを元に少し修正すれば、日々の業務に対応できます。
4. PythonOperator
PythonOperatorはPythonの関数を呼び出すことができます。Pythonは基本的にどんなタイプのタスクでも呼び出すことができるので、適当なOperatorが見つからない場合は、タスクをPython関数に変換してPythonOperatorを使用することも選択肢のひとつになります。以下は、公式ドキュメントに掲載されているPythonOperatorの使用例です。
from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
import time
from pprint import pprint
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_python_operator', default_args=args,
schedule_interval=None)
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag)
# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):
task = PythonOperator(
task_id='sleeping_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag)
task.set_upstream(run_this)
PythonOperatorは基本的にBashOperatorと似ていますが、python_callableがPythonの関数として渡されるのに対し、後者はbashのコマンドやスクリプトとして渡される点が異なります。後者はbashのコマンドやスクリプトを渡します。op_kwargs を使って任意の数の引数を渡すことができます。
5. SSHOperator
SSHOperator は ssh プロトコルを使ってリモートホストと通信しますが、ここで重要なのは、SSHOperator はスクリプトを呼び出すときにユーザーの設定ファイルを読み込まないという点です。スクリプトが呼び出されたときに現在のユーザーの設定情報が自動的に読み込まれるように、以下のコードをスクリプトに追加するとよいでしょう。
. ~/.profile
or
. ~/.bashrc
以下は、SSHOperatorタスクの例である。
task_crm = SSHOperator(
ssh_conn_id='ssh_crmetl', # Specify the conn_id
task_id='crmetl-filesystem-check',
command='/home/crmetl/bin/monitor/filesystem_monitor.sh', # script file on remote machine
dag=dag
)
ここで、ssh_crmetl は、以下のように airflow Web サーバーインターフェイスで設定される接続 ID です。
ウェブサーバーを開き、以下のように管理メニューの「接続」項目をクリックします。
次に、Createを選択して新しいssh接続を作成し、次の画像のように接続ID、IPアドレス、ユーザー名パスワードなどを入力します。
保存後、ssh_crmetl を使用して、対応するホストでスクリプトを呼び出すことができます。
6. HiveOperator
hive は、Hadoop ベースの
Hadoopをベースとしたデータウェアハウス
構造化されたデータファイルをデータベースのテーブルにマッピングし、SQLステートメントをMapReduceに変換できるシンプルなSQLクエリ機能を提供するツールです。
pip install apache-airflow[hive]
以下は使用例です。
t1 = HiveOperator(
task_id='simple_query',
hql='select * from cities',
dag=dag)
一般的なOperatorは、DockerOperator、OracleOperator、MysqlOperator、DummyOperator、SimpleHttpOperatorなどで、これらは似たような使い方をするので、一々説明しないことにします。
7. オペレーターのカスタマイズ方法
それでも公式のOperatorが要件を満たさない場合は、独自のOperatorを開発します。Operatorの開発は、BaseOperatorを継承してexecuteメソッドを実装するだけと簡単です。
from airflow.models import BaseOperator
class MyOperator(BaseOperator):
def __init__(*args, **kwargs):
super(MyOperator, self). __init__(*args, **kwargs)
def execute(self, context):
###do something here
execute メソッド以外に、以下のメソッドを実装することができる。
on_kill。タスクがkillされたときに実行されます。
カスタムOperatorにJinjiaテンプレートサポートを追加するには?
とても簡単で、カスタムOperatorクラスにプロパティを追加するだけです。
template_fields = (attributes_to_be_rendered_with_jinja)
つまり、例えば公式のbash_operatorは次のようなものです。
template_fields = ('bash_command', 'env')
このようにすると、タスクを実行する前に、airflowが自動的にbash_commandやenvにあるプロパティをレンダリングしてくれます。
要約すると、airflowはデータベース、分散ファイルシステム、http接続、リモートタスクなどのために十分便利なOperatorを公式に提供しています。Airflowのオペレータのソースコードを参照することができ、基本的にあなたの日常業務のニーズを満たすものです。restful aptの形でインターフェースを提供し、SimpleHttpOperatorを使用してタスクの呼び出しを実装することができます。
関連
-
[解決済み】「RuntimeError: dictionary changed size during iteration」エラーを回避する方法とは?
-
[解決済み] モジュールに属性がない
-
[解決済み] TypeError: 整数の引数を期待したが、画像をグレイスケールに変換するときに浮動小数点になった。
-
[解決済み] Random モジュールが動作しない。ValueError: randrange() の範囲が空です (1,1, 0)
-
[解決済み] Youtube_dl : ERROR : YouTube は言いました。動画データを抽出できません
-
[解決済み] numpyでベクトルの逆クロス積を計算するにはどうしたらいいですか?
-
[解決済み] sys.argv[1], IndexError: list index out of range [duplicate].
-
[解決済み] リクエスト'を解決できません。このモジュールにはインテリセンスがない可能性があります。Visual Studio/Python
-
(int "ではなく)strをstrに連結するとエラーが発生する。
-
解決済みのエラーです。ModuleNotFoundError: tflite_runtime' という名前のモジュールはありません。
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
Python百行で韓服サークルの画像クロールを実現する
-
Djangoプロジェクトの構成は、独立した実装を分割する
-
[解決済み] リストを配列に変換できない。ValueError: Pythonのスカラーに変換できるのは1要素のテンソルだけです。
-
[解決済み] インデックスを持たないデータフレーム列の最後の値の取得
-
[解決済み] Python : numpy.saveで辞書を保存する [重複].
-
[解決済み] モデルパラメータをフィットさせるためのfmin_l_bfgs_bの正しい使用法
-
[解決済み] Pythonです。ImportError: lxml が見つかりません、インストールしてください。
-
pip install MySQL-python reports "EnvironmentError: mysql_config not found" (環境エラー:mysql_configが見つかりません。
-
AttributeError; 'Series' object has no attribute 'split' Solve
-
OperationalError: データベースファイルを開くことができません。