gRPCの最適化について - パート2
gRPCはどれくらい高速か?現代のクライアントとサーバーの構築方法を理解していれば、かなり高速です。パート1では、簡単な60%の改善方法を示しました。この投稿では、10000%の改善方法を示します。
セットアップ
パート1と同様に、既存のJavaベースのキー・バリュー・サービスから始めます。このサービスは、キーと値の作成、読み取り、更新、削除のための同時アクセスを提供します。コード全体はここで見ることができますので、試してみてください。
サーバーの同時実行性
KvServiceクラスを見てみましょう。このサービスは、クライアントから送信されたRPCを処理し、それらがストレージの状態を誤って破損させないようにします。これを確実にするために、サービスはsynchronizedキーワードを使用して、一度にアクティブなRPCは1つだけであることを保証します。
private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();
@Override
public synchronized void create(
CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
simulateWork(WRITE_DELAY_MILLIS);
if (store.putIfAbsent(key, value) == null) {
responseObserver.onNext(CreateResponse.getDefaultInstance());
responseObserver.onCompleted();
return;
}
responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}
このコードはスレッドセーフですが、その代償は大きい:一度にアクティブなRPCは1つだけです!複数の操作を安全に同時に実行できるようにする必要があります。そうでなければ、プログラムは利用可能なすべてのプロセッサを活用できません。
ロックからの解放
これを解決するには、RPCのセマンティクスについてもう少し知る必要があります。RPCがどのように機能することを想定しているかを知るほど、より多くの最適化を行うことができます。キー・バリュー・サービスの場合、異なるキーへの操作は互いに干渉しないことに気づきます。キー「foo」を更新しても、「bar」キーに格納されている値には影響しません。しかし、私たちのサーバーは、どのキーへの操作も互いに同期する必要があるように書かれています。異なるキーへの操作を同時に実行できるようにすれば、サーバーははるかに多くの負荷を処理できるようになります。
アイデアを念頭に置いて、サーバーを変更する方法を検討する必要があります。synchronizedキーワードは、Javaがthis(KvServiceのインスタンス)のロックを取得するようにします。ロックはcreateメソッドに入るときに取得され、戻るときに解放されます。同期が必要なのはstore Mapを保護するためです。これはHashMapとして実装されているため、それに変更を加えると内部配列が変更されます。HashMapの内部状態は同期されないと破損する可能性があるため、メソッドの同期を削除することはできません。
しかし、JavaにはConcurrentHashMapという解決策があります。このクラスは、マップの内容に安全に同時にアクセスできるようにします。例えば、私たちの使用法では、キーが存在するかどうかを確認したいとします。存在しない場合は追加し、存在する場合はエラーを返したいとします。putIfAbsentメソッドは、値が存在するかどうかをアトミックにチェックし、存在しない場合は追加し、成功したかどうかを通知します。
Concurrent MapはputIfAbsentの安全性についてより強力な保証を提供するため、HashMapをConcurrentHashMapに交換し、synchronizedを削除できます。
private final ConcurrentMap<ByteBuffer, ByteBuffer> store = new ConcurrentHashMap<>();
@Override
public void create(
CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
ByteBuffer value = request.getValue().asReadOnlyByteBuffer();
simulateWork(WRITE_DELAY_MILLIS);
if (store.putIfAbsent(key, value) == null) {
responseObserver.onNext(CreateResponse.getDefaultInstance());
responseObserver.onCompleted();
return;
}
responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
}
一度うまくいかなくても
createの更新はかなり簡単でした。retrieveとdeleteについても同様です。しかし、updateメソッドは少しトリッキーです。それが何をしているか見てみましょう。
@Override
public synchronized void update(
UpdateRequest request, StreamObserver<UpdateResponse> responseObserver) {
ByteBuffer key = request.getKey().asReadOnlyByteBuffer();
ByteBuffer newValue = request.getValue().asReadOnlyByteBuffer();
simulateWork(WRITE_DELAY_MILLIS);
ByteBuffer oldValue = store.get(key);
if (oldValue == null) {
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
return;
}
store.replace(key, oldValue, newValue);
responseObserver.onNext(UpdateResponse.getDefaultInstance());
responseObserver.onCompleted();
}
キーを新しい値に更新するには、storeとの2回のやり取りが必要です。
- キーがそもそも存在するかどうかを確認します。
- 以前の値から新しい値に更新します。
残念ながらConcurrentMapには、これを直接行うための簡単なメソッドがありません。私たちがマップを変更する唯一の存在であるとは限らないため、私たちの仮定が変更された可能性を処理する必要があります。古い値を取得しますが、それを置き換える頃には削除されている可能性があります。
これを解決するために、replaceが失敗した場合は再試行します。これは、置換が成功した場合にtrueを返します。(ConcurrentMapは、操作が内部構造を破損しないことを保証しますが、成功すると言っているわけではありません!)do-whileループを使用します。
@Override
public void update(
UpdateRequest request, StreamObserver<UpdateResponse> responseObserver) {
// ...
ByteBuffer oldValue;
do {
oldValue = store.get(key);
if (oldValue == null) {
responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
return;
}
} while (!store.replace(key, oldValue, newValue));
responseObserver.onNext(UpdateResponse.getDefaultInstance());
responseObserver.onCompleted();
}
コードは、nullを検出すると失敗したいですが、nullでない以前の値がある場合は決して失敗したくありません。注意すべき点は、別のRPCがstore.get()呼び出しとstore.replace()呼び出しの間で値を変更した場合、失敗するということです。これは私たちにとって致命的なエラーではないので、再試行するだけです。新しい値を正常に投入できたら、サービスはユーザーに応答を返すことができます。
もう1つの可能性としては、2つのRPCが同じ値を更新し、互いの作業を上書きしてしまうことが考えられます。これは一部のアプリケーションでは問題ないかもしれませんが、トランザクショナリティを提供するAPIには適しません。これを修正する方法を示すことはこの投稿の範囲外ですが、それが起こりうることを認識しておいてください。
パフォーマンスの測定
前回の投稿では、クライアントを非同期にし、gRPC ListenableFuture APIを使用するように変更しました。メモリ不足を避けるために、クライアントは一度に最大100個のRPCしかアクティブにならないように変更されました。サーバーコードからわかるように、パフォーマンスはロックの取得にボトルネックがありました。それらを削除したので、100倍の改善が見込まれます。RPCあたりの作業量は同じですが、はるかに多くのRPCが同時に実行されています。仮説が正しいか見てみましょう。
変更前
./gradlew installDist
time ./build/install/kvstore/bin/kvstore
Apr 16, 2018 10:38:42 AM io.grpc.examples.KvRunner runClient
INFO: Did 24.067 RPCs/s
real 1m0.886s
user 0m9.340s
sys 0m1.660s
変更後
Apr 16, 2018 10:36:48 AM io.grpc.examples.KvRunner runClient
INFO: Did 2,449.8 RPCs/s
real 1m0.968s
user 0m52.184s
sys 0m20.692s
すごい!1秒あたりのRPC数が24から2,400に増加しました。APIやクライアントを変更する必要もありませんでした。だからこそ、コードとAPIのセマンティクスを理解することが重要なのです。キー・バリュー・APIの特性、つまり異なるキーに対する操作の独立性を活用することで、コードははるかに高速になりました。
このコードの注目すべき成果の1つは、結果のuserタイミングです。以前はユーザー時間はわずか9秒でした。つまり、コードが実行されていた60秒のうち、CPUは9秒しかアクティブではありませんでした。その後、使用率は5倍以上に増加して52秒になりました。その理由は、より多くのCPUコアがアクティブになっているためです。KvServerは、数ミリ秒スリープすることで作業をシミュレートしています。実際のアプリケーションでは、有用な作業が行われるため、これほど劇的な変化はありません。RPCの数に比例してスケールするのではなく、コアの数に比例してスケールします。したがって、お使いのマシンに12コアがあれば、12倍の改善が期待できるでしょう。それでも悪くないですが!
さらなるエラー
このコードを自分で実行すると、以下のような形式で大量のログスパムが表示されます。
Apr 16, 2018 10:38:40 AM io.grpc.examples.KvClient$3 onFailure
INFO: Key not found
io.grpc.StatusRuntimeException: NOT_FOUND
その理由は、新しいバージョンのコードではAPIレベルの競合状態がより顕著になるためです。100倍のRPCが同時に実行されるため、更新と削除が互いに衝突する可能性が高くなります。これを解決するには、API定義を変更する必要があります。この修正方法については、次回の投稿をお楽しみに。
結論
gRPCコードを最適化する機会はたくさんあります。これらを活用するには、コードが何を行っているかを理解する必要があります。この投稿では、ロックベースのサービスを低競合のロックフリーサービスに変換する方法を示しました。変更の前後に必ず測定を行ってください。