App Engine と Cloud Tasks で単発のバッチジョブを実行する

App Engine でアプリケーションを運用していると、たまに単発のバッチジョブを実行したいことがあります。例えば Datastore の Entity のマイグレーションを行ったり、環境セットアップの一環として Datastore に初期データを詰めたりしたいことってありますよね。

もちろん Datastore を操作するだけであれば Cloud Datastore APIスクリプトから使ったり、Cloud Dataflow のバッチジョブで操作するということもできますが、App Engine とのコードの共通化が難しいですし、何より App Engine の開発作法からなるべく外れたくありません。

そこで今回は、本日ベータリリースが発表された Cloud Tasks を使って、そのような単発のバッチジョブを App Engine で実行する方法を見てみたいと思います。

Cloud Tasks とは

cloud.google.com

App Engine で非同期タスクを扱う Task Queue を、App Engine 以外の GCP プロダクトからも利用できるようにしたサービスです。 Task Queue と Cloud Tasks では同一の世界を見ているため、どちらの API を使っても同じ Queue や Task を扱うことができます。

なぜ単発バッチジョブに Cloud Tasks を使うか

任意のタイミングで実行できるバッチジョブといえば App Engine cron がありますが、App Engine cron では定期的に実行するスケジューリングしか出来ないため、単発のバッチジョブには不向きです。

では通常の Task Queue はどうかというと、Task Queue でタスクを作成するためには App Engine からでないと出来ないため、外から任意のタイミングで発火させるのがやりづらい状況でした。

そこで本日出た Cloud Tasks です。Cloud Tasks は タスクを作成する API が存在するため、外から任意のタイミングでタスクを作成し、それを契機にバッチジョブを動かすことができます。

設定方法

設定と言っても通常の Task Queue を扱う方法と変わりませんが、要点をいくつかまとめます。

1. Handler を定義する

バッチジョブの本体となるロジックを HTTP Handler として定義します。Task Queue からのリクエストは 0.1.0.2 という IP アドレスから来るので、それ以外はブロックするといいでしょう。

以下 Go での例です。

http.HandleFunc("/tasks/some_batch_task", func(w http.ResponseWriter, r *http.Request) {
    if r.RemoteAddr != "0.1.0.2" {
        http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
        return
    }

    // do some batch job...

    fmt.Fprintf(w, "finish task\n")
})

2. app.yaml に Handler を追加

先程定義した Handler がリクエストを受け付けれるように、app.yaml に Handler を追加します。公式ドキュメントに記載されているように handler の設定に login: admin を追加しておくと、GCP プロジェクトの管理者(or Task Queue から)でないとアクセスできなくなるので、IPアドレスでのフィルタリングと合わせて設定しておくと安全です。

handlers:
- url: /tasks/.*
  login: admin
  script: _go_app

3. queue.yaml に Queue を追加

バッチジョブを起動させるための Queue を追加します。

ポイントとしては bucket_size: 0 にしておくことです。こうしておくことで、なんらかのタイミングで意図せず該当の Queue にタスクが作成されてしまっても、自動でタスクが実行されないようになります。

queue:
- name: my-batch-queue
  bucket_size: 0
  rate: 1/s
  target: my-service

bucket_size に関しては、詳しくは過去のエントリを参照してください。

4. app.yaml と queue.yaml をデプロイ

アプリケーションと Queue の設定をデプロイします。

gcloud app deploy app.yaml
gcloud app deploy queue.yaml

5. Cloud Tasks API を使ってタスクを実行

あとはジョブを実行したい任意のタイミングで gcloud tasks create-app-engine-task を使ってタスクを作成します。パラメータが必要な場合は --header--payload-content オプションを追加で指定します。

gcloud beta tasks create-app-engine-task --queue=my-batch-queue --url=/tasks/some_batch_task

queue.yamlbucket_size: 0 と定義した場合は自動でタスクが実行されないので、Cloud Tasks の console 画面から「Run Now」と書かれたボタンを手動でポチッと押して実行します。

f:id:furuyamayuuki:20180928180524p:plain

バッチジョブは実行に時間がかかることも多いので進捗を知れることが大事ですが、アプリから吐いたログは Stackdriver Logging に都度表示されるので、進捗はログに出しておくと良さげです。以下の例は30秒かかるジョブを実行したときに、4つのログが吐かれている例です。

f:id:furuyamayuuki:20180928180609p:plain

注意点

Task Queue をバッチジョブとして用いた場合、いくつか注意点があります。

デッドラインに注意

App Engine ではリクエストを処理できる時間に制限時間があります。Task Queue からのリクエストであれば automatic scaling の場合は10分、basic or manual scaling の場合は24時間です(参考)。 これ以上の処理時間がかかるようであれば、バッチジョブの作業内容を別々のタスクに分割してこの時間に収めるのがいいでしょう。

処理を冪等にすること

エラーが発生した場合自動でリトライが走ります(ただし backet_size: 0 の場合はこの限りじゃありません)。処理を冪等にしておくことで何度リトライが走っても大丈夫なようにします。これはバッチジョブ全般にいえるプラクティスですね。

途中でやめることはできない

何かクリティカルな問題が発生したとしても、アプリケーションがエラーレスポンスを返さない限り途中で abort させることはできません。インスタンスを落とせば出来るのかもしれませんが、本番のサービスでそれを行うことは難しいでしょう...。

まとめ

本日ベータリリースされた Cloud Tasks を使うことで、App Engine で単発のバッチジョブを実行できることを説明しました。 Cloud Tasks は特に、今まで Task Queue が使えなかった第二世代のランタイムの App Engine ユーザ向けにフィーチャーされてますが、既存の App Engine ユーザでも使い所がありそうですね。

Task Queue と Token Bucket アルゴリズム

GAE の Task Queue (Push Queue) は Queue に入れられたタスクを全て一気に実行するのではなく、あらかじめ設定しておいた実行レートに従って、バックエンドの App Engine インスタンスにリクエストを投げてくれます。この実行レート制御のベースとなっているのが Token Bucket というアルゴリズムです。

今回はその Token Bucket アルゴリズムと、Task Queue の設定値である

  • bucket_size
  • rate
  • max_concurrent_requests

にどのような関連性があるか、まとめてみたいと思います。

Token Bucket アルゴリズム

Token Bucket はネットワークに流れるトラフィックを一定量以下になるように調整するアルゴリズムであり、Amazon EBS の IOPS のバースト制御Amazon API Gateway での Rate Limit でも使われてたりします。

Task Queue 上での Token Bucket アルゴリズムのルールは以下のようになります。

f:id:furuyamayuuki:20180924114855p:plain:w250

ここで大事なのは、トークンがなければタスクの実行は待たされる = バッファリングされる、という点です。これがあることで、例えタスクが一度に大量に enqueue されても、バックエンドには安定したレートでリクエストが投げられるようになります。これは後で例を用いて説明します。

Task Queue の設定値

次に Task Queue の各設定値 を見てみたいと思います。

bucket_size

その名の通りバケットのサイズを決める設定値です。

このバケットサイズ分トークンを貯め込むことができるので、一度に実行が開始されるタスク量はこの値でキャップがかかります(例: バケットサイズが100の場合、一度に実行開始できるタスクは100個まで)。

たまに混乱してしまいますが、Queue に保持できるタスクの量とは関係ありません。 (公式ドキュメントによると、課金している場合最大100億タスクまで保持できます。)

rate

トークンの補充レートです。

この補充レート以上でタスクが enqueue された場合、例えバケットのサイズが十分大きかったとしてもいずれバケットの中のトークンは0になってしまうので、長期的に見るとこの値がタスクの最大実行開始レートとなります。

トークンは時間経過によって補充されていきます。実行しているタスクが終わったかどうかは関係ありません。

max_concurrent_requests

タスクの最大同時実行数を決める設定値です。

それだけ聞くと bucket_size との違いがよくわかりませんが、

  • bucket_size はある瞬間の最大実行開始数を定義する
  • max_concurrent_requests はある瞬間の最大同時実行数を定義する

という違いがあります。

例えば、処理を完了するのに物凄い時間がかかるタスクがあったとします。Token Bucket アルゴリズムでは、今までのタスクが終わってなかろうが、おかまいなしにトークンを補充していくので、時間経過に伴ってタスクが随時実行開始されていき、同時実行しているタスクが積み重なっていくことになります。 そういう場合に max_concurrent_requests を設定することで、タスクの同時実行数にキャップをかけることができます。

ちなみに bucket_sizerate は Token Bucket アルゴリズムに関連したパラメータですが、max_concurrent_requests はそれとは全く関係ありません。

bucket_size の用途

ここからは bucket_size の用途を、例を挙げながらもう少し見てみたいと思います。条件として、既に Queue にタスクが十分に積まれていて、それらを秒間最大500リクエストで処理したいとします。またバケットに対してトークンは200ms毎に補充されると仮定します。

尚、以下に示すグラフは実際の挙動からプロットしたものではなく、ドキュメントを元に動作を推測したものとなります(なので大分単純化されています)。

bucket_size: 500, rate: 500/s の場合

bucket_sizerate を両方共500にするとどうなるか見てみましょう。

f:id:furuyamayuuki:20180924115400p:plain:w400

横軸が時間で、縦軸が実行開始されるタスクの量です。

タスクはバケットトークンがある分だけ実行されてしまうので、500個のトークンがあれば500個のタスクが一度に実行されてしまい、スパイクのようなリクエストが飛んでしまいます。 確かにこれでも秒間500リクエスト処理していると言えますが、バックエンドに瞬間的な高負荷がかかってしまいます。

bucket_size: 100, rate: 500/s の場合

今度は rate は 500/s のまま、bucket_size を100にしてみます。

f:id:furuyamayuuki:20180924115507p:plain:w400

すると、一回あたりのタスクの実行開始数が100個に抑えられ、小刻みに実行されるようになります。つまり、タスクがバッファリングされ実行開始数が平滑化されることで、同じ秒間500リクエストでもより安定してバックエンドにリクエストを送れるようになります。

このまま更に bucket_size を小さくすればより平滑化されるように見えますが、内部的なトークンの補充間隔が広いとタスクが十分に実行されないことになるので、公式ドキュメントで推奨されているように rate を5で割った rate/5 の値にしておくのがいいと思われます。

まとめ

Task Queue の実行レート制御のベースとなる Token Bucket アルゴリズムと、それに関連した3つの設定値

  • bucket_size
  • rate
  • max_concurrent_requests

の役割をまとめました。

適切に設定してバックエンドを突発的な負荷から守るようにしたいですね。

GAE ローカル開発のログをいい感じに表示してくれる gaelv というツールを作りました

App Engine のアプリをローカルで開発していると、標準だとログがこんな感じに表示されてしまって視認性が悪かったりしますよね。

f:id:furuyamayuuki:20171223151536p:plain:w600

特にどのアプリケーションログがリクエストログに紐付いているのかわかりづらかったりします。
そこで GCP にアプリをデプロイした時と同じように、Stackdriver のような見た目でログを見れる gaelv というツールを作ってみました。
https://github.com/addsict/gaelv

f:id:furuyamayuuki:20171223151609p:plain:w600

インストール

go get 一発でインストールできます。

go get -u github.com/addsict/gaelv/...

インストールすると gaelv というコマンドが使えるようになります。

$ gaelv
Usage:

    gaelv --logs_path=</path/to/log.db> [options...]

Options:

    --logs_path        Path to logs file
    --port             Port for server
    --console          Print logs in the console

使い方

まずいつも通り dev_appserver.py で GAE アプリケーションを立ち上げます。
その際に --logs_path=<path> というオプションをつけてください (<path>部分は適当なファイルパスでokです)。

dev_appserver.py app.yaml --logs_path=/tmp/log.db

次に以下のコマンドで gaelv を立ち上げます。

gaelv --logs_path=/tmp/log.db

最後にブラウザで http://localhost:9090/ を開くと Log Viewer が表示されます。
この状態でアプリケーションにアクセスすると、リアルタイムにログが流れてきます。
(※ただし後述の注意点にあるように、ログのバッファリングをOFFにした場合のみ)

f:id:furuyamayuuki:20171223151727p:plain:w600

または --console オプションでログをいい感じにコンソールに表示することもできます。

gaelv --logs_path=/tmp/log.db --console

f:id:furuyamayuuki:20171223151743p:plain:w600

注意点

dev_appserver.py は標準だとログを5秒間バッファリングしてしまい、すぐには Log Viewer の方に表示されません。 また厄介なことに5秒経ったら自動的にフラッシュされるのではなく、次にリクエストが来た時にフラッシュする仕組みになっています。 残念ながらこのバッファリングはオプションで OFF にすることができないため、App Engine SDKソースコードを直接いじってしまう方法以外は、現状だと回避策はなさそうです。

バッファリングを OFF にするには /usr/local/google-cloud-sdk/platform/google_appengine/google/appengine/api/logservice/logservice_stub.py というファイルの _MIN_COMMIT_INTERVAL という変数を 0 に書き換えればOKです。

diff を貼っておきますので参考にしてみてください。

--- a/logservice_stub.py
+++ b/logservice_stub.py
@@ -86,7 +86,7 @@ class LogServiceStub(apiproxy_stub.APIProxyStub):
-  _MIN_COMMIT_INTERVAL = 5
+  _MIN_COMMIT_INTERVAL = 0

仕組み

このツールの全体像はこんな感じになっていて、ポイントとなる仕組みを少し紹介します。

f:id:furuyamayuuki:20171223151801p:plain:w400

ログの取得方法

App Engine は Log Service という API でリクエストログとアプリケーションログの取得を行うことができます。 ログはデプロイされたアプリでは Stackdriver に入っていきますが、ローカルではそれをシミュレートするために SQLite の DB に入っていきます。 使い方で説明した --logs_path というオプションが、その DB ファイルを指し示しています。
今回はその SQLite の DB を直接参照する形で実装しました。

参考) 内部で使われているリクエストログのテーブル構造

CREATE TABLE IF NOT EXISTS RequestLogs (
  id INTEGER NOT NULL PRIMARY KEY,
  user_request_id TEXT NOT NULL,
  app_id TEXT NOT NULL,
  version_id TEXT NOT NULL,
  module TEXT NOT NULL,
  ip TEXT NOT NULL,
  nickname TEXT NOT NULL,
  start_time INTEGER NOT NULL,
  end_time INTEGER DEFAULT 0 NOT NULL,
  method TEXT NOT NULL,
  resource TEXT NOT NULL,
  http_version TEXT NOT NULL,
  status INTEGER DEFAULT 0 NOT NULL,
  response_size INTEGER DEFAULT 0 NOT NULL,
  user_agent TEXT NOT NULL,
  url_map_entry TEXT DEFAULT '' NOT NULL,
  host TEXT NOT NULL,
  referrer TEXT,
  task_queue_name TEXT DEFAULT '' NOT NULL,
  task_name TEXT DEFAULT '' NOT NULL,
  latency INTEGER DEFAULT 0 NOT NULL,
  mcycles INTEGER DEFAULT 0 NOT NULL,
  finished INTEGER DEFAULT 0 NOT NULL
);

ブラウザへのストリーミング

サーバからブラウザへは Server Sent Events (SSE) を使ってログをストリーミングとして送っています。 HTTP の標準の仕組みで実装できるので、サーバ・クライアント共に実装がシンプルになります。
golang での実装は kljensen/golang-html5-sse-example を参考にさせて頂きました。

最後に

ログを人間にいかにわかりやすく見せるかは、開発効率の向上に大きく繋がってくると思っています。
まだ荒削りなクオリティではありますが、簡単に使えるので是非使ってみてください!

Cloud Datastore は Entity 毎になぜ秒間1回の書き込み制約があるのか

はじめに

Cloud Datastore では 1 write / entity group / sec という制約があります (公式ドキュメント)。

確かに Datastore を使っていると、同一 Entity Group に対する書き込み頻度が高い時に一部の書き込みが失敗することは経験上よくありましたが、 なぜこの制約が存在しているのかはドキュメントに詳しくは記載されていませんでした。 そこで Cloud Datastore のバックエンドである Megastore の論文「Megastore: Providing Scalable, Highly Available Storage for Interactive Services」をベースに、その理由を調べてみました。

※公開されている情報から推測しているので、間違っていた場合ご指摘下さい。

TL;DR

先に結論を述べると、レプリカ間で書き込みデータのレプリケーションを行なう手続きに、最大で1秒ほどの時間がかかるためです。

より詳しく述べると、Cloud Datastore の以下の特徴から書き込みスループットの制約が存在します。

  • 高可用性を実現するためにデータセンターを跨いで複数のレプリカを動かしており、レプリカ間で同期的なレプリケーションを行っている
  • どのレプリカからも読み込み・書き込みが開始でき、クエリによっては Strong Consistency なデータを取得できる
  • レプリカ間でのレプリケーションには Paxos を用いており、Paxos の手続きの完了にデータセンター間で 1RTT もしくは 2RTT のネットワークレイテンシを要する

Megastore のアーキテクチャ

f:id:furuyamayuuki:20171210225222p:plain:w400

Megastore は高い可用性を実現するためにデータを地理的に離れた複数のデータセンターで保持していますが、 各ノードで一貫性のあるデータ読み込みを実現するために、ノード間で同期的なレプリケーションを行っています。

レプリケーションはマスター・スレーブのような構成ではなく、全てのノードから Read / Write を発行できるマルチマスターのようなレプリケーションとなっており、各ノードは「レプリカ」と呼ばれています。

つまり、同じ内容を保持しているデータベースが地理的に離れた箇所に複数デプロイされている形です。

Read 手順

f:id:furuyamayuuki:20171210225723p:plain:w400

Megastore での Write 手順を見る前に、参考までに Megastore からのデータ読み込みがどういう手順で行われるか見てみます。

論文の "4.6.2 Reads" から引用すると、読み込みは以下の手順で行われます。

  1. Query Local: 地理的に最も近いレプリカの Coordinator に、所持している Entity Group のデータが最新かどうかを問い合わせる。
  2. Find Position: 1番で最新だった場合、選択したレプリカから Entity Group の最新のログポジションとタイムスタンプを取得し、3番、4番の処理をスキップする。最新でなかった場合、他の全てのレプリカにログポジションを問い合わせ、過半数の結果を採用する。
  3. Catchup: 2番で取得した最新のログポジションを元に他のレプリカからログ (= Write Ahead Log) を取得し、自身のデータベースに適用していく。
  4. Validate: Coordinator に今持っている Entity Group のデータが最新であることを伝える。
  5. Query Data: 選択したレプリカから Engity Group のデータを読み出す

この手順を見ると、仮にあるレプリカが他のレプリカと比べて古いデータを保持していたとしても、一連のシーケンスの中で他のレプリカへの追いつきが走るようになっています。 つまりどのレプリカから Read しようとしても、必ずシステムに最後にコミットされた最新のデータが読み取れること (Strong Consistency) が保証されています。

Write 手順

f:id:furuyamayuuki:20171210225801p:plain:w400

次に書き込みの手順です。

書き込む値のレプリケーションPaxos を使って行われます。 通常の Paxos は合意形成に至るまでノード間の通信が少なくとも2回 (Prepare phase & Accept phase) 必要ですが、 Megastore が実装している Paxos ではノード間の通信を最短で1回のラウンドトリップで済むような拡張を入れています。

論文の "4.6.3 Writes" から引用すると、書き込みは以下のような流れで行われます。

  1. Accept Leader: リーダーレプリカに proposal number = 0 で Proposal を投げる。これが成功した場合2番をスキップする。
  2. Prepare: 全てのレプリカに Prepare メッセージを投げる。
  3. Accept: リーダー以外の全てのレプリカに Proposal を投げる。全てのレプリカが Accept した場合4番をスキップする。
  4. Invalidate: 3番で一部のレプリカが Accept しなかった場合、そのレプリカの Coordinator に対して Invalidate メッセージを送る(そのレプリカが持つ現在の値を読ませないようにするため)。
  5. Apply: 合意した変更点を実際のストレージ(Bigtable)に適用させる

1番の Accept Leader フェーズが Paxos の拡張になっており、後続の Prepare フェーズをスキップ出来るようになっています。 これは1つ前の書き込みの完了に、その次の Prepare の意味も持たしていることから来ていますが、より詳細な前提条件の説明が必要なため、詳しくは論文の "4.4.2 Fast Writes" を参照して下さい。

Write が完了するタイミング

このシーケンスのなかで、元の Client に書き込みの完了 (Ack) を伝えれるのは3番、もしくは4番が終わったタイミングになります。 よって最短で書き込みに要するネットワークレイテンシは、

# 全てのレプリカが Accept した場合
RTT(Client, A) + MAX(RTT(Client, B), RTT(Client, C))

# 一部のレプリカが Accept しなかった場合
RTT(Client, A) + MAX(RTT(Client, B), RTT(Client, C)) + RTT(Client, C)

となりますが、Client ↔  Replica A 間は地理的になるべく近い場所にいる(ように Leader を選ぶ)ため、両者の RTT は非常に短い時間だと考えられます。 よってその時間は十分無視できるとすると、

# 全てのレプリカが Accept した場合
MAX(RTT(Client, B), RTT(Client, C))

# 一部のレプリカが Accept しなかった場合
MAX(RTT(Client, B), RTT(Client, C)) + RTT(Client, C)

となり、大体1RTT、もしくは2RTTかかることになります。

仮にレプリカがアメリカの西海岸と東海岸に配置されていると仮定すると、レプリカ間のRTTは約120msです。 (参考: AT&Tが公開しているネットワークレイテンシから、サンフランシスコとニューヨーク間のレイテンシ: 62ms を持ってきました。)

よって、書き込みのレイテンシはネットワークレイテンシに実際の各レプリカの処理時間を乗せたものになるので、150ms〜300msくらいが標準的な書き込み時間になるのではないでしょうか。

そしてトランザクション内での Entity Group に対する書き込みはシリアライズ化されているので、この書き込み1回あたりのレイテンシがそのままスループット制約となります。

Google で計測されたレイテンシ

f:id:furuyamayuuki:20171210230118p:plain:w400

論文の "4.10 Production Metrics" に Google の本番環境での Megastore の読み込み・書き込みレイテンシが載っていました。 それによると、平均的なアプリケーションは大体100ms〜500msで書き込みが完了しているようです。 ただし、この値はデータセンター間の距離や、レプリカ数に依存するとも述べられているので、場合によっては1秒近くかかってしまうこともあるのでしょう。

まとめ

Cloud Datastore の 1 write / entity group / sec の制約について調べ、データセンター間での同期的なレプリケーションに要する時間がネックになっていることを説明しました。 ちなみに Megastore の論文の中でも "4.8 Write Throughput" において、書き込みのスループットが高くないことを課題として挙げています。

Our implementation of Paxos has interesting tradeoffs in system behavior. Application servers in multiple datacenters may initiate writes to the same entity group and log position simultaneously. All but one of them will fail and need to retry their transactions. The increased latency imposed by synchronous replication increases the likelihood of conflicts for a given per-entity-group commit rate.

(訳) 私達の Paxos の実装はシステムの動作に興味深いトレードオフをもたらします。複数のデータセンタ上のアプリケーションサーバから、同一 Entity Group の同一ログポジションに同時に書き込みを行おうとするかもしれません。その場合、その内の一つを除いた全てのクライアントはトランザクションをリトライする必要があります。(Megastore が行っている)同期的なレプリケーションによって生じる高いレイテンシは、ある Entity Group に対するコミットがコンフリクトしてしまう可能性を上げることになります。

このパフォーマンス上のボトルネックを解決することを1つの目的として、後続のデータベースである Spanner の開発に繋がっていったようですね。

参考

GCPコンソールのヘッダの色を変更するChrome拡張を作りました

GCP のコンソールで複数のプロジェクトを操作していると、特定のプロジェクトだけ視覚的に目立たせたいことがあります。 特に開発用のプロジェクトと本番のプロジェクトを行き来していると、たまにプロジェクトを間違えそうになってヒヤッとすることがありますよね。

そこで GCP コンソール画面のヘッダの色をプロジェクト毎に変更できる Chrome 拡張を作りました。
GCP console colorize

img1

条件は複数書けるので、プロジェクト毎に細かく設定できます。
(正規表現も使えます)

Chrome Web Store からインストールできるので是非導入してみてください。

GAE Datastore の Single-property Index と Composite Index は全く違うものと理解する

Datastore の Single-property Index と Composite Index はどちらも似たようなものだと思っていたのですが、実際のところはかなり違う性質をそれぞれ持っていることが段々とわかってきたので、現時点の自分の理解をメモしておきます。 恐らく Cloud Datastore にも該当する話だと思います。

Datastore のインデックスの種類

Datastore には2種類のインデックスが存在します。

  • Single-property Index (Built-in Index とも呼ばれる)
  • Composite Index (Custom Index とも呼ばれる)

このうち Single-property Index はデフォルトで全てのプロパティに有効になっているインデックスであり、Entity を保存した段階でそれぞれのプロパティに対応するインデックスが自動で作られます。 Composite Index はあらかじめ設定ファイルで定義しておく必要があり、それによって複数のプロパティを組み合わせた複合インデックスを作ることが出来ます。

両インデックスの違い

一番大きな違いだと思うのが、Single-property Index が Entity 単位にインデックスの ON/OFF をコントロールできる一方、Composite Index は Datastore の Kind 毎に ON/OFF されるという点です。 この性質の違いがいくつかの挙動の違いを生みます。

1. Single-property Index を途中からつけても、既存の Entity には効果がない

これはドキュメントに書かれてる内容ですが、今まで明示的に Unindex としていたプロパティに途中からインデックスを定義しようとしても、既存の Entity に対応するインデックスは作成されません。 そのため、古い Entity は MapReduce などを用いて fetch → put と再保存してインデックスを構築し直す必要があります。 これは Entity 単位にインデックスの ON/OFF が制御されるという性質上しょうがないものかと思います。

逆に Composite Index は All or Nothing なので、途中から定義したとしても既存を含む全ての Entity に作られることになります。

2. Single-property Index がないと Composite Index にエントリが追加されない

これは(自分の知る限り)アンドキュメントな内容ですが、Composite Index を設定ファイルで定義したとしても、そこで使用しているプロパティの Single-property Index が一つでも存在しない場合、Composite Index にはエントリが追加されません。 実際にその複合インデックスを使うようなクエリを投げても、単純にクエリに引っかからないような挙動になり、エラーもでないのでかなりハマります。 なぜこのような挙動になっているのか原理や理由はわかっていませんが、Composite Index は全ての Entity が対象となるので、一部の Entity をインデックスに追加しないようにするためにそういう機構があるのでしょうか。

Single-property Index は必ず定義しておくべき

上記のようなことがあるため、特別な理由がない限り Single-property Index は定義しておくべきだと考えています。 以前はインデックスの作成操作も Datastore write ops に含まれてしまっていたので必要のないインデックスはなるべく避ける傾向にあったと思いますが、今はそれらが Entity 単位の Quota になったため、あまり気にしなくていい感じになりました。 Single-property Index が既に存在していると、あとから Composite Index も張りやすくなりますし、なにより Datastore のコンソール画面からクエリを投げられるのでデバッグがやりやすくなると思います。

とはいってもインデックスの作成はその分ストレージを消費しますし、インデックスに追加されるまでの時間もインデックス数に応じて増えていくと思うので、他のインデックス追加に影響を及ぼして参照整合性が弱くなってしまうのを避けたい、といった場合には控えたほうが良さそうです。 (cf. Anti Pattern #2: Too Many Indexes - Balancing Strong and Eventual Consistency with Google Cloud Datastore)

一点実際のプログラミング時に注意が必要なのは、GAE/Java + Objectify で開発していると明示的に @Index アノテーションをプロパティに付加しておかないと setUnindexedProperty でプロパティが定義されてしまい、インデックス=OFFとなってしまう点です。 これが例えば GAE/Python + ndb だとプロパティを定義するとデフォルトでインデックス=ONとなるので、個人的には Objectify もデフォルトはインデックス=ONの挙動にしておいて欲しかった感じはあります...。

まとめ

Datastore の Single-property Index と Composite Index の性質をまとめると以下のようになります。

制御単位 インデックスの構築 注意点
Single-property Index Entity 単位 半自動 古い Entity は手動でインデックスを作る
Composite Index Kind 単位 自動 Single-property Index が作られていないとダメ

手探りで色々試している面もあるので、理解が間違っている等ありましたらご指摘下さい。

参考

GAE Task Queue をマイクロサービスのサービス間通信として使う

昨今マイクロサービスアーキテクチャに基づいたアプリケーション構築が話題ですが、Google App Engine を用いてる場合 Task Queue をサービス間の通信として便利に使用することが出来ます。

GAE の Task Queue (Push) について

背景

マイクロサービスでよくやることとして、あるサービスでイベントが発生したらそれを別のサービスに非同期に通知したいことがあります。 通常のアプリケーションの場合、何かしらの Message Queue を用いて Job Worker 経由で別のサービスに通知することが多いと思います。

+-----------+
| Service A |
+-----------+
     ↓ Push
+-----------+
|    MQ     |
+-----------+
     ↑ Pull
+-----------+
| Job Worker|
+-----------+
     ↓ HTTP
+-----------+
| Service B |
+-----------+

この場合 Job Worker は確実に Service B に通知を行なうよう、リトライ処理などをきちんと実装する必要があります。

一方 Google App Engine でアプリケーションを構築してる場合、代わりに Push 型の Task Queue を使うことで実現できます。

+-----------+
| Service A |
+-----------+
     ↓ Push
+-----------+
| Task Queue|
+-----------+
     ↓ HTTP
+-----------+
| Service B |
+-----------+

Task Queue はタスクの通知を HTTP で行なうため、サービス間に Job Worker を挟まずダイレクトに別のサービスに通信できる、という算段です。

Task Queue のコード例

以下のコードは Java で別のサービスに通信する例です。 アプリケーション特有のヘッダや、HTTP Body などを割と自由に付与できます。

// キューの取得
Queue queue = QueueFactory.getDefaultQueue();

// タスクの定義
TaskOptions task = TaskOptions.Builder
    .withMethod(TaskOptions.Method.POST)
    .url("/service_b/evetns")
    .header("X-My-Header", "foo, hoge")
    .payload("{\"eventId\": 123}".getBytes(), "application/json");

// タスクのキューイング
queue.add(task);

Task Queue だと何が嬉しいか

Job Worker となる部分を用意しなくて済むのも大きなメリットですが、他にも以下の様なメリットがあります。

  • Task Queue は App Engine のフルマネージドな部分なので、上述のリトライ処理を適切に行なってくれます。
    • タスクの種類ごとにリトライの間隔なども調整可能です。
  • 通常の MQ を用いた場合の欠点として、データベースのトランザクションの中に MQ へのキューイングを含められませんが、App Engine の場合 Datastore のトランザクションの中に Task Queue へのキューイングを含めることができます。
    • サービス自体の処理は成功したけど、別のサービスへの通知は失敗していた、などの中途半端な状態を防げれます。

注意点

Task Queue にもいくつか制約があります。

  • あくまでも非同期通信です。同期的なサービス間通信をしたい場合は、通常通りサービス内で HTTP 通信をします。
  • 使用できる HTTP メソッドは GET/POST/PUT/DELETE のみです。PATCH など別のメソッドを使いたい場合は X-HTTP-Method-Override などの Method Override を併用することになると思います。
  • Task Queue からタスクを受け取ったサービスは 200 番台のレスポンスを返さないとリトライされてしまいます。仮に 300/400 番台のレスポンスで成功を表すものがある場合は注意が必要です。
  • Task Queue で指定できる URL は、同じ App Engine アプリケーションで動いているサービスに対してのみです。App Engine 外に別のサービスを置いている場合は、一度 App Engine 内でタスクを受け取ったあとに行う必要があります。

まとめ

Task Queue を用いたサービス間通信のメリットについて書きました。 何より Task Queue によるタスクの受け渡しが HTTP で通信されるというのが大きな特徴ではないでしょうか。 マイクロサービスではサービス間通信を HTTP で行なうことが多いので、今回挙げた例のように「リトライ付きの非同期 HTTP クライアント」として Task Queue を使えるというのは非常に魅力的に思えます。