非同期APIチュートリアル

非同期APIチュートリアル

このチュートリアルでは、gRPCの非同期/ノンブロッキングAPIを使用して、C++でシンプルなサーバーとクライアントを作成する方法を説明します。基本チュートリアルで説明されているように、シンプルな同期gRPCコードの記述に既に精通していることを前提としています。このチュートリアルで使用されている例は、Greeterの例(クイックスタートで使用)に基づいています。grpc/examples/cpp/helloworldに、インストール手順と共に記載されています。

概要

gRPCは、非同期操作にCompletionQueue APIを使用します。基本的なワークフローは以下のとおりです。

  • CompletionQueueをRPC呼び出しにバインドする
  • 読み取りまたは書き込みなどの操作を行い、一意のvoid*タグを提示する
  • CompletionQueue::Nextを呼び出して、操作の完了を待つ。タグが表示された場合、対応する操作が完了したことを示します。

非同期クライアント

リモートメソッドを呼び出すために非同期クライアントを使用するには、同期クライアントと同様に、まずチャネルとスタブを作成します。スタブを作成したら、非同期呼び出しを行うために次の手順を実行します。

  • RPCを開始し、そのためのハンドルを作成する。RPCをCompletionQueueにバインドする。

    CompletionQueue cq;
    std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
        stub_->AsyncSayHello(&context, request, &cq));
    
  • 一意のタグを使用して、応答と最終的なステータスを要求する

    Status status;
    rpc->Finish(&reply, &status, (void*)1);
    
  • 完了キューが次のタグを返すのを待つ。対応するFinish()呼び出しに渡されたタグが返されると、応答とステータスが準備されます。

    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      // check reply and status
    }
    

完全なクライアントの例は、greeter_async_client.ccで確認できます。

非同期サーバー

サーバーの実装では、タグ付きでRPC呼び出しを要求し、完了キューがタグを返すのを待ちます。RPCを非同期的に処理するための基本的な流れは以下のとおりです。

  • 非同期サービスをエクスポートするサーバーを構築する

    helloworld::Greeter::AsyncService service;
    ServerBuilder builder;
    builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
    builder.RegisterService(&service);
    auto cq = builder.AddCompletionQueue();
    auto server = builder.BuildAndStart();
    
  • 一意のタグを提供して、1つのRPCを要求する

    ServerContext context;
    HelloRequest request;
    ServerAsyncResponseWriter<HelloReply> responder;
    service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
    
  • 完了キューがタグを返すのを待つ。タグを取得すると、コンテキスト、リクエスト、レスポンダが準備されます。

    HelloReply reply;
    Status status;
    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      // set reply and status
      responder.Finish(reply, status, (void*)2);
    }
    
  • 完了キューがタグを返すのを待つ。タグが戻ると、RPCが終了します。

    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)2) {
      // clean up
    }
    

しかし、この基本的な流れでは、サーバーが複数のリクエストを同時に処理することを考慮していません。これに対処するために、完全な非同期サーバーの例では、CallDataオブジェクトを使用して各RPCの状態を維持し、このオブジェクトのアドレスを呼び出しの一意のタグとして使用します。

class CallData {
public:
  // Take in the "service" instance (in this case representing an asynchronous
  // server) and the completion queue "cq" used for asynchronous communication
  // with the gRPC runtime.
  CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
      : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
    // Invoke the serving logic right away.
    Proceed();
  }

  void Proceed() {
    if (status_ == CREATE) {
      // As part of the initial CREATE state, we *request* that the system
      // start processing SayHello requests. In this request, "this" acts are
      // the tag uniquely identifying the request (so that different CallData
      // instances can serve different requests concurrently), in this case
      // the memory address of this CallData instance.
      service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                this);
      // Make this instance progress to the PROCESS state.
      status_ = PROCESS;
    } else if (status_ == PROCESS) {
      // Spawn a new CallData instance to serve new clients while we process
      // the one for this CallData. The instance will deallocate itself as
      // part of its FINISH state.
      new CallData(service_, cq_);

      // The actual processing.
      std::string prefix("Hello ");
      reply_.set_message(prefix + request_.name());

      // And we are done! Let the gRPC runtime know we've finished, using the
      // memory address of this instance as the uniquely identifying tag for
      // the event.
      responder_.Finish(reply_, Status::OK, this);
      status_ = FINISH;
    } else {
      GPR_ASSERT(status_ == FINISH);
      // Once in the FINISH state, deallocate ourselves (CallData).
      delete this;
    }
  }
}

簡潔にするため、サーバーはすべてのイベントに1つの完了キューのみを使用し、HandleRpcsでメインループを実行してキューを照会します。

void HandleRpcs() {
  // Spawn a new CallData instance to serve new clients.
  new CallData(&service_, cq_.get());
  void* tag;  // uniquely identifies a request.
  bool ok;
  while (true) {
    // Block waiting to read the next event from the completion queue. The
    // event is uniquely identified by its tag, which in this case is the
    // memory address of a CallData instance.
    cq_->Next(&tag, &ok);
    GPR_ASSERT(ok);
    static_cast<CallData*>(tag)->Proceed();
  }
}

サーバーのシャットダウン

非同期通知を取得するために完了キューを使用してきました。サーバーがシャットダウンされた *後* にも、完了キューをシャットダウンする必要があります。

cq_ = builder.AddCompletionQueue()を実行することで、ServerImpl::Run()で完了キューインスタンスcq_を取得しました。ServerBuilder::AddCompletionQueueのドキュメントを見ると、

…呼び出し元は、返された完了キューをシャットダウンする前にサーバーをシャットダウンする必要があります。

詳細については、ServerBuilder::AddCompletionQueueの完全なdocstringを参照してください。この例では、ServerImplのデストラクタは次のようになります。

~ServerImpl() {
  server_->Shutdown();
  // Always shutdown the completion queue after the server.
  cq_->Shutdown();
}

完全なサーバーの例は、greeter_async_server.ccで確認できます。