Google BigQueryのPythonライブラリbqlibを作りました

Google BigQueryをPythonから扱うbqlibというモジュールを作りました。
BigQueryとは簡単に説明すると、テラバイト級のデータセットに対してSQLを使って様々なメトリクスを分析できるサービスです。
競合にはAmazonのRedshiftやTreasureDataなどがあります。

BigQueryのPythonクライアントはGoogle社が提供しているbigquery_client.pyが既にありますが、 プリミティブな操作の集合のようなライブラリとなっており実際に使おうとすると手順が多いので、今回そのモジュールのラッパーライブラリとして簡単に扱えるものを作りました。

インストール

pipやeasy_installで入ります。依存しているbigquery_client.pyなども一緒に入ります。

$ pip install bqlib

使い方

実際にbigquery_client.pyと比較しながら使い方を説明します。

同期型クエリ

最初に同期型クエリを見てみます。
同期型クエリはクエリを開始してから実行完了までブロックされます。

(bigquery_client.pyの場合)

client = BigqueryClient(api='bigquery',
                        api_version='2.0',
                        project_id='my_project',
                        credentials=credentials)
job = client.RunQuery(query='SELECT date, profit FROM sales') # クエリ開始->クエリ実行完了までブロック
fields, rows = client.ReadSchemaAndRows(job['configuration']['query']['destinationTable']) # 結果の取得
print rows # [['2013-10-25','12300'], ['2013-10-26','9340'], ...]

最終的にrowsがクエリの結果になります。

(bqlibの場合)

bqjob = BQJob(authorized_http,
              'my_project', 
              query='SELECT date, profit FROM sales')
job_result = bqjob.run_sync() # クエリ開始->結果の取得
print job_result # [{u'date': '2013-10-25', u'profit': 12300}, {u'date': '2013-10-26', u'profit': 9340}, ...]

bqlibを使うと記述が簡単になるだけでなく、結果セットのスキーマのフィールド型から自動的にPythonに対応する型に変換してクエリの結果を返してくれるのでプログラムから扱いやすくなります。(例:'profit'フィールドはINTEGER型なので数値に変換される)

非同期型クエリ

非同期型クエリはクエリを開始した直後に制御が呼び出しプログラムに戻り、任意のタイミングで結果を取りにいくパターンです。

(bigquery_client.pyの場合)

client = BigqueryClient(api='bigquery',
                        api_version='2.0',
                        project_id='my_project',
                        credentials=credentials)
job_id = client.Query('SELECT date, profit FROM sales') # 非同期クエリ実行。すぐに制御が戻る

#### do something ####

job_reference = client.GetJobReference(job_id)
job = client.WaitJob(job_reference=job_reference) # クエリ完了待ち
fields, rows = client.ReadSchemaAndRows(job['configuration']['query']['destinationTable']) # 結果の取得
print rows # [['2013-10-25','12300'], ['2013-10-26','9340'], ...]

んー、これは結構めんどくさいですね...。

bqlibで書くと非同期型もすっきり書けます。

(bqlibの場合)

bqjob = BQJob(authorized_http,
              'my_project', 
              query='SELECT date, profit FROM sales')
bqjob.run_async() # 非同期クエリ実行。すぐに制御が戻る

#### do something ####

job_result = bqjob.get_result() # 結果の取得
print job_result # [{u'date': '2013-10-25', u'profit': 12300}, {u'date': '2013-10-26', u'profit': 9340}, ...]

基本、run_syncでクエリを実行していた部分がrun_asyncに変わるだけです。

複数クエリの並列実行

ここからはbqlib独自の機能になります。
bigquery_client.pyを使って複数クエリを連続して実行しようとすると、上記の非同期型クエリをクエリ数分だけ書き、それぞれ実行し、1つ1つ実行結果を得る必要があります。
bqlibのBQJobGroupを使えば複数クエリをまとめて並列に実行可能です。

bqjob1 = BQJob(authorized_http,
               'my_project', 
               query='SELECT date, profit FROM sales')
bqjob2 = BQJob(authorized_http,
               'my_project', 
               query='SELECT age FROM customer')

# 実行したいjobをグループとしてまとめる
job_group = BQJobGroup([bqjob1, bqjob2])

# 同期型クエリ
results = job_group.run_sync()

# もしくは非同期に
job_group.run_async()
results = job_group.get_results()

print results # [[{u'date': '2013-10-25', u'profit': 12300}], [{u'age': 23}]]

自動リトライ機能

bqlibが提供するクエリ関数は全てエラーが起きた時に自動でリトライするようになっています。
ただし現時点ではBigQuery側のサーバサイドのエラー以外でも(但しデータセットやテーブルが存在しないエラーを除く)勝手にリトライしてしまうのでこの辺は改善の余地があります。

Discovery Documentの自動キャッシュ

BigQueryのAPIを叩くためにはGoogleAPI Discovery Serviceを使用してBigQueryのAPI仕様が記述されたDiscovery Documentをダウンロードしてくる必要があります(但しbigquery_client.py v2.0.17からは何も指定しなければモジュールにビルトインされているものが使われるようになりました)。 このダウンロードされたDiscovery Documentは、ユーザがbqlibをGoogle App Engineで使用している場合memcachedに自動でキャッシュされるようになっています。

さいごに

以上bigquery_client.pyと比較しつつざっとbqlibの使い方をまとめてみました。
結局のところラッパーライブラリなので元のクライアントライブラリの変更をもろに受けてしまうのが辛いとこですが、まだまだ基本機能しかないので独自機能を入れて今後も拡張していきたいと思ってます。

要望・バグ報告はこちらまで!
https://github.com/addsict/bqlib