基本チュートリアル

Android JavaにおけるgRPCの基本チュートリアルです。

基本チュートリアル

Android JavaにおけるgRPCの基本チュートリアルです。

このチュートリアルでは、Android Javaプログラマー向けにgRPCの基本的な使用方法を紹介します。

この例を実際に試すことで、以下の方法を学ぶことができます。

  • .protoファイルでサービスを定義する。
  • Protocol Bufferコンパイラを使用してクライアントコードを生成する。
  • Java gRPC APIを使用して、サービス用のシンプルなモバイルクライアントを作成する。

gRPCの概要を読み、Protocol Buffersに精通していることを前提としています。また、このガイドではサーバー側については触れていません。詳しくはJavaページをご覧ください。

なぜgRPCを使うのか?

この例では、クライアントがルート上の地物の情報を取得し、ルートの概要を作成し、サーバーや他のクライアントと交通情報などのルート情報を交換できるシンプルなルートマッピングアプリケーションを使用します。

gRPCを使用すると、.protoファイルでサービスを一度定義し、gRPCがサポートする任意の言語でクライアントとサーバーを生成できます。生成されたクライアントとサーバーは大規模データセンター内のサーバーから自分のタブレットまで、さまざまな環境で実行できます。異なる言語や環境間の通信の複雑さはすべてgRPCによって処理されます。また、効率的なシリアライゼーション、シンプルなIDL、簡単なインターフェース更新など、Protocol Buffersを使用するすべての利点を得ることができます。

サンプルコードとセットアップ

このチュートリアルのサンプルコードは、grpc-javaのexamples/androidにあります。サンプルをダウンロードするには、次のコマンドを実行してgrpc-javaリポジトリをクローンします。

$ git clone -b v1.63.0 https://github.com/grpc/grpc-java.git

次に、現在のディレクトリをgrpc-java/examples/androidに変更します。

$ cd grpc-java/examples/android

また、クライアントインターフェースコードを生成するために必要なツールがインストールされている必要があります。まだインストールされていない場合は、grpc-java READMEのセットアップ手順に従ってください。

サービスの定義

最初の手順(gRPCの概要で知っているように)は、Protocol Buffersを使用してgRPC *サービス*とメソッドの*リクエスト*と*レスポンス*の型を定義することです。完全な.protoファイルは、routeguide/app/src/main/proto/route_guide.protoで確認できます。

この例ではJavaコードを生成しているため、.protojava_packageファイルオプションを指定しています。

option java_package = "io.grpc.examples";

これは、生成されたJavaクラスに使用するパッケージを指定します。.protoファイルに明示的なjava_packageオプションが指定されていない場合、デフォルトではprotoパッケージ(「package」キーワードを使用して指定)が使用されます。ただし、protoパッケージはリバースドメイン名で始まることが想定されていないため、一般的にJavaパッケージとしては適していません。この.protoから別の言語でコードを生成する場合、`java_package`オプションは効力を持たず、無視されます.

サービスを定義するには、.protoファイルで名前付き`service`を指定します。

service RouteGuide {
   ...
}

次に、サービス定義内で`rpc`メソッドを定義し、それらのリクエストとレスポンスタイプを指定します。gRPCでは、4種類のサービスメソッドを定義できます。これらのメソッドはすべて`RouteGuide`サービスで使用されます。

  • クライアントがスタブを使用してサーバーにリクエストを送信し、通常の関数呼び出しのようにレスポンスが返ってくるのを待つ*シンプルRPC*。

    // Obtains the feature at a given position.
    rpc GetFeature(Point) returns (Feature) {}
    
  • クライアントがサーバーにリクエストを送信し、一連のメッセージを読み取るためのストリームを取得する*サーバーサイドストリーミングRPC*。クライアントは、メッセージがなくなるまで返されたストリームから読み取ります。この例でわかるように、*レスポンス*タイプの前に`stream`キーワードを配置することで、サーバーサイドストリーミングメソッドを指定します。

    // Obtains the Features available within the given Rectangle.  Results are
    // streamed rather than returned at once (e.g. in a response message with a
    // repeated field), as the rectangle may cover a large area and contain a
    // huge number of features.
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • クライアントが提供されたストリームを使用して一連のメッセージを書き込み、サーバーに送信する*クライアントサイドストリーミングRPC*。クライアントはメッセージの書き込みが完了すると、サーバーがすべてのメッセージを読み取ってレスポンスを返すまで待機します。*リクエスト*タイプの前に`stream`キーワードを配置することで、クライアントサイドストリーミングメソッドを指定します。

    // Accepts a stream of Points on a route being traversed, returning a
    // RouteSummary when traversal is completed.
    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 両側が読み書きストリームを使用して一連のメッセージを送信する*双方向ストリーミングRPC*。2つのストリームは独立して動作するため、クライアントとサーバーは好きな順序で読み書きできます。たとえば、サーバーはすべてのクライアントメッセージを受信するまで待機してからレスポンスを書き込むことも、メッセージを読み取ってからメッセージを書き込むことも、あるいは読み取りと書き込みの他の組み合わせを行うこともできます。各ストリームのメッセージの順序は保持されます。リクエストとレスポンスの両方の前に`stream`キーワードを配置することで、このタイプのメソッドを指定します。

    // Accepts a stream of RouteNotes sent while a route is being traversed,
    // while receiving other RouteNotes (e.g. from other users).
    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

.protoファイルには、サービスメソッドで使用されるすべてのリクエストとレスポンスタイプのProtocol Bufferメッセージタイプの定義も含まれています。たとえば、`Point`メッセージタイプは次のとおりです。

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

クライアントコードの生成

次に、.protoサービス定義からgRPCクライアントインターフェースを生成する必要があります。これは、特別なgRPC Javaプラグインを使用してProtocol Bufferコンパイラ`protoc`で行います。 gRPCサービスを生成するには、proto3コンパイラ(proto2とproto3の両方の構文をサポート)を使用する必要があります。

この例のビルドシステムもJava-gRPCビルドの一部です。独自の.protoファイルからコードを生成する方法については、grpc-java READMEbuild.gradleを参照してください。Androidの場合、モバイルユースケースに最適化されたprotobuf liteを使用することに注意してください。

サービス定義から次のクラスが生成されます。

  • リクエストとレスポンスのメッセージタイプを入力、シリアル化、および取得するためのすべてのProtocol Bufferコードを含む`Feature.java`、`Point.java`、`Rectangle.java`など。
  • (他のいくつかの便利なコードとともに)以下を含む`RouteGuideGrpc.java`。
    • `RouteGuide`サービスで定義されているすべてのメソッドを含む、`RouteGuide`サーバーが実装するための基底クラス`RouteGuideGrpc.RouteGuideImplBase`。
    • クライアントが`RouteGuide`サーバーと通信するために使用できる*スタブ*クラス。

クライアントの作成

このセクションでは、`RouteGuide`サービスのJavaクライアントを作成する方法について説明します。完全なサンプルクライアントコードは、routeguide/app/src/main/java/io/grpc/routeguideexample/RouteGuideActivity.javaで確認できます。

スタブの作成

サービスメソッドを呼び出すには、最初に*スタブ*、またはむしろ2つのスタブを作成する必要があります。

  • *ブロッキング/同期*スタブ:これは、RPC呼び出しがサーバーのレスポンスを待機し、レスポンスを返すか例外を発生させることを意味します。
  • サーバーへの非ブロッキング呼び出しを行う*非ブロッキング/非同期*スタブ。レスポンスは非同期に返されます。特定の種類のストリーミング呼び出しは、非同期スタブを使用してのみ行うことができます。

最初に、接続先のサーバーのアドレスとポートを指定して、スタブ用のgRPC *チャネル*を作成する必要があります。チャネルを作成するには、`ManagedChannelBuilder`を使用します。

mChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();

これで、.protoから生成した`RouteGuideGrpc`クラスで提供されている`newStub`メソッドと`newBlockingStub`メソッドを使用して、チャネルを使用してスタブを作成できます。

blockingStub = RouteGuideGrpc.newBlockingStub(mChannel);
asyncStub = RouteGuideGrpc.newStub(mChannel);

サービスメソッドの呼び出し

次に、サービスメソッドの呼び出し方法を見てみましょう。

シンプルRPC

ブロッキングスタブでシンプルRPC `GetFeature`を呼び出すのは、ローカルメソッドを呼び出すのと同じくらい簡単です。

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
Feature feature = blockingStub.getFeature(request);

リクエストProtocol Bufferオブジェクト(この場合は`Point`)を作成して入力し、ブロッキングスタブの`getFeature()`メソッドに渡して、`Feature`を取得します。

サーバーサイドストリーミングRPC

次に、地理的な`Feature`のストリームを返すサーバーサイドストリーミング呼び出し`ListFeatures`を見てみましょう。

Rectangle request =
    Rectangle.newBuilder()
        .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features = blockingStub.listFeatures(request);

ご覧のとおり、単一の`Feature`を返すのではなく、クライアントがすべての返された`Feature`を読み取るために使用できる`Iterator`を返す点を除いて、先ほど見たシンプルRPCと非常によく似ています。

クライアントサイドストリーミングRPC

もう少し複雑なもの:クライアントサイドストリーミングメソッド`RecordRoute`。ここでは、`Point`のストリームをサーバーに送信し、単一の`RouteSummary`を取得します。このメソッドでは、非同期スタブを使用する必要があります。サーバーの作成をすでに読んでいる場合、この中には非常になじみのあるものがあるかもしれません。非同期ストリーミングRPCは、両側で同様の方法で実装されます。

private String recordRoute(List<Point> points, int numPoints, RouteGuideStub asyncStub)
        throws InterruptedException, RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RecordRoute");

    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
        @Override
        public void onNext(RouteSummary summary) {
            appendLogs(logs, "Finished trip with {0} points. Passed {1} features. "
                    + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
                    summary.getFeatureCount(), summary.getDistance(),
                    summary.getElapsedTime());
        }

        @Override
        public void onError(Throwable t) {
            failed = t;
            finishLatch.countDown();
        }

        @Override
        public void onCompleted() {
            appendLogs(logs, "Finished RecordRoute");
            finishLatch.countDown();
        }
    };

    StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
    try {
        // Send numPoints points randomly selected from the points list.
        Random rand = new Random();
        for (int i = 0; i < numPoints; ++i) {
            int index = rand.nextInt(points.size());
            Point point = points.get(index);
            appendLogs(logs, "Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
                    RouteGuideUtil.getLongitude(point));
            requestObserver.onNext(point);
            // Sleep for a bit before sending the next one.
            Thread.sleep(rand.nextInt(1000) + 500);
            if (finishLatch.getCount() == 0) {
                // RPC completed or errored before we finished sending.
                // Sending further requests won't error, but they will just be thrown away.
                break;
            }
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
               "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }
    return logs.toString();
}

ご覧のとおり、このメソッドを呼び出すには、`StreamObserver`を作成する必要があります。これは、サーバーが`RouteSummary`レスポンスで呼び出すための特別なインターフェースを実装しています。 `StreamObserver`では、以下を行います。

  • サーバーがメッセージストリームに RouteSummary を書き込む際に返される情報を表示するには、onNext() メソッドをオーバーライドします。
  • (*サーバー*側で呼び出しが完了したときに呼び出される)onCompleted() メソッドをオーバーライドして、サーバーの書き込み完了を確認できる SettableFuture を設定します。

次に、StreamObserver を非同期スタブの recordRoute() メソッドに渡し、サーバーに送信する Point を書き込むための独自の StreamObserver リクエストオブザーバーを取得します。ポイントの書き込みが完了したら、リクエストオブザーバーの onCompleted() メソッドを使用して、クライアント側の書き込みが完了したことを gRPC に通知します。完了したら、SettableFuture を確認して、サーバー側で処理が完了したことを確認します。

双方向ストリーミングRPC

最後に、双方向ストリーミングRPC RouteChat() を見てみましょう。

private String routeChat(RouteGuideStub asyncStub) throws InterruptedException,
        RuntimeException {
    final StringBuffer logs = new StringBuffer();
    appendLogs(logs, "*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
            asyncStub.routeChat(new StreamObserver<RouteNote>() {
                @Override
                public void onNext(RouteNote note) {
                    appendLogs(logs, "Got message \"{0}\" at {1}, {2}", note.getMessage(),
                            note.getLocation().getLatitude(),
                            note.getLocation().getLongitude());
                }

                @Override
                public void onError(Throwable t) {
                    failed = t;
                    finishLatch.countDown();
                }

                @Override
                public void onCompleted() {
                    appendLogs(logs,"Finished RouteChat");
                    finishLatch.countDown();
                }
            });

    try {
        RouteNote[] requests =
                {newNote("First message", 0, 0), newNote("Second message", 0, 1),
                        newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

        for (RouteNote request : requests) {
            appendLogs(logs, "Sending message \"{0}\" at {1}, {2}", request.getMessage(),
                    request.getLocation().getLatitude(),
                    request.getLocation().getLongitude());
            requestObserver.onNext(request);
        }
    } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // Receiving happens asynchronously
    if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        throw new RuntimeException(
                "Could not finish rpc within 1 minute, the server is likely down");
    }

    if (failed != null) {
        throw new RuntimeException(failed);
    }

    return logs.toString();
}

クライアント側ストリーミングの例と同様に、StreamObserver レスポンスオブザーバーを取得し、返します。ただし、今回は、サーバーが*サーバー側*のメッセージストリームにメッセージを書き込んでいる間に、メソッドのレスポンスオブザーバーを介して値を送信します。ここでの読み取りと書き込みの構文は、クライアントストリーミングメソッドの場合とまったく同じです。各側は常に相手のメッセージを受信した順序で取得しますが、クライアントとサーバーは任意の順序で読み書きできます。ストリームは完全に独立して動作します。

試してみよう!

クライアントとサーバーをビルドして実行するには、サンプルディレクトリの README の手順に従ってください。