基本チュートリアル
JavaにおけるgRPCの基本的なチュートリアルの紹介です。
基本チュートリアル
このチュートリアルでは、gRPCを扱うための基本的なJavaプログラマー向けの入門を紹介します。
この例を順に進めることで、以下の方法を学びます。
.proto
ファイルでサービスを定義する。- プロトコルバッファコンパイラを使用してサーバーとクライアントのコードを生成する。
- Java gRPC APIを使用して、サービスの簡単なクライアントとサーバーを作成する。
gRPC入門を読み、プロトコルバッファに精通していることを前提としています。このチュートリアルの例では、プロトコルバッファ言語のproto3バージョンを使用していることに注意してください。詳細については、proto3言語ガイドとJava生成コードガイドを参照してください。
なぜgRPCを使うのか?
この例は、クライアントがルート上の機能に関する情報を取得したり、ルートの概要を作成したり、トラフィックの更新などのルート情報をサーバーや他のクライアントと交換したりできる簡単なルートマッピングアプリケーションです。
gRPCを使用すると、.proto
ファイルでサービスを一度定義し、gRPCでサポートされている任意の言語でクライアントとサーバーを生成できます。これらのクライアントとサーバーは、大規模なデータセンター内のサーバーから自分のタブレットまで、さまざまな環境で実行できます。異なる言語と環境間の通信の複雑さはすべてgRPCによって処理されます。また、効率的なシリアル化、シンプルなIDL、簡単なインターフェース更新など、プロトコルバッファを使用するすべての利点も得られます。
サンプルコードとセットアップ
このチュートリアルのサンプルコードは、grpc/grpc-java/examples/src/main/java/io/grpc/examples/routeguideにあります。サンプルをダウンロードするには、次のコマンドを実行してgrpc-java
リポジトリの最新リリースをクローンしてください。
$ git clone -b v1.63.0 --depth 1 https://github.com/grpc/grpc-java
次に、現在のディレクトリをgrpc-java/examples
に変更します。
$ cd grpc-java/examples
サービスを定義する
最初のステップ(gRPC入門からご存知のとおり)は、プロトコルバッファを使用してgRPCのサービスとメソッドのリクエストおよびレスポンスのタイプを定義することです。完全な.proto
ファイルはgrpc-java/examples/src/main/proto/route_guide.protoで確認できます。
この例ではJavaコードを生成するため、.proto
にjava_package
ファイルオプションを指定しました。
option java_package = "io.grpc.examples.routeguide";
これは、生成されたJavaクラスに使用するパッケージを指定します。.proto
ファイルに明示的なjava_package
オプションが指定されていない場合、デフォルトでは、protoパッケージ(「package」キーワードを使用して指定)が使用されます。ただし、protoパッケージは、一般に、リバースドメイン名で始まることが想定されていないため、優れたJavaパッケージにはなりません。この.proto
から別の言語でコードを生成する場合、java_package
オプションは効果がありません。
サービスを定義するには、.proto
ファイルで名前付きのservice
を指定します。
service RouteGuide {
...
}
次に、サービス定義内でrpc
メソッドを定義し、それらのリクエストとレスポンスのタイプを指定します。gRPCを使用すると、4種類のサービスメソッドを定義できます。これらはすべてRouteGuide
サービスで使用されています。
シンプルなRPC:クライアントがスタブを使用してサーバーにリクエストを送信し、通常の関数呼び出しのようにレスポンスが返ってくるのを待ちます。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
サーバー側のストリーミングRPC:クライアントがサーバーにリクエストを送信し、メッセージのシーケンスを読み取るためのストリームを取得します。クライアントは、メッセージがなくなるまで返されたストリームから読み取ります。例でわかるように、レスポンスタイプの前に
stream
キーワードを配置することで、サーバー側のストリーミングメソッドを指定します。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
クライアント側のストリーミングRPC:クライアントがメッセージのシーケンスを書き込み、提供されたストリームを使用してサーバーに送信します。クライアントがメッセージの書き込みを完了すると、サーバーがすべて読み取ってレスポンスを返すのを待ちます。リクエストタイプの前に
stream
キーワードを配置することで、クライアント側のストリーミングメソッドを指定します。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
双方向ストリーミングRPC:両側が読み取り/書き込みストリームを使用してメッセージのシーケンスを送信します。2つのストリームは独立して動作するため、クライアントとサーバーは、任意の順序で読み取りと書き込みを行うことができます。たとえば、サーバーは、クライアントからのすべてのメッセージを受信してからレスポンスを書き込むのを待つことも、メッセージを読み取ってメッセージを書き込むことを交互に行うことも、その他の読み取りと書き込みの組み合わせを行うこともできます。各ストリームのメッセージの順序は保持されます。リクエストとレスポンスの両方の前に
stream
キーワードを配置することで、このタイプのメソッドを指定します。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
.proto
ファイルには、サービスメソッドで使用されるすべてのリクエストとレスポンスのタイプのプロトコルバッファメッセージタイプの定義も含まれています。たとえば、Point
メッセージタイプは次のとおりです。
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
クライアントとサーバーのコードを生成する
次に、.proto
サービス定義からgRPCクライアントとサーバーのインターフェースを生成する必要があります。これは、特別なgRPC Javaプラグインを備えたプロトコルバッファコンパイラprotoc
を使用して行います。gRPCサービスを生成するには、proto3コンパイラ(proto2とproto3の両方の構文をサポート)を使用する必要があります。
GradleまたはMavenを使用する場合、protocビルドプラグインは、ビルドの一部として必要なコードを生成できます。独自の.proto
ファイルからコードを生成する方法については、grpc-java READMEを参照してください。
サービス定義から次のクラスが生成されます。
Feature.java
、Point.java
、Rectangle.java
など。これらには、リクエストとレスポンスのメッセージタイプを設定、シリアル化、および取得するためのすべてのプロトコルバッファコードが含まれています。RouteGuideGrpc.java
には、(他の便利なコードとともに)次のものが含まれています。RouteGuide
サーバーが実装するための基本クラスであるRouteGuideGrpc.RouteGuideImplBase
。RouteGuide
サービスで定義されているすべてのメソッドが含まれています。- クライアントが
RouteGuide
サーバーと通信するために使用できるスタブクラス。
サーバーを作成する
最初に、RouteGuide
サーバーを作成する方法を見てみましょう。gRPCクライアントの作成のみに関心がある場合は、このセクションをスキップして、クライアントの作成に直接進むことができます(それでも興味深いと思われるかもしれません!)。
RouteGuide
サービスを機能させるには、2つの部分があります。
- サービス定義から生成されたサービス基本クラスをオーバーライドする:サービスの実際の「作業」を行う。
- クライアントからのリクエストをリッスンし、サービスレスポンスを返すgRPCサーバーを実行する。
サンプルのRouteGuide
サーバーは、grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.javaにあります。どのように動作するかを詳しく見てみましょう。
RouteGuideを実装する
ご覧のとおり、サーバーには、生成されたRouteGuideGrpc.RouteGuideImplBase
抽象クラスを拡張するRouteGuideService
クラスがあります。
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
...
}
シンプルなRPC
RouteGuideService
は、すべてのサービスメソッドを実装します。まず、最も単純なメソッドであるGetFeature()
を見てみましょう。これは、クライアントからPoint
を取得し、データベースから対応する機能情報をFeature
で返します。
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
getFeature()
メソッドは、2つのパラメータを受け取ります。
Point
:リクエストStreamObserver<Feature>
:レスポンスオブザーバー。これは、サーバーがレスポンスを呼び出すための特別なインターフェースです。
クライアントにレスポンスを返して呼び出しを完了するには、次のようにします。
- サービス定義で指定されているとおりに、クライアントに返す
Feature
レスポンスオブジェクトを構築して設定します。この例では、これを別のプライベートcheckFeature()
メソッドで行います。 - レスポンスオブザーバーの
onNext()
メソッドを使用して、Feature
を返します。 - レスポンスオブザーバーの
onCompleted()
メソッドを使用して、RPCの処理が完了したことを指定します。
サーバー側のストリーミングRPC
次に、ストリーミングRPCの1つを見てみましょう。ListFeatures
はサーバー側のストリーミングRPCであるため、複数のFeature
をクライアントに送り返す必要があります。
private final Collection<Feature> features;
...
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
シンプルなRPCと同様に、このメソッドは、リクエストオブジェクト(クライアントがFeature
を検索するRectangle
)とStreamObserver
レスポンスオブザーバーを取得します。
今回は、クライアントに返すのに必要な数のFeature
オブジェクトを取得し(この例では、リクエストのRectangle
内にあるかどうかを基準に、サービスのフィーチャーコレクションから選択します)、それぞれをレスポンスオブザーバーのonNext()
メソッドを使ってレスポンスとして書き込みます。最後に、単純なRPCと同様に、レスポンスオブザーバーのonCompleted()
メソッドを使って、レスポンスの書き込みが完了したことをgRPCに伝えます。
クライアント側のストリーミングRPC
次に、少し複雑なものを見てみましょう。クライアント側のストリーミングメソッドであるRecordRoute()
です。ここでは、クライアントからPoint
のストリームを受け取り、そのトリップに関する情報を含む単一のRouteSummary
を返します。
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
ご覧のとおり、以前のメソッドタイプと同様に、このメソッドもStreamObserver
のレスポンスオブザーバーパラメータを受け取りますが、今回はクライアントがPoint
を書き込むためのStreamObserver
を返します。
メソッド本体では、返信する匿名のStreamObserver
をインスタンス化します。この中で、
- クライアントがメッセージストリームに
Point
を書き込むたびに、フィーチャーやその他の情報を取得するために、onNext()
メソッドをオーバーライドします。 onCompleted()
メソッド(クライアントがメッセージの書き込みを完了したときに呼び出されます)をオーバーライドして、RouteSummary
を作成および構築します。次に、メソッド自身のレスポンスオブザーバーのonNext()
をRouteSummary
で呼び出し、さらにonCompleted()
メソッドを呼び出して、サーバー側からの呼び出しを終了します。
双方向ストリーミングRPC
最後に、双方向ストリーミングRPCであるRouteChat()
を見てみましょう。
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
クライアント側のストリーミングの例と同様に、StreamObserver
のレスポンスオブザーバーを取得して返しますが、今回はクライアントが自分のメッセージストリームにメッセージを書き込んでいる間に、メソッドのレスポンスオブザーバーを介して値を返します。ここでの読み書きの構文は、クライアントストリーミングメソッドおよびサーバーサイドストリーミングメソッドとまったく同じです。各サイドは常に相手のメッセージを書かれた順序で取得しますが、クライアントとサーバーの両方が任意の順序で読み書きできます。ストリームは完全に独立して動作します。
サーバーを起動する
すべてのメソッドを実装したら、クライアントが実際にサービスを利用できるように、gRPCサーバーを起動する必要もあります。次のスニペットは、RouteGuide
サービスでこれを行う方法を示しています。
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
...
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
...
}
ご覧のとおり、ServerBuilder
を使用してサーバーを構築および起動します。
これを行うには、
- ビルダーの
forPort()
メソッドを使用して、クライアントリクエストをリッスンするために使用するアドレスとポートを指定します。 - サービス実装クラス
RouteGuideService
のインスタンスを作成し、それをビルダーのaddService()
メソッドに渡します。 - ビルダーで
build()
とstart()
を呼び出して、サービスのRPCサーバーを作成および起動します。
クライアントを作成する
このセクションでは、RouteGuide
サービスのクライアントを作成する方法を見ていきます。完全なクライアントコードの例は、grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.javaにあります。
スタブをインスタンス化する
サービスメソッドを呼び出すには、まず*スタブ*を作成する必要があります。正確には、2つのスタブが必要です。
- *ブロッキング/同期*スタブ:これは、RPC呼び出しがサーバーからの応答を待機し、応答を返すか、例外を発生させることを意味します。
- サーバーへのノンブロッキング呼び出しを行う*ノンブロッキング/非同期*スタブ。応答は非同期で返されます。特定のタイプのストリーミング呼び出しは、非同期スタブでのみ行うことができます。
まず、接続先のサーバーアドレスとポートを指定して、スタブのgRPC*チャネル*を作成する必要があります。
public RouteGuideClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}
/** Construct client for accessing RouteGuide server using the existing channel. */
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
ManagedChannelBuilder
を使用してチャネルを作成します。
これで、チャネルを使用して、.proto
から生成したRouteGuideGrpc
クラスで提供されるnewStub
メソッドとnewBlockingStub
メソッドを使用してスタブを作成できます。
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
サービスメソッドを呼び出す
次に、サービスメソッドの呼び出し方法を見てみましょう。
シンプルなRPC
ブロッキングスタブで単純なRPC GetFeature
を呼び出すのは、ローカルメソッドを呼び出すのと同じくらい簡単です。
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature;
try {
feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
リクエストプロトコルバッファオブジェクト(この場合はPoint
)を作成して入力し、それをブロッキングスタブのgetFeature()
メソッドに渡すと、Feature
が返されます。
エラーが発生した場合は、Status
としてエンコードされます。これはStatusRuntimeException
から取得できます。
サーバー側のストリーミングRPC
次に、地理的なFeature
のストリームを返すサーバーサイドストリーミング呼び出しであるListFeatures
を見てみましょう。
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
ご覧のとおり、直前に見た単純なRPCと非常によく似ていますが、単一のFeature
を返す代わりに、メソッドはクライアントが返されたすべてのFeature
を読み取るために使用できるIterator
を返します。
クライアント側のストリーミングRPC
次に、少し複雑なものを見てみましょう。クライアントサイドストリーミングメソッドであるRecordRoute
です。ここでは、Point
のストリームをサーバーに送信し、単一のRouteSummary
を返します。このメソッドでは、非同期スタブを使用する必要があります。サーバーの作成をすでに読んでいる場合、これの一部は非常によく似ているように見えるかもしれません。非同期ストリーミングRPCは、両側で同様の方法で実装されます。
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
ご覧のとおり、このメソッドを呼び出すにはStreamObserver
を作成する必要があります。これは、サーバーがRouteSummary
レスポンスで呼び出すための特別なインターフェースを実装します。StreamObserver
では、
- サーバーがメッセージストリームに
RouteSummary
を書き込むときに、返された情報を出力するために、onNext()
メソッドをオーバーライドします。 onCompleted()
メソッド(*サーバー*側での呼び出しが完了したときに呼び出されます)をオーバーライドして、サーバーが書き込みを完了したかどうかを確認できるCountDownLatch
を減らします。
次に、StreamObserver
を非同期スタブのrecordRoute()
メソッドに渡すと、サーバーに送信するPoint
を書き込むための独自のStreamObserver
リクエストオブザーバーが返されます。ポイントの書き込みが完了したら、リクエストオブザーバーのonCompleted()
メソッドを使用して、クライアント側での書き込みが完了したことをgRPCに伝えます。完了したら、CountDownLatch
をチェックして、サーバー側で完了したことを確認します。
双方向ストリーミングRPC
最後に、双方向ストリーミングRPCであるRouteChat()
を見てみましょう。
public void routeChat() throws Exception {
info("*** RoutChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
クライアントサイドストリーミングの例と同様に、StreamObserver
のレスポンスオブザーバーを取得して返しますが、今回はサーバーが自分のメッセージストリームにメッセージを書き込んでいる間に、メソッドのレスポンスオブザーバーを介して値を送信します。ここでの読み書きの構文は、クライアントストリーミングメソッドとまったく同じです。各サイドは常に相手のメッセージを書かれた順序で取得しますが、クライアントとサーバーの両方が任意の順序で読み書きできます。ストリームは完全に独立して動作します。
試してみよう!
exampleディレクトリのREADMEの指示に従って、クライアントとサーバーをビルドして実行します。