非同期 API チュートリアル

非同期 API チュートリアル

このチュートリアルでは、gRPC の非同期/ノンブロッキング API を使用して、簡単なサーバーとクライアントを C++ で記述する方法を示します。「基本チュートリアル」で説明されているように、単純な同期 gRPC コードの記述にすでに慣れていることを前提としています。このチュートリアルで使用されている例は、「クイックスタート」で使用されている基本的な「Greeter の例」に従っています。これは、`grpc/examples/cpp/helloworld` で、インストール手順とともに見つけることができます。

概要

gRPC は、非同期操作に CompletionQueue API を使用します。基本的なワークフローは次のとおりです。

  • RPC 呼び出しに CompletionQueue をバインドする
  • 読み取りまたは書き込みのようなことを行い、一意の void* タグを提示する
  • CompletionQueue::Next を呼び出して、操作の完了を待機します。タグが表示された場合、対応する操作が完了したことを示します。

非同期クライアント

リモートメソッドを呼び出す非同期クライアントを使用するには、同期クライアント と同様に、まずチャネルとスタブを作成します。スタブを取得したら、非同期呼び出しを行うには次の手順を実行します。

  • RPC を開始し、そのハンドルを作成します。RPC を CompletionQueue にバインドします。

    CompletionQueue cq;
    std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
        stub_->AsyncSayHello(&context, request, &cq));
    
  • 返信と最終ステータスを、一意のタグとともに要求します。

    Status status;
    rpc->Finish(&reply, &status, (void*)1);
    
  • 完了キューが次のタグを返すのを待ちます。返信とステータスは、対応する Finish() 呼び出しに渡されたタグが返されると利用可能になります。

    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      // check reply and status
    }
    

完全なクライアントの例は、`greeter_async_client.cc` にあります。

非同期サーバー

サーバーの実装は、タグ付きの RPC 呼び出しを要求し、完了キューがタグを返すのを待ちます。非同期で RPC を処理するための基本的なフローは次のとおりです。

  • 非同期サービスをエクスポートするサーバーを構築する

    helloworld::Greeter::AsyncService service;
    ServerBuilder builder;
    builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
    builder.RegisterService(&service);
    auto cq = builder.AddCompletionQueue();
    auto server = builder.BuildAndStart();
    
  • RPC を 1 つ要求し、一意のタグを指定します。

    ServerContext context;
    HelloRequest request;
    ServerAsyncResponseWriter<HelloReply> responder;
    service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
    
  • 完了キューがタグを返すのを待ちます。タグが取得されると、コンテキスト、リクエスト、およびレスポンダーが利用可能になります。

    HelloReply reply;
    Status status;
    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
      // set reply and status
      responder.Finish(reply, status, (void*)2);
    }
    
  • 完了キューがタグを返すのを待ちます。タグが戻ると、RPC は完了します。

    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)2) {
      // clean up
    }
    

ただし、この基本的なフローでは、サーバーが複数のリクエストを同時に処理することを考慮していません。これに対処するために、完全な非同期サーバーの例では、各 RPC の状態を維持するために CallData オブジェクトを使用し、このオブジェクトのアドレスを呼び出しの一意のタグとして使用します。

class CallData {
public:
  // Take in the "service" instance (in this case representing an asynchronous
  // server) and the completion queue "cq" used for asynchronous communication
  // with the gRPC runtime.
  CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
      : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
    // Invoke the serving logic right away.
    Proceed();
  }

  void Proceed() {
    if (status_ == CREATE) {
      // As part of the initial CREATE state, we *request* that the system
      // start processing SayHello requests. In this request, "this" acts are
      // the tag uniquely identifying the request (so that different CallData
      // instances can serve different requests concurrently), in this case
      // the memory address of this CallData instance.
      service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                this);
      // Make this instance progress to the PROCESS state.
      status_ = PROCESS;
    } else if (status_ == PROCESS) {
      // Spawn a new CallData instance to serve new clients while we process
      // the one for this CallData. The instance will deallocate itself as
      // part of its FINISH state.
      new CallData(service_, cq_);

      // The actual processing.
      std::string prefix("Hello ");
      reply_.set_message(prefix + request_.name());

      // And we are done! Let the gRPC runtime know we've finished, using the
      // memory address of this instance as the uniquely identifying tag for
      // the event.
      responder_.Finish(reply_, Status::OK, this);
      status_ = FINISH;
    } else {
      GPR_ASSERT(status_ == FINISH);
      // Once in the FINISH state, deallocate ourselves (CallData).
      delete this;
    }
  }
}

簡単にするために、サーバーはすべてのイベントに対して 1 つの完了キューのみを使用し、HandleRpcs でキューをクエリするメインループを実行します。

void HandleRpcs() {
  // Spawn a new CallData instance to serve new clients.
  new CallData(&service_, cq_.get());
  void* tag;  // uniquely identifies a request.
  bool ok;
  while (true) {
    // Block waiting to read the next event from the completion queue. The
    // event is uniquely identified by its tag, which in this case is the
    // memory address of a CallData instance.
    cq_->Next(&tag, &ok);
    GPR_ASSERT(ok);
    static_cast<CallData*>(tag)->Proceed();
  }
}

サーバーのシャットダウン

非同期通知を取得するために完了キューを使用しています。サーバーがシャットダウンされた*後に*シャットダウンするように注意する必要があります。

ServerImpl::Run() で、cq_ = builder.AddCompletionQueue() を実行して完了キューインスタンス cq_ を取得したことを思い出してください。ServerBuilder::AddCompletionQueue のドキュメントを見ると、次のことがわかります。

… 呼び出し元は、返された完了キューをシャットダウンする前に、サーバーをシャットダウンする必要があります。

詳細については、ServerBuilder::AddCompletionQueue の完全な docstring を参照してください。この例では、これは ServerImpl のデストラクタが次のようになることを意味します。

~ServerImpl() {
  server_->Shutdown();
  // Always shutdown the completion queue after the server.
  cq_->Shutdown();
}

完全なサーバーの例は、`greeter_async_server.cc` にあります。