gRPCの最適化について - パート1
gRPCでよくある質問は、どうすれば高速化できるかということです。gRPCライブラリは、ユーザーに高性能なRPCへのアクセスを提供しますが、それをどのように達成するかは常に明確ではありません。この質問は十分に一般的であるため、プログラムのチューニング時の思考プロセスを共有しようと思いました。
セットアップ
複数のプログラムで使用される基本的なキー・バリュー・サービスを考えてみましょう。サービスは、複数の更新が同時に発生する場合に、同時アクセスに対して安全である必要があります。利用可能なハードウェアを使用するようにスケーリングできる必要があります。最後に、高速である必要があります。gRPCは、この種のサービスに最適です。実装の最良の方法を見てみましょう。
このブログ記事のために、私はgRPC Javaを使用して、クライアントとサーバーの例を作成しました。プログラムは3つの主要なクラスとAPIを記述するprotobufファイルに分割されています。
- KvClientは、キー・バリュー・システムをシミュレートしたユーザーです。ランダムにキーと値を作成、取得、更新、削除します。使用するキーと値のサイズも、指数分布を使用してランダムに決定されます。
- KvServiceは、キー・バリュー・サービスの1つです。gRPCサーバーによってインストールされ、クライアントによって発行されたリクエストを処理します。キーと値をディスクに保存することをシミュレートするために、リクエストを処理中に短いスリープを追加します。読み取りと書き込みは、例が永続データベースのように動作するように、10ミリ秒と50ミリ秒の遅延を経験します。
- KvRunnerは、クライアントとサーバー間の相互作用を調整します。これはメインのエントリポイントであり、クライアントとサーバーの両方をプロセス内で起動し、クライアントが作業を実行するのを待ちます。ランナーは60秒間作業を実行してから、完了したRPCの数を記録します。
- kvstore.protoは、サービスのプロトコルバッファ定義です。クライアントがサービスから何を期待できるかを正確に記述します。簡単にするために、操作(一般にCRUDとして知られています)として作成、取得、更新、削除を使用します。これらの操作は、任意のバイトで構成されるキーと値で機能します。それらはある程度RESTに似ていますが、将来的にさらに複雑な操作を追加する権利を留保します。
Protocol buffers(protos)はgRPCを使用するために必須ではありませんが、サービスインターフェースを定義し、クライアントとサーバーのコードを生成するための非常に便利な方法です。生成されたコードは、アプリケーションロジックとコアgRPCライブラリ間のグルーコードとして機能します。gRPCクライアントによって呼び出されるコードをスタブと呼びます。
出発点
クライアント
プログラムが何をすべきかを知ったので、プログラムがどのようにパフォーマンスを発揮するかを見てみましょう。前述のように、クライアントはランダムなRPCを作成します。たとえば、ここでは作成リクエストを行うコードです。
private void doCreate(KeyValueServiceBlockingStub stub) {
ByteString key = createRandomKey();
try {
CreateResponse res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
if (!res.equals(CreateResponse.getDefaultInstance())) {
throw new RuntimeException("Invalid response");
}
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Code.ALREADY_EXISTS) {
knownKeys.remove(key);
logger.log(Level.INFO, "Key already existed", e);
} else {
throw e;
}
}
}
ランダムなキーとランダムな値が作成されます。リクエストはサーバーに送信され、クライアントは応答を待ちます。応答が返されたら、コードが予想通りであるかを確認し、そうでない場合は例外をスローします。キーはランダムに選択されますが、一意である必要があります。そのため、同じキーがすでに使用されていないことを確認する必要があります。これを処理するために、コードは作成したキーを追跡し、同じキーを2度作成しないようにします。ただし、別のクライアントが特定のキーを作成している可能性があるため、それをログに記録して続行します。それ以外の場合は、例外がスローされます。
ここではブロッキングgRPC APIを使用しており、リクエストを発行して応答を待ちます。これは最も単純なgRPCスタブですが、実行中にスレッドをブロックします。これは、クライアントの観点からは、一度に最大1つのRPCしか進行できないことを意味します。
サーバー
サーバー側では、リクエストはサービスハンドラによって受信されます。
private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
@Override
public synchronized void create(
CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
simulateWork(WRITE_DELAY_MILLIS);
if (store.putIfAbsent(key, value) == null) {
responseObserver.onNext(CreateResponse.getDefaultInstance());
responseObserver.onCompleted();
return;
}
responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}
サービスは、リクエストからキーと値をByteBufferとして抽出します。同時リクエストがストレージを破損しないように、サービス自体にロックを取得します。書き込みのディスクアクセスをシミュレートした後、キーから値へのMapに保存します。
クライアントコードとは異なり、サービスハンドラは非ブロッキングです。つまり、関数呼び出しのように値を返しません。代わりに、responseObserverのonNext()を呼び出して、クライアントに応答を送信します。この呼び出しも非ブロッキングであることに注意してください。つまり、メッセージがまだ送信されていない可能性があります。メッセージの処理が完了したことを示すために、onCompleted()が呼び出されます。
パフォーマンス
コードは安全で正しいので、パフォーマンスを見てみましょう。測定には、12コアプロセッサと32GBのメモリを搭載したUbuntuシステムを使用しています。コードをビルドして実行してみましょう。
./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s
real 1m0.927s
user 0m10.688s
sys 0m1.456s
しまった!このような強力なマシンで、1秒あたり約16 RPCしか実行できません。CPUをほとんど使用しておらず、メモリ使用量もわかりません。なぜそれほど遅いのかを突き止める必要があります。
最適化
分析
変更を加える前に、プログラムが何をしているのかを理解しましょう。最適化する際には、どこでコードが時間を費やしているのかを知る必要があります。この初期段階では、まだプロファイリングツールは必要ありません。プログラムについて推論するだけで十分です。
クライアントが起動され、約1分間RPCをシリアルに発行します。各イテレーションで、ランダムに決定します。どの操作を行うかを。
void doClientWork(AtomicBoolean done) {
Random random = new Random();
KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);
while (!done.get()) {
// Pick a random CRUD action to take.
int command = random.nextInt(4);
if (command == 0) {
doCreate(stub);
continue;
}
/* ... */
rpcCount++;
}
}
これは、最大1つのRPCしか同時にアクティブにできないことを意味します。各RPCは、前のRPCが完了するのを待つ必要があります。そして、各RPCは完了するのにどれくらいの時間がかかりますか?サーバーコードを読むと、ほとんどの操作は書き込みを行っており、約50ミリ秒かかります。最高の効率で、このコードが1秒あたりに実行できる操作の最大数は約20です。
20クエリ = 1000ms / (50 ms / クエリ)
私たちのコードは1秒あたり約16クエリを実行できます。これは妥当なようです。timeコマンドの出力を見て、この仮定をスポットチェックできます。サーバーは、クエリの実行中にsimulateWorkメソッドでスリープします。これは、プログラムがRPCの完了を待っている間、ほとんどアイドル状態になるはずであることを示唆しています。
上記のコマンドのrealとuser時間を見て、これが事実であることを確認できます。それらは、壁時計時間の量は1分でしたが、CPU時間の量は10秒でした。私の強力なマルチコアCPUは、時間の16%しか使用されていませんでした。したがって、この時間中にプログラムにもっと多くの作業を実行させることができれば、より多くのRPCを完了させることができるようです。
仮説
これで、問題が何であるかを明確に述べ、解決策を提案できます。プログラムを高速化する1つの方法は、CPUがアイドル状態にならないようにすることです。これを行うには、作業を並行して発行します。
gRPC Javaには、スタブが3種類あります。ブロッキング、非ブロッキング、リスナブル・フューチャーです。クライアントではブロッキングスタブ、サーバーでは非ブロッキングスタブはすでに見てきました。リスナブル・フューチャーAPIは、ブロッキングと非ブロッキングの両方の動作を提供する、両者の中間です。作業が完了するのを待ってスレッドをブロックしない限り、古いRPCが完了するのを待たずに新しいRPCを開始できます。
実験
仮説をテストするために、クライアントコードをリスナブル・フューチャーAPIを使用するように変更しましょう。これは、コードでの並行性についてさらに考える必要があることを意味します。たとえば、クライアント側で既知のキーを追跡する場合、安全にキーを読み取り、変更し、書き込む必要があります。また、エラーが発生した場合に、新しいRPCの作成を停止する必要があることも確認する必要があります(適切なエラー処理は将来の投稿で扱います)。最後に、同時実行されたRPCの数を更新する必要があります。これは、更新が別のスレッドで発生する可能性があるためです。
これらの変更をすべて行うと、コードの複雑さが増します。これは、コードの最適化を検討する際に考慮する必要があるトレードオフです。一般的に、コードの単純さは最適化とは対立します。Javaは、短く書けることで知られているわけではありません。とはいえ、以下のコードはまだ読みやすく、関数のプログラムフローは依然として大まかに上から下への流れです。これはdoCreate()メソッドを改訂したものです。
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
Futures.addCallback(res, new FutureCallback<CreateResponse>() {
@Override
public void onSuccess(CreateResponse result) {
if (!result.equals(CreateResponse.getDefaultInstance())) {
error.compareAndSet(null, new RuntimeException("Invalid response"));
}
synchronized (knownKeys) {
knownKeys.add(key);
}
}
@Override
public void onFailure(Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode() == Code.ALREADY_EXISTS) {
synchronized (knownKeys) {
knownKeys.remove(key);
}
logger.log(Level.INFO, "Key already existed", t);
} else {
error.compareAndSet(null, t);
}
}
});
}
スタブはKeyValueServiceFutureStubに変更され、呼び出されたときにレスポンス自体ではなくFutureを返します。gRPC Javaは、これをListenableFutureという拡張機能を使用しており、Futureが完了したときにコールバックを追加できます。このプログラムでは、応答を取得することにはあまり関心がありません。むしろ、RPCが成功したかどうかをより気にかけています。それを念頭に置いて、コードは主に応答を処理するのではなく、エラーをチェックします。
最初に行われた変更は、RPCの数を記録する方法です。メインループの外でカウンターをインクリメントする代わりに、RPCが完了したときにインクリメントします。
次に、各RPCに対して新しいオブジェクトを作成し、成功と失敗の両方のケースを処理します。doCreate()はRPCコールバックが呼び出されるまでに完了しているため、例外をスローする以外の方法でエラーを伝播させる必要があります。代わりに、参照をアトミックに更新しようとします。メインループは、エラーが発生したかどうかを時々チェックし、問題があれば停止します。
最後に、コードはRPCが実際に完了したときにのみキーをknownKeysに追加し、失敗したことがわかったときにのみ削除するように注意しています。2つのスレッドが競合しないように、変数に同期します。注:knownKeysへのアクセスはスレッドセーフですが、それでも競合状態が存在します。あるスレッドがknownKeysから読み取り、2番目のスレッドがknownKeysから削除し、その後最初のスレッドが最初のキーを使用したRPCを発行することが可能です。キーに同期することは、それが一貫していることを保証するだけで、それが正しいことを保証するわけではありません。これを適切に修正することは、この投稿の範囲外であるため、イベントをログに記録して続行するだけです。このプログラムを実行すると、いくつかのログステートメントが表示されます。
コードの実行
このプログラムを起動して実行すると、機能しないことに気付くでしょう。
WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
...
何だって?なぜ失敗するコードを見せるのか?その理由は、現実の世界では、変更が一度でうまくいくことはめったにないからです。この場合、プログラムはメモリを使い果たしました。プログラムがメモリを使い果たしたときに奇妙なことが起こり始めます。多くの場合、根本原因を見つけるのは難しく、赤字のヘリング(偽の手がかり)がたくさんあります。混乱を招くエラーメッセージは、「新しいネイティブスレッドを作成できません」と言いますが、コードで新しいスレッドを作成したわけではありません。これらの問題をデバッグするよりも、経験が非常に役立ちます。OOM(Out Of Memory)を数多くデバッグしてきたので、Javaが最後の決め手について私たちに伝えてくれることを知っています。私たちのプログラムははるかに多くのメモリを使い始めましたが、失敗した最後の割り当ては、偶然にもスレッド作成にありました。
何が起こったのか?新しいRPCの開始に対するプッシュバックはありませんでした。ブロッキングバージョンでは、最後のRPCが完了するまで新しいRPCを開始できませんでした。遅かったですが、メモリがないにもかかわらず、大量のRPCを作成することも防いでいました。リスナブル・フューチャーバージョンでは、これを考慮する必要があります。
これを解決するために、アクティブなRPCの数に自己課金制限を適用できます。新しいRPCを開始する前に、パーミットを取得しようとします。取得できれば、RPCを開始できます。できない場合は、利用可能になるまで待ちます。RPCが完了したとき(成功または失敗)、パーミットを返します。これを達成するために、Semaphoreを使用します。
private final Semaphore limiter = new Semaphore(100);
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
throws InterruptedException {
limiter.acquire();
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
res.addListener(() -> {
rpcCount.incrementAndGet();
limiter.release();
}, MoreExecutors.directExecutor());
/* ... */
}
これでコードは正常に実行され、メモリを使い果たすことはありません。
結果
コードを再度ビルドして実行すると、はるかに見栄えが良くなります。
./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s
real 1m0.923s
user 0m12.772s
sys 0m1.572s
私たちのコードは、以前よりも46%多くのRPCを1秒あたりに実行します。また、以前よりもCPU使用率が約20%増加したこともわかります。ご覧のとおり、仮説は正しかったことが判明し、修正は機能しました。これらすべてが、サーバーを変更することなく行われました。また、特別なプロファイラーやトレーサーを使用せずに測定することができました。
数字は妥当でしょうか?ミューテーション(作成、更新、削除)RPCは、それぞれ約1/4の確率で発行されると予想されます。読み取りも1/4の確率で発行されますが、それほど時間はかかりません。平均RPC時間は、加重平均RPC時間であるべきです。
.25 * 50ms (create)
.25 * 10ms (retrieve)
.25 * 50ms (update)
+.25 * 50ms (delete)
------------
40ms
RPCあたりの平均40msで、1秒あたりのRPCの数は次のようになると予想されます。
25クエリ = 1000ms / (40 ms / クエリ)
これは、新しいコードで見られるものとほぼ一致します。サーバーはまだリクエストをシリアルに処理しているため、将来的にはさらに多くの作業が必要になるようです。しかし、今のところ、私たちの最適化はうまくいったようです。
結論
gRPCコードを最適化する機会はたくさんあります。これらを活用するには、コードが何をしているのか、そしてコードが何をすべきなのかを理解する必要があります。この記事は、最適化へのアプローチとそれについて考えるための非常に基本的な方法を示しています。変更の前後で必ず測定し、これらの測定値を使用して最適化をガイドしてください。
パート2では、サーバー部分のコードの最適化を続けます。