[解決済み] エアフローでBigQueryOperatorから結果を取得する
2022-02-14 16:50:26
質問
から結果を取得しようとしています。
BigQueryOperator
を使用していますが、その方法を見つけることができませんでした。私は
next()
メソッドを
bq_cursor
を返しますが(1.10で使用可能)、これは
None
. これは私が試した方法です
import datetime
import logging
from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time()
)
def MyChequer(**kwargs):
big_query_count = bigquery_operator.BigQueryOperator(
task_id='my_bq_query',
sql='select count(*) from mydataset.mytable'
)
big_query_count.execute(context=kwargs)
logging.info(big_query_count)
logging.info(big_query_count.__dict__)
logging.info(big_query_count.bq_cursor.next())
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'project_id': 'myproject'
}
with models.DAG(
'bigquery_results_execution',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
myoperator = python_operator.PythonOperator(
task_id='threshold_operator',
provide_context=True,
python_callable=MyChequer
)
# Define DAG
myoperator
を見てみましょう。 bigquery_hook.py と bigquery_operator.py が、結果を取得するための唯一の利用可能な方法であるように思われます。
解決方法は?
kaxilさん、@Mikeさん、回答ありがとうございました。問題がわかりました。の部分に(私の中では)一種のバグがあるようです。
BigQueryCursor
. の一部として
run_with_configuration
は、その
running_job_id
は返されますが、決して
job_id
で結果を引き出すために使用される
next
メソッドを使用します。回避策(エレガントではありませんが、すべてを再実装したくない場合に有効です)として、このメソッドに
job_id
をベースにした
running_job_id
をフックの中に入れて、次のようにします。
big_query_count.execute(context=kwargs)
#workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())
関連
-
pythonを使ったオフィス自動化コード例
-
Python関数の高度な応用を解説
-
PythonはWordの読み書きの変更操作を実装している
-
pyCaret効率化乗算器 オープンソース ローコード Python機械学習ツール
-
[解決済み】「RuntimeError: dictionary changed size during iteration」エラーを回避する方法とは?
-
[解決済み】 NameError: グローバル名 'xrange' は Python 3 で定義されていません。
-
[解決済み】csv.Error:イテレータはバイトではなく文字列を返すべき
-
[解決済み】IndexError: invalid index to scalar variableを修正する方法
-
[解決済み] 列の値に基づいてDataFrameから行を選択するにはどうすればよいですか?
-
[解決済み] Pythonの辞書からキーを削除するにはどうしたらいいですか?
最新
-
nginxです。[emerg] 0.0.0.0:80 への bind() に失敗しました (98: アドレスは既に使用中です)
-
htmlページでギリシャ文字を使うには
-
ピュアhtml+cssでの要素読み込み効果
-
純粋なhtml + cssで五輪を実現するサンプルコード
-
ナビゲーションバー・ドロップダウンメニューのHTML+CSSサンプルコード
-
タイピング効果を実現するピュアhtml+css
-
htmlの選択ボックスのプレースホルダー作成に関する質問
-
html css3 伸縮しない 画像表示効果
-
トップナビゲーションバーメニュー作成用HTML+CSS
-
html+css 実装 サイバーパンク風ボタン
おすすめ
-
PythonによるLeNetネットワークモデルの学習と予測
-
Pythonによるjieba分割ライブラリ
-
Pythonの@decoratorsについてまとめてみました。
-
[解決済み] [Solved] sklearn error ValueError: 入力に NaN、infinity または dtype('float64') に対して大きすぎる値が含まれている。
-
[解決済み】ImportError: sklearn.cross_validation という名前のモジュールがない。
-
[解決済み】Python regex AttributeError: 'NoneType' オブジェクトに 'group' 属性がない。
-
[解決済み】ilocが「IndexError: single positional indexer is out-of-bounds」を出す。
-
[解決済み】pygame.error: ビデオシステムが初期化されていない
-
[解決済み】 NameError: グローバル名 'xrange' は Python 3 で定義されていません。
-
[解決済み] TypeError: 'DataFrame' オブジェクトは呼び出し可能ではない