1. ホーム
  2. python

[解決済み] エアフローでBigQueryOperatorから結果を取得する

2022-02-14 16:50:26

質問

から結果を取得しようとしています。 BigQueryOperator を使用していますが、その方法を見つけることができませんでした。私は next() メソッドを bq_cursor を返しますが(1.10で使用可能)、これは None . これは私が試した方法です

import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

を見てみましょう。 bigquery_hook.py bigquery_operator.py が、結果を取得するための唯一の利用可能な方法であるように思われます。

解決方法は?

kaxilさん、@Mikeさん、回答ありがとうございました。問題がわかりました。の部分に(私の中では)一種のバグがあるようです。 BigQueryCursor . の一部として run_with_configuration は、その running_job_id は返されますが、決して job_id で結果を引き出すために使用される next メソッドを使用します。回避策(エレガントではありませんが、すべてを再実装したくない場合に有効です)として、このメソッドに job_id をベースにした running_job_id をフックの中に入れて、次のようにします。

big_query_count.execute(context=kwargs)
#workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())