基本チュートリアル
JavaにおけるgRPCの基本的なチュートリアル入門。
基本チュートリアル
このチュートリアルは、gRPCを扱うための基本的なJavaプログラマー向けの入門を提供します。
この例を段階的に実行することで、以下のことを学びます。
.protoファイルでサービスを定義する。- プロトコルバッファコンパイラを使用して、サーバーとクライアントのコードを生成する。
- Java gRPC APIを使用して、サービス用のシンプルなクライアントとサーバーを作成します。
「gRPCとは」を読み、protocol buffersに慣れていることを前提としています。このチュートリアルの例では、protocol buffers言語のproto3バージョンを使用しています。詳細はproto3言語ガイドおよびJava生成コードガイドをご覧ください。
gRPCを使用する理由
私たちの例は、クライアントがルート上の機能に関する情報を取得したり、ルートの要約を作成したり、サーバーや他のクライアントと交通情報などのルート情報を交換したりできる、シンプルなルートマッピングアプリケーションです。
gRPCを使用すると、1つの.protoファイルでサービスを一度定義し、gRPCがサポートする任意の言語でクライアントとサーバーを生成できます。これにより、大規模データセンター内のサーバーから個人のタブレットまで、さまざまな環境で実行できます。異なる言語や環境間の通信の複雑さはすべてgRPCによって処理されます。また、効率的なシリアライゼーション、シンプルなIDL、簡単なインターフェース更新など、プロトコルバッファを使用する利点もすべて得られます。
サンプルコードとセットアップ
チュートリアルのサンプルコードはgrpc/grpc-java/examples/src/main/java/io/grpc/examples/routeguideにあります。サンプルをダウンロードするには、以下のコマンドを実行してgrpc-javaリポジトリの最新リリースをクローンしてください。
git clone -b v1.73.0 --depth 1 https://github.com/grpc/grpc-java
次に、カレントディレクトリをgrpc-java/examplesに変更してください。
cd grpc-java/examples
サービスを定義する
最初のステップは(「gRPCとは」で既に読んでいる通り)、protocol buffersを使用してgRPCのサービスとメソッドのリクエストおよびレスポンスタイプを定義することです。完全な.protoファイルはgrpc-java/examples/src/main/proto/route_guide.protoで確認できます。
この例ではJavaコードを生成しているため、.protoファイルにjava_packageファイルオプションを指定しました。
option java_package = "io.grpc.examples.routeguide";
これは、生成されるJavaクラスに使用したいパッケージを指定します。.protoファイルで明示的なjava_packageオプションが指定されていない場合、デフォルトでプロトパケット(「package」キーワードで指定)が使用されます。ただし、プロトパケットは通常、Javaのパッケージとして適切ではありません。なぜなら、プロトパケットはリバースドメイン名で始まることが期待されていないからです。この.protoファイルから他の言語でコードを生成する場合、java_packageオプションは影響しません。
サービスを定義するために、.protoファイルに名前付きのserviceを指定します。
service RouteGuide {
...
}
次に、サービス定義内でrpcメソッドを定義し、リクエストとレスポンスのタイプを指定します。gRPCは4種類のサービスメソッドを定義できます。これらはすべてRouteGuideサービスで使用されます。
クライアントがスタブを使用してサーバーにリクエストを送信し、通常の関数呼び出しのようにレスポンスが返ってくるのを待つ*シンプルなRPC*。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}クライアントがサーバーにリクエストを送信し、メッセージのシーケンスを読み取るためのストリームを取得する*サーバーサイドストリーミングRPC*。クライアントは、メッセージがなくなるまで返されたストリームから読み取ります。例でわかるように、*レスポンス*の型の前に
streamキーワードを配置することで、サーバーサイドストリーミングメソッドを指定します。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}クライアントがメッセージのシーケンスを書き込み、提供されたストリームを使用してサーバーに送信する*クライアントサイドストリーミングRPC*。クライアントがメッセージの書き込みを終了したら、サーバーがすべてを読み取ってレスポンスを返すのを待ちます。*リクエスト*の型の前に
streamキーワードを配置することで、クライアントサイドストリーミングメソッドを指定します。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}両側が読み書き可能なストリームを使用してメッセージのシーケンスを送信する*双方向ストリーミングRPC*。2つのストリームは独立して動作するため、クライアントとサーバーは任意の順序で読み書きできます。たとえば、サーバーはクライアントメッセージをすべて受信してからレスポンスを書き込むのを待つことも、メッセージを読み取ってからメッセージを書き込むことも、またはその他の読み書きの組み合わせを行うこともできます。各ストリーム内のメッセージの順序は保持されます。リクエストとレスポンスの両方の前に
streamキーワードを配置することで、このタイプのメソッドを指定します。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
私たちの.protoファイルには、サービスメソッドで使用されるすべてリクエストとレスポンスの型に対応するプロトコルバッファメッセージ型定義も含まれています。たとえば、ここにPointメッセージ型があります。
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
クライアントとサーバーのコードを生成する
次に、.protoサービス定義からgRPCクライアントおよびサーバーインターフェースを生成する必要があります。これは、特別なgRPC Javaプラグインを使用してプロトコルバッファコンパイラprotocを使用して行います。gRPCサービスを生成するには、proto3コンパイラ(proto2およびproto3構文の両方をサポート)を使用する必要があります。
GradleまたはMavenを使用する場合、protocビルドプラグインはビルドの一部として必要なコードを生成できます。独自の.protoファイルからコードを生成する方法については、grpc-java READMEを参照してください。
サービス定義から生成されるクラスは以下の通りです。
Feature.java、Point.java、Rectangle.javaなど。これらには、リクエストおよびレスポンスメッセージタイプを populate、シリアライズ、取得するためのすべてのプロトコルバッファコードが含まれています。RouteGuideGrpc.java。これには(その他の便利なコードとともに)以下のものが含まれています。RouteGuideサービスで定義されたすべてのメソッドを持つ、RouteGuideサーバーが実装するための基底クラス、RouteGuideGrpc.RouteGuideImplBase。- クライアントが
RouteGuideサーバーと通信するために使用できるスタブクラス。
サーバーを作成する
まず、RouteGuideサーバーを作成する方法を見てみましょう。gRPCクライアントの作成のみに興味がある場合は、このセクションをスキップして、「クライアントを作成する」に直接進んでください(それでも興味深いかもしれません!)。
RouteGuideサービスがそのジョブを実行できるようにするには、2つの部分があります。
- サービス定義から生成されたサービス基底クラスをオーバーライドします。これは、サービスで実際に行われる「作業」を行います。
- クライアントからのリクエストをリッスンし、サービスレスポンスを返すgRPCサーバーを実行すること。
例となるRouteGuideサーバーは、grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.javaで見つけることができます。その仕組みを詳しく見てみましょう。
RouteGuideを実装する
ご覧のように、サーバーには生成されたRouteGuideGrpc.RouteGuideImplBase抽象クラスを拡張するRouteGuideServiceクラスがあります。
private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
...
}
シンプルなRPC
RouteGuideServiceは、すべてのサービスメソッドを実装します。まず、最も単純なメソッドであるGetFeature()を見てみましょう。これは、クライアントからPointを受け取り、データベースから対応するフィーチャー情報をFeatureとして返します。
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
...
private Feature checkFeature(Point location) {
for (Feature feature : features) {
if (feature.getLocation().getLatitude() == location.getLatitude()
&& feature.getLocation().getLongitude() == location.getLongitude()) {
return feature;
}
}
// No feature was found, return an unnamed feature.
return Feature.newBuilder().setName("").setLocation(location).build();
}
getFeature()メソッドは2つのパラメータを取ります。
Point:リクエスト。StreamObserver<Feature>:レスポンスオブザーバー。これは、サーバーがレスポンスで呼び出すための特別なインターフェースです。
クライアントにレスポンスを返し、コールを完了するには。
- サービス定義で指定された
Featureレスポンスオブジェクトを構築してクライアントに返します。この例では、これは別のプライベートcheckFeature()メソッドで行います。 - レスポンスオブザーバーの
onNext()メソッドを使用してFeatureを返します。 - レスポンスオブザーバーの
onCompleted()メソッドを使用して、RPCの処理が完了したことを指定します。
サーバーサイドストリーミングRPC
次に、ストリーミングRPCの1つを見てみましょう。ListFeaturesはサーバーサイドストリーミングRPCなので、クライアントに複数のFeatureを送信する必要があります。
private final Collection<Feature> features;
...
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
シンプルなRPCと同様に、このメソッドはリクエストオブジェクト(クライアントがFeatureを見つけたいRectangle)とStreamObserverレスポンスオブザーバーを受け取ります。
今回は、クライアントに返すのに必要なだけのFeatureオブジェクトを取得し(この例では、サービスの特徴コレクションから、リクエストRectangle内にあるかどうかを基準に選択します)、レスポンスオブザーバーのonNext()メソッドを介して順番に書き込みます。最後に、シンプルなRPCと同様に、レスポンスオブザーバーのonCompleted()メソッドを使用して、レスポンスの書き込みが完了したことをgRPCに伝えます。
クライアントサイドストリーミングRPC
次に、もう少し複雑なものを見てみましょう。クライアントサイドストリーミングメソッドRecordRoute()では、クライアントからPointのストリームを受け取り、旅行に関する情報を含む単一のRouteSummaryを返します。
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
ご覧の通り、以前のメソッドタイプと同様に、メソッドはStreamObserverレスポンスオブザーバーパラメータを受け取りますが、今回はクライアントがPointを書き込むためのStreamObserverを返します。
メソッド本文では、匿名StreamObserverをインスタンス化して返します。この中で、
- クライアントがメッセージストリームに
Pointを書き込むたびにフィーチャーやその他の情報を取得するためにonNext()メソッドをオーバーライドします。 - (クライアントがメッセージの書き込みを完了したときに呼び出される)
onCompleted()メソッドをオーバーライドして、RouteSummaryを populate して構築します。次に、メソッド自体のレスポンスオブザーバーのonNext()をRouteSummaryで呼び出し、その後onCompleted()メソッドを呼び出してサーバー側からコールを完了します。
双方向ストリーミング RPC
最後に、双方向ストリーミングRPCRouteChat()を見てみましょう。
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
クライアントサイドストリーミングの例と同様に、StreamObserverレスポンスオブザーバーを取得して返しますが、今回はクライアントが自身のメッセージストリームにメッセージを書き込んでいる間に、メソッドのレスポンスオブザーバーを介して値を返します。ここでの読み書きの構文は、クライアントストリーミングメソッドの場合とまったく同じです。各サイドは常に相手のメッセージを書き込まれた順序で取得しますが、クライアントとサーバーはどちらの順序でも読み書きできます。ストリームは完全に独立して動作します。
サーバーを起動する
すべてのメソッドを実装したら、クライアントが実際にサービスを使用できるように、gRPCサーバーを起動する必要があります。次のスニペットは、RouteGuideサービスでこれをどのように行うかを示しています。
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
...
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
...
}
ご覧の通り、ServerBuilderを使用してサーバーを構築して開始します。
これを行うために、
- ビルダーの
forPort()メソッドを使用して、クライアントリクエストをリッスンしたいアドレスとポートを指定します。 - サービス実装クラス
RouteGuideServiceのインスタンスを作成し、ビルダーのaddService()メソッドに渡します。 - ビルダーで
build()とstart()を呼び出して、サービス用のRPCサーバーを作成して開始します。
クライアントを作成する
このセクションでは、RouteGuideサービス用のクライアントの作成について説明します。完全なサンプルクライアントコードは、grpc-java/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.javaで見ることができます。
スタブのインスタンス化
サービスメソッドを呼び出すには、まずスタブを作成する必要があります。正確には、2つのスタブを作成します。
- ブロッキング/同期スタブ:これは、RPCコールがサーバーの応答を待機し、応答を返すか例外を発生させることを意味します。
- ノンブロッキング/非同期スタブ:これは、サーバーへのノンブロッキングコールを行い、応答は非同期に返されます。非同期スタブのみを使用して、特定の種類のストリーミングコールを行うことができます。
まず、接続したいサーバーのアドレスとポートを指定して、スタブ用のgRPCチャネルを作成する必要があります。
public RouteGuideClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
}
/** Construct client for accessing RouteGuide server using the existing channel. */
public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
チャネルの作成にはManagedChannelBuilderを使用します。
これで、チャネルを使用して、.protoから生成されたRouteGuideGrpcクラスで提供されているnewStubおよびnewBlockingStubメソッドを使用してスタブを作成できます。
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
サービスメソッドを呼び出す
それでは、サービスメソッドの呼び出し方法を見てみましょう。
シンプルなRPC
ブロッキングスタブでシンプルなRPCGetFeatureを呼び出すのは、ローカルメソッドを呼び出すのと同じくらい簡単です。
Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature;
try {
feature = blockingStub.getFeature(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
リクエストプロトコルバッファオブジェクト(この場合はPoint)を作成して populate し、ブロッキングスタブのgetFeature()メソッドに渡して、Featureを取得します。
エラーが発生した場合、それはStatusとしてエンコードされ、StatusRuntimeExceptionから取得できます。
サーバーサイドストリーミングRPC
次に、地理的なFeatureのストリームを返すサーバーサイドストリーミングコールListFeaturesを見てみましょう。
Rectangle request =
Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
ご覧の通り、これは先ほど見たシンプルなRPCと非常によく似ていますが、単一のFeatureを返す代わりに、メソッドはクライアントが返されたすべてのFeatureを読み取るために使用できるIteratorを返します。
クライアントサイドストリーミングRPC
次に、もう少し複雑なものを見てみましょう。クライアントサイドストリーミングメソッドRecordRouteでは、サーバーにPointのストリームを送信し、単一のRouteSummaryを取得します。このメソッドでは、非同期スタブを使用する必要があります。もしサーバーの作成を既に読んでいる場合、この一部は非常に馴染みのあるものになるでしょう。非同期ストリーミングRPCは、両サイドで同様の方法で実装されています。
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
ご覧の通り、このメソッドを呼び出すには、サーバーがRouteSummaryレスポンスで呼び出すための特別なインターフェースを実装するStreamObserverを作成する必要があります。StreamObserverでは、
- サーバーが
RouteSummaryをメッセージストリームに書き込むたびに、返された情報を表示するためにonNext()メソッドをオーバーライドします。 - (サーバーがコールを完了したときに呼び出される)
onCompleted()メソッドをオーバーライドして、サーバーが書き込みを完了したかどうかを確認するために使用できるCountDownLatchを減らします。
次に、StreamObserverを非同期スタブのrecordRoute()メソッドに渡し、サーバーに送信するPointを書き込むための独自のStreamObserverリクエストオブザーバーを取得します。ポイントの書き込みが完了したら、リクエストオブザーバーのonCompleted()メソッドを使用して、クライアント側での書き込みが完了したことをgRPCに伝えます。完了したら、CountDownLatchをチェックして、サーバーが側で完了したことを確認します。
双方向ストリーミング RPC
最後に、双方向ストリーミングRPCRouteChat()を見てみましょう。
public void routeChat() throws Exception {
info("*** RoutChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
クライアントサイドストリーミングの例と同様に、StreamObserverレスポンスオブザーバーを取得して返しますが、今回はサーバーが自身のメッセージストリームにメッセージを書き込んでいる間に、メソッドのレスポンスオブザーバーを介して値を送信します。ここでの読み書きの構文は、クライアントストリーミングメソッドの場合とまったく同じです。各サイドは常に相手のメッセージを書き込まれた順序で取得しますが、クライアントとサーバーはどちらの順序でも読み書きできます。ストリームは完全に独立して動作します。
試してみましょう!
クライアントとサーバーのビルドと実行については、サンプルディレクトリのREADMEの指示に従ってください。