基本チュートリアル
DartにおけるgRPCの基本チュートリアルです。
基本チュートリアル
このチュートリアルでは、Dartプログラマー向けにgRPCの基本的な使い方を紹介します。
この例に従うことで、以下の方法を学ぶことができます。
.proto
ファイルでサービスを定義する。- Protocol Bufferコンパイラを使用してサーバーとクライアントのコードを生成する。
- Dart gRPC APIを使用して、サービスのシンプルなクライアントとサーバーを作成する。
gRPC入門を読み、Protocol Buffersに精通していることを前提としています。このチュートリアルの例では、proto3バージョンのProtocol Buffers言語を使用しています。詳しくは、proto3言語ガイドをご覧ください。
なぜgRPCを使うのか?
この例は、クライアントがルート上の地物の情報を取得したり、ルートの概要を作成したり、サーバーや他のクライアントと交通情報などのルート情報を交換したりできるシンプルなルートマッピングアプリケーションです。
gRPCを使用すると、サービスを.proto
ファイルに一度定義し、gRPCがサポートする任意の言語でクライアントとサーバーを生成できます。生成されたクライアントとサーバーは、大規模データセンター内のサーバーから自分のタブレットまで、さまざまな環境で実行できます。異なる言語や環境間の通信の複雑さはすべてgRPCによって処理されます。また、効率的なシリアライゼーション、シンプルなIDL、簡単なインターフェース更新など、Protocol Buffersを使用するすべての利点を得ることができます。
サンプルコードとセットアップ
このチュートリアルのサンプルコードは、grpc/grpc-dart/example/route_guideにあります。サンプルをダウンロードするには、次のコマンドを実行してgrpc-dart
リポジトリをクローンします。
$ git clone --depth 1 https://github.com/grpc/grpc-dart
次に、カレントディレクトリをgrpc-dart/example/route_guide
に変更します。
$ cd grpc-dart/example/route_guide
クライアントとサーバーのインターフェースコードを生成するために必要なツールは既にインストールされているはずです。インストールされていない場合は、クイックスタートのセットアップ手順をご覧ください。
サービスの定義
最初の手順(gRPC入門で既にご存知のとおり)は、Protocol Buffersを使用して、gRPC *サービス*とメソッドの*リクエスト*および*レスポンス*タイプを定義することです。完全な.proto
ファイルは、example/route_guide/protos/route_guide.proto
にあります。
サービスを定義するには、.proto
ファイルで名前付きのservice
を指定します。
service RouteGuide {
...
}
次に、サービス定義内でrpc
メソッドを定義し、それらのリクエストとレスポンスタイプを指定します。 gRPCでは、4種類のサービスメソッドを定義できます。これらはすべてRouteGuide
サービスで使用されます。
クライアントがスタブを使用してサーバーにリクエストを送信し、通常の関数呼び出しのようにレスポンスが返ってくるのを待つ*単純RPC*。
// Obtains the feature at a given position. rpc GetFeature(Point) returns (Feature) {}
クライアントがサーバーにリクエストを送信し、一連のメッセージを読み取るためのストリームを取得する*サーバーサイドストリーミングRPC*。クライアントは、メッセージがなくなるまで返されたストリームから読み取ります。この例でわかるように、*レスポンス*タイプの前に
stream
キーワードを配置することで、サーバーサイドストリーミングメソッドを指定します。// Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
クライアントが一連のメッセージを書き込み、提供されたストリームを使用してサーバーに送信する*クライアントサイドストリーミングRPC*。クライアントはメッセージの書き込みが完了すると、サーバーがすべてを読み取ってレスポンスを返すまで待機します。 *リクエスト*タイプの前に
stream
キーワードを配置することで、クライアントサイドストリーミングメソッドを指定します。// Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
両側が読み書きストリームを使用して一連のメッセージを送信する*双方向ストリーミングRPC*。 2つのストリームは独立して動作するため、クライアントとサーバーは好きな順序で読み書きできます。たとえば、サーバーはすべてのクライアントメッセージを受信するまで待機してからレスポンスを書き込むことも、メッセージを読み取ってからメッセージを書き込むことも、あるいは読み取りと書き込みの他の組み合わせを行うこともできます。各ストリーム内のメッセージの順序は保持されます。リクエストとレスポンスの両方の前に
stream
キーワードを配置することで、このタイプのメソッドを指定します。// Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
.proto
ファイルには、サービスメソッドで使用されるすべてのリクエストとレスポンスタイプのProtocol Bufferメッセージタイプの定義も含まれています。たとえば、Point
メッセージタイプは次のとおりです。
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
クライアントとサーバーコードの生成
次に、.proto
サービス定義からgRPCクライアントとサーバーのインターフェースを生成する必要があります。これは、特別なDartプラグインを備えたProtocol Bufferコンパイラprotoc
を使用して行います。これは、クイックスタートで行ったことと似ています。
route_guide
のサンプルディレクトリから、以下を実行します。
protoc -I protos/ protos/route_guide.proto --dart_out=grpc:lib/src/generated
このコマンドを実行すると、route_guide
サンプルディレクトリ下のlib/src/generated
ディレクトリに次のファイルが生成されます。
route_guide.pb.dart
route_guide.pbenum.dart
route_guide.pbgrpc.dart
route_guide.pbjson.dart
これらには以下が含まれています。
- リクエストとレスポンスのメッセージタイプを入力、シリアル化、および取得するためのすべてのProtocol Bufferコード。
- クライアントが`RouteGuide`サービスで定義されたメソッドで呼び出すためのインターフェースタイプ(または*スタブ*)。
- サーバーが実装するためのインターフェースタイプ。これも`RouteGuide`サービスで定義されたメソッドを持ちます。
サーバーの作成
最初に、`RouteGuide`サーバーを作成する方法を見てみましょう。 gRPCクライアントの作成にのみ関心がある場合は、このセクションをスキップして、クライアントの作成に進んでください(ただし、いずれにしても興味深いと思われるかもしれません!)。
`RouteGuide`サービスを機能させるには、2つの部分があります。
- サービス定義から生成されたサービスインターフェースを実装する:サービスの実際の「作業」を行う。
- gRPCサーバーを実行して、クライアントからのリクエストをリッスンし、正しいサービス実装にディスパッチする。
サンプルの`RouteGuide`サーバーは、grpc-dart/example/route_guide/lib/src/server.dartにあります。その仕組みを詳しく見てみましょう。
RouteGuideの実装
ご覧のとおり、サーバーには、生成された抽象`RouteGuideServiceBase`クラスを拡張する`RouteGuideService`クラスがあります。
class RouteGuideService extends RouteGuideServiceBase {
Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
...
}
Stream<Feature> listFeatures(
grpc.ServiceCall call, Rectangle request) async* {
...
}
Future<RouteSummary> recordRoute(
grpc.ServiceCall call, Stream<Point> request) async {
...
}
Stream<RouteNote> routeChat(
grpc.ServiceCall call, Stream<RouteNote> request) async* {
...
}
...
}
単純RPC
`RouteGuideService`は、すべてのサービスメソッドを実装します。最初に最も単純なタイプである`GetFeature`を見てみましょう。これは、クライアントから`Point`を取得し、データベースから対応する地物情報を`Feature`で返します。
/// GetFeature handler. Returns a feature for the given location.
/// The [context] object provides access to client metadata, cancellation, etc.
@override
Future<Feature> getFeature(grpc.ServiceCall call, Point request) async {
return featuresDb.firstWhere((f) => f.location == request,
orElse: () => Feature()..location = request);
}
メソッドには、RPCのコンテキストオブジェクトとクライアントの`Point` Protocol Bufferリクエストが渡されます。レスポンス情報を含む`Feature` Protocol Bufferオブジェクトを返します。メソッドでは、`Feature`に適切な情報を入力し、gRPCフレームワークに`return`します。 gRPCフレームワークは、それをクライアントに送り返します。
サーバーサイドストリーミングRPC
次に、ストリーミングRPCの1つを見てみましょう。 `ListFeatures`はサーバーサイドストリーミングRPCであるため、複数の`Feature`をクライアントに送り返す必要があります。
/// ListFeatures handler. Returns a stream of features within the given
/// rectangle.
@override
Stream<Feature> listFeatures(
grpc.ServiceCall call, Rectangle request) async* {
final normalizedRectangle = _normalize(request);
// For each feature, check if it is in the given bounding box
for (var feature in featuresDb) {
if (feature.name.isEmpty) continue;
final location = feature.location;
if (_contains(normalizedRectangle, location)) {
yield feature;
}
}
}
ご覧のとおり、メソッドで単純なリクエストとレスポンスのオブジェクトを取得して返す代わりに、今回はリクエストオブジェクト(クライアントが`Feature`を見つけたい`Rectangle`)を取得し、`Feature`オブジェクトの`Stream`を返します。
メソッドでは、返す必要のある数の`Feature`オブジェクトを入力し、`yield`を使用して返されたストリームに追加します。メソッドが返るとストリームは自動的に閉じられ、gRPCにレスポンスの書き込みが完了したことが通知されます。
この呼び出しでエラーが発生した場合、エラーは例外としてストリームに追加され、gRPCレイヤーはそれを適切なRPCステータスに変換してネットワーク上で送信します。
クライアントサイドストリーミングRPC
次に、もう少し複雑なものを見てみましょう。クライアントサイドストリーミングメソッド`RecordRoute`です。ここでは、クライアントから`Point`のストリームを取得し、旅行に関する情報を含む単一の`RouteSummary`を返します。ご覧のとおり、今回はリクエストパラメータはストリームであり、サーバーはこれを使用してクライアントからのリクエストメッセージを読み取ることができます。サーバーは、単純なRPCの場合と同様に、単一のレスポンスを返します。
/// RecordRoute handler. Gets a stream of points, and responds with statistics
/// about the "trip": number of points, number of known features visited,
/// total distance traveled, and total time spent.
@override
Future<RouteSummary> recordRoute(
grpc.ServiceCall call, Stream<Point> request) async {
int pointCount = 0;
int featureCount = 0;
double distance = 0.0;
Point previous;
final timer = Stopwatch();
await for (var location in request) {
if (!timer.isRunning) timer.start();
pointCount++;
final feature = featuresDb.firstWhereOrNull((f) => f.location == location);
if (feature != null) {
featureCount++;
}
// For each point after the first, add the incremental distance from the
// previous point to the total distance value.
if (previous != null) distance += _distance(previous, location);
previous = location;
}
timer.stop();
return RouteSummary()
..pointCount = pointCount
..featureCount = featureCount
..distance = distance.round()
..elapsedTime = timer.elapsed.inSeconds;
}
メソッド本体では、リクエストストリームで`await for`を使用して、クライアントのリクエスト(この場合は`Point`オブジェクト)を繰り返し読み込み、メッセージがなくなるまで続けます。リクエストストリームが完了すると、サーバーは`RouteSummary`を返すことができます。
双方向ストリーミングRPC
最後に、双方向ストリーミングRPC `RouteChat()`を見てみましょう。
/// RouteChat handler. Receives a stream of message/location pairs, and
/// responds with a stream of all previous messages at each of those
/// locations.
@override
Stream<RouteNote> routeChat(
grpc.ServiceCall call, Stream<RouteNote> request) async* {
await for (var note in request) {
final notes = routeNotes.putIfAbsent(note.location, () => <RouteNote>[]);
for (var note in notes) yield note;
notes.add(note);
}
}
今回は、クライアントサイドストリーミングの例のように、メッセージを読み取るために使用できる`RouteNote`のストリームを取得します。ただし、今回は、クライアントが*自分の*メッセージストリームにメッセージを書き込んでいる間に、メソッドの返されたストリームを介して値を返します。
ここでの読み書きの構文は、クライアントストリーミングおよびサーバーストリーミングメソッドと同じです。 各側は常に相手のメッセージを書き込まれた順序で受信しますが、クライアントとサーバーの両方が任意の順序で読み書きできます。ストリームは完全に独立して動作します。
サーバーの起動
すべてのメソッドを実装したら、クライアントが実際にサービスを使用できるようにgRPCサーバーを起動する必要があります。 次のスニペットは、RouteGuide
サービスに対してこれを実行する方法を示しています。
Future<void> main(List<String> args) async {
final server = grpc.Server.create([RouteGuideService()]);
await server.serve(port: 8080);
print('Server listening...');
}
サーバーをビルドして起動するには、次の手順を実行します。
grpc.Server.create()
を使用してgRPCサーバーのインスタンスを作成し、サービス実装のリストを提供します。- サーバーで
serve()
を呼び出して、リクエストのリスニングを開始します。オプションで、リスニングするアドレスとポートを渡します。 サーバーは、shutdown()
が呼び出されるまで、リクエストを非同期的に処理し続けます。
クライアントの作成
このセクションでは、RouteGuide
サービスのDartクライアントを作成する方法について説明します。 クライアントコード全体は、grpc-dart/example/route_guide/lib/src/client.dartから入手できます。
スタブの作成
サービスメソッドを呼び出すには、まずサーバーと通信するためのgRPC *チャネル*を作成する必要があります。 これは、サーバーのアドレスとポート番号を次のようにClientChannel()
に渡すことによって作成します。
final channel = ClientChannel('127.0.0.1',
port: 8080,
options: const ChannelOptions(
credentials: ChannelCredentials.insecure()));
必要に応じて、ChannelOptions
を使用して、チャネルのTLSオプション(信頼できる証明書など)を設定できます。
gRPC *チャネル*がセットアップされたら、RPCを実行するためのクライアント*スタブ*が必要です。 これは、例の.proto
ファイルから生成されたパッケージによって提供されるRouteGuideClient
をインスタンス化することによって取得します。
stub = RouteGuideClient(channel,
options: CallOptions(timeout: Duration(seconds: 30)));
サービスで認証資格情報が必要な場合は、CallOptions
を使用して認証資格情報(GCE資格情報やJWT資格情報など)を設定できます。 RouteGuide
サービスは資格情報を必要としません。
サービスメソッドの呼び出し
それでは、サービスメソッドの呼び出し方法を見てみましょう。 gRPC-Dartでは、RPCは常に非同期であることに注意してください。つまり、RPCは、サーバーからの応答またはエラーを取得するためにリスニングする必要があるFuture
またはStream
を返します。
単純RPC
単純なRPC GetFeature
の呼び出しは、ローカルメソッドの呼び出しとほぼ同じくらい簡単です。
final point = Point()
..latitude = 409146138
..longitude = -746188906;
final feature = await stub.getFeature(point));
ご覧のとおり、先ほど取得したスタブでメソッドを呼び出します。 メソッドパラメータには、リクエストプロトコルバッファオブジェクト(この場合はPoint
)を渡します。 また、必要に応じてRPCの動作を変更できるオプションのCallOptions
オブジェクト(タイムアウトなど)を渡すこともできます。 呼び出しがエラーを返さない場合、返されたFuture
はサーバーからの応答情報で完了します。 エラーが発生した場合、Future
はエラーで完了します。
サーバーサイドストリーミングRPC
ここでは、地理的なFeature
のストリームを返すサーバー側ストリーミングメソッドListFeatures
を呼び出します。 すでにサーバーの作成を読んでいる場合、この一部は非常によく似ているように見えるかもしれません。ストリーミングRPCは両側で同様の方法で実装されています。
final rect = Rectangle()...; // initialize a Rectangle
try {
await for (var feature in stub.listFeatures(rect)) {
print(feature);
}
catch (e) {
print('ERROR: $e');
}
単純なRPCと同様に、メソッドにリクエストを渡します。 ただし、Future
が返される代わりに、Stream
が返されます。 クライアントはストリームを使用してサーバーの応答を読み取ることができます。
返されたストリームでawait for
を使用して、メッセージがなくなるまで、サーバーの応答を応答プロトコルバッファオブジェクト(この場合はFeature
)に繰り返し読み込みます。
クライアントサイドストリーミングRPC
クライアント側ストリーミングメソッドRecordRoute
は、メソッドにStream
を渡し、Future
を返すことを除いて、サーバー側メソッドに似ています。
final random = Random();
// Generate a number of random points
Stream<Point> generateRoute(int count) async* {
for (int i = 0; i < count; i++) {
final point = featuresDb[random.nextInt(featuresDb.length)].location;
yield point;
}
}
final pointCount = random.nextInt(100) + 2; // Traverse at least two points
final summary = await stub.recordRoute(generateRoute(pointCount));
print('Route summary: $summary');
generateRoute()
メソッドはasync*
であるため、gRPCがリクエストストリームをリッスンしてポイントメッセージをサーバーに送信すると、ポイントが生成されます。 ストリームが完了すると(generateRoute()
が戻ると)、gRPCは書き込みが完了し、応答を受信することを期待していることを認識します。 返されたFuture
は、サーバーから受信したRouteSummary
メッセージ、またはエラーで完了します。
双方向ストリーミングRPC
最後に、双方向ストリーミングRPC RouteChat()
を見てみましょう。 RecordRoute
の場合と同様に、リクエストメッセージを書き込むストリームをメソッドに渡し、ListFeatures
と同様に、レスポンスメッセージを読み取るために使用できるストリームを取得します。 ただし、今回は、サーバーが*独自の*メッセージストリームにメッセージを書き込んでいる間に、メソッドのストリームを介して値を送信します。
Stream<RouteNote> outgoingNotes = ...;
final responses = stub.routeChat(outgoingNotes);
await for (var note in responses) {
print('Got message ${note.message} at ${note.location.latitude}, ${note
.location.longitude}');
}
ここでの読み書きの構文は、クライアント側およびサーバー側のストリーミングメソッドと非常によく似ています。 各側は常に相手のメッセージを書き込まれた順序で受信しますが、クライアントとサーバーの両方が任意の順序で読み書きできます。ストリームは完全に独立して動作します。
試してみよう!
サンプルディレクトリから作業します
$ cd example/route_guide
パッケージを取得します
$ dart pub get
サーバーを実行します
$ dart bin/server.dart
別のターミナルから、クライアントを実行します
$ dart bin/client.dart
問題の報告
Dart gRPCに問題が見つかった場合は、イシュートラッカーに問題を報告してください。