非同期コールバックAPIチュートリアル

非同期コールバックAPIチュートリアル

このチュートリアルでは、gRPC の非同期コールバック API を使用して、C++ で簡単なサーバーとクライアントを作成する方法を示します。このチュートリアルで使用する例は、RouteGuide の例 に従っています。

概要

gRPC C++ は、同期 API と非同期 API の 2 種類の API を提供します。より具体的には、非同期 API には 2 種類あります。古い方は完了キューベース、新しい方はコールバックベースで、使いやすいです。このチュートリアルでは、コールバックベースの非同期 API(短くコールバック API)に焦点を当てます。コールバック API を使用して、以下の種類の RPC のサーバーとクライアントを実装する方法を学びます。

  • 単項RPC
  • サーバーサイドストリーミングRPC
  • クライアントサイドストリーミングRPC
  • 双方向ストリーミングRPC

サンプルコード

このチュートリアルでは、ルート案内アプリケーションを作成します。クライアントは、ルート上の機能に関する情報を取得したり、ルートの概要を作成したり、サーバーおよび他のクライアントと交通情報などのルート情報を交換したりできます。

以下は、Protocol Buffers で定義されたサービスインターフェースです。

// Interface exported by the server.
service RouteGuide {
  // A simple RPC.
  //
  // Obtains the feature at a given position.
  //
  // A feature with an empty name is returned if there's no feature at the given
  // position.
  rpc GetFeature(Point) returns (Feature) {}

  // A server-to-client streaming RPC.
  //
  // Obtains the Features available within the given Rectangle.  Results are
  // streamed rather than returned at once (e.g. in a response message with a
  // repeated field), as the rectangle may cover a large area and contain a
  // huge number of features.
  rpc ListFeatures(Rectangle) returns (stream Feature) {}

  // A client-to-server streaming RPC.
  //
  // Accepts a stream of Points on a route being traversed, returning a
  // RouteSummary when traversal is completed.
  rpc RecordRoute(stream Point) returns (RouteSummary) {}

  // A Bidirectional streaming RPC.
  //
  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}

同じ例が、同期 API を使用しても実装されています。興味のある方は、両方の実装を比較してみてください。

実装するサービス

コールバック API を使用してサービスを実装したいので、実装すべきサービスインターフェースは RouteGuide::CallbackService です。

class RouteGuideImpl final : public RouteGuide::CallbackService {
  ...
};

次のセクションの「サーバー側」のサブセクションですべての 4 つの RPC を実装します。

単項RPC

最も単純な RPC である GetFeature から始めましょう。これは単項 RPC です。GetFeature を介して、クライアントは Point をサーバーに送信し、サーバーはその後、その PointFeature をクライアントに返します。

サーバー

この RPC の実装は非常にシンプルでわかりやすいです。

  grpc::ServerUnaryReactor* GetFeature(CallbackServerContext* context,
                                       const Point* point,
                                       Feature* feature) override {
    feature->set_name(GetFeatureName(*point, feature_list_));
    feature->mutable_location()->CopyFrom(*point);
    auto* reactor = context->DefaultReactor();
    reactor->Finish(Status::OK);
    return reactor;
  }

Feature の出力フィールドを設定した後、最終的なステータスを ServerUnaryReactor 経由で返します。

カスタム単項リアクター

上記の例では、デフォルトのリアクターを使用しています。RPC のキャンセルを処理したり、RPC が完了したときに非同期でアクションを実行したりしたい場合は、カスタムリアクターを使用することもできます。以下の例では、両方のアクションにログを追加します。

  grpc::ServerUnaryReactor* GetFeature(grpc::CallbackServerContext* context,
                                       const Point* point,
                                       Feature* feature) override {
    class Reactor : public grpc::ServerUnaryReactor {
     public:
      Reactor(const Point& point, const std::vector<Feature>& feature_list,
              Feature* feature) {
        feature->set_name(GetFeatureName(point, feature_list));
        *feature->mutable_location() = point;
        Finish(grpc::Status::OK);
      }

     private:
      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
    };
    return new Reactor(*point, feature_list_, feature);
  }

ServerUnaryReactor の場合、OnDone() をオーバーライドし、オプションで OnCancel() をオーバーライドする必要があります。

注意 コールバックメソッド(例: OnDone())は迅速に返す必要があります。このようなコールバックでブロッキング作業(例: イベントの待機)を実行しないでください。

ServerUnaryReactor のコンストラクタは、GetFeature() が RPC を開始した応答としてリアクターを構築して提供する際に呼び出されます。リクエスト Point、レスポンス Feature、および feature_list を収集します。その後、Point からレスポンス Feature を取得し、feature_list に追加します。RPC を終了するために Finish(Status::OK) を呼び出します。

OnDone() は RPC の完了に応答します。OnDone() で最終的なクリーンアップを行い、RPC の終了をログに記録します。

OnCancel() は RPC のキャンセルに応答します。ここでは、このメソッドでキャンセルが発生したことをログに記録します。

クライアント

注意: 簡単にするために、このチュートリアルではチャネルとスタブの作成方法については説明しません。詳細については、基本チュートリアルを参照してください。

GetFeature RPC を開始するには、ClientContext、リクエスト(つまり Point)、およびレスポンス(つまり Feature)に加えて、クライアントはコールバック(つまり std::function<void(::grpc::Status)>)を stub_->async()->GetFeature() に渡す必要があります。コールバックは、サーバーがリクエストを処理し、RPC が完了した後に呼び出されます。

  bool GetOneFeature(const Point& point, Feature* feature) {
    ClientContext context;
    bool result;
    std::mutex mu;
    std::condition_variable cv;
    bool done = false;
    stub_->async()->GetFeature(
        &context, &point, feature,
        [&result, &mu, &cv, &done, feature, this](Status status) {
          bool ret;
          if (!status.ok()) {
            std::cout << "GetFeature rpc failed." << std::endl;
            ret = false;
          } else if (!feature->has_location()) {
            std::cout << "Server returns incomplete feature." << std::endl;
            ret = false;
          } else if (feature->name().empty()) {
            std::cout << "Found no feature at "
                      << feature->location().latitude() / kCoordFactor_ << ", "
                      << feature->location().longitude() / kCoordFactor_
                      << std::endl;
            ret = true;
          } else {
            std::cout << "Found feature called " << feature->name() << " at "
                      << feature->location().latitude() / kCoordFactor_ << ", "
                      << feature->location().longitude() / kCoordFactor_
                      << std::endl;
            ret = true;
          }
          std::lock_guard<std::mutex> lock(mu);
          result = ret;
          done = true;
          cv.notify_one();
        });
    std::unique_lock<std::mutex> lock(mu);
    cv.wait(lock, [&done] { return done; });
    return result;
  }

コールバックは、単項 RPC のさまざまな後続作業を実行できます。たとえば、上記のコード スニペットのコールバックは、ステータスと返されたフィーチャーをチェックし、この呼び出しのためのヒープ割り当てオブジェクトを解放し、最後に RPC が完了したことを通知します。

簡単にするために、例では RPC の完了通知を待機する同じ関数を示していますが、これは必須ではありません。

サーバーサイドストリーミングRPC

次に、より複雑な RPC である ListFeatures を見てみましょう。ListFeatures はサーバーサイドストリーミング RPC です。クライアントは Rectangle をサーバーに送信し、サーバーはクライアントに Feature のシーケンスを返します。各 Feature は個別のメッセージで送信されます。

サーバー

サーバーサイドストリーミング RPC を含む、すべてのストリーミング RPC で、RPC ハンドラのインターフェースは似ています。ハンドラには入力パラメータはなく、戻り値の型は、1 つの RPC のすべてのビジネスロジックを処理するサーバーリアクターのいずれかの種類です。

以下は、ListFeatures のハンドラインターフェースです。

  grpc::ServerWriteReactor<Feature>* ListFeatures(
      CallbackServerContext* context,
      const routeguide::Rectangle* rectangle);

ListFeatures はサーバーストリーミング RPC であるため、戻り値の型は ServerWriteReactor である必要があります。ServerWriteReactor には 2 つのテンプレートパラメータがあります。Rectangle はクライアントからのリクエストの型、Feature はサーバーからの各レスポンスメッセージの型です。

RPC の処理の複雑さは ServerWriteReactor に委譲されます。以下は、ListFeatures RPC を処理するために ServerWriteReactor を実装する方法です。

  grpc::ServerWriteReactor<Feature>* ListFeatures(
      CallbackServerContext* context,
      const routeguide::Rectangle* rectangle) override {
    class Lister : public grpc::ServerWriteReactor<Feature> {
     public:
      Lister(const routeguide::Rectangle* rectangle,
             const std::vector<Feature>* feature_list)
          : left_((std::min)(rectangle->lo().longitude(),
                             rectangle->hi().longitude())),
            right_((std::max)(rectangle->lo().longitude(),
                              rectangle->hi().longitude())),
            top_((std::max)(rectangle->lo().latitude(),
                            rectangle->hi().latitude())),
            bottom_((std::min)(rectangle->lo().latitude(),
                               rectangle->hi().latitude())),
            feature_list_(feature_list),
            next_feature_(feature_list_->begin()) {
        NextWrite();
      }

      void OnWriteDone(bool ok) override {
        if (!ok) {
          Finish(Status(grpc::StatusCode::UNKNOWN, "Unexpected Failure"));
        }
        NextWrite();
      }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      void NextWrite() {
        while (next_feature_ != feature_list_->end()) {
          const Feature& f = *next_feature_;
          next_feature_++;
          if (f.location().longitude() >= left_ &&
              f.location().longitude() <= right_ &&
              f.location().latitude() >= bottom_ &&
              f.location().latitude() <= top_) {
            StartWrite(&f);
            return;
          }
        }
        // Didn't write anything, all is done.
        Finish(Status::OK);
      }
      const long left_;
      const long right_;
      const long top_;
      const long bottom_;
      const std::vector<Feature>* feature_list_;
      std::vector<Feature>::const_iterator next_feature_;
    };
    return new Lister(rectangle, &feature_list_);
  }

異なるリアクターは異なるコールバックメソッドを持っています。RPC を実装するために、関心のあるメソッドをオーバーライドする必要があります。ListFeatures の場合、OnWriteDone()OnDone()、およびオプションで OnCancel() をオーバーライドする必要があります。

ServerWriteReactor のコンストラクタは、ListFeatures() が RPC を開始した応答としてリアクターを構築して提供する際に呼び出されます。rectangle 内のすべての Featurefeatures_to_send_ に収集し、それらが存在する場合は送信を開始します。

OnWriteDone() は書き込み完了に応答します。書き込みが正常に完了した場合、features_to_send_ が空になるまで次の Feature の送信を続行し、その後 Finish(Status::OK) を呼び出してコールを完了します。

OnDone() は RPC の完了に応答します。OnDone() で最終的なクリーンアップを行います。

OnCancel() は RPC のキャンセルに応答します。このメソッドでキャンセルが発生したことをログに記録します。

クライアント

サーバー側と同様に、クライアント側でも何らかのクライアントリアクターを実装する必要があります。クライアントリアクターは、RPC の処理に必要なすべての操作をカプセル化します。

ListFeatures はサーバーストリーミングであるため、ClientReadReactor を実装する必要があります。これは ServerWriteReactor と対称的な名前です。

    class Reader : public grpc::ClientReadReactor<Feature> {
     public:
      Reader(RouteGuide::Stub* stub, float coord_factor,
             const routeguide::Rectangle& rect)
          : coord_factor_(coord_factor) {
        stub->async()->ListFeatures(&context_, &rect, this);
        StartRead(&feature_);
        StartCall();
      }
      void OnReadDone(bool ok) override {
        if (ok) {
          std::cout << "Found feature called " << feature_.name() << " at "
                    << feature_.location().latitude() / coord_factor_ << ", "
                    << feature_.location().longitude() / coord_factor_
                    << std::endl;
          StartRead(&feature_);
        }
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        return std::move(status_);
      }

     private:
      ClientContext context_;
      float coord_factor_;
      Feature feature_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientReadReactor は 1 つのパラメータ、つまりサーバーからのレスポンスメッセージの型である Feature でテンプレート化されています。

Reader のコンストラクタでは、ClientContext&rectangle_(リクエストオブジェクト)、および Reader を RPC メソッド stub->async()->ListFeatures() に渡します。次に、&feature_StartRead() に渡して、受信したレスポンスを格納する場所を指定します。最後に、StartCall() を呼び出して RPC をアクティブ化します。

OnReadDone() は読み込み完了に応答します。読み込みが正常に完了した場合(oktrue の場合)、次の Feature の読み込みを続行します。そうでない場合は、読み込みが失敗したことを示します。

OnDone() は RPC の完了に応答します。RPC ステータスの結果を確認し、OnDone() を待機している条件変数に通知します。

Await()ClientReadReactor のメソッドではありません。これは簡単にするために追加されたもので、例が RPC が完了したことを認識できるようにするためです。代替案として、完了通知の必要がなければ、OnDone() はクリーンアップ後に単純に復帰できます。たとえば、ヒープ割り当てオブジェクトを解放するなどです。

RPC を開始するには、クライアントは単に ReadReactor をインスタンス化し、RPC の完了を待機します。

    routeguide::Rectangle rect;
    Feature feature;

    rect.mutable_lo()->set_latitude(400000000);
    rect.mutable_lo()->set_longitude(-750000000);
    rect.mutable_hi()->set_latitude(420000000);
    rect.mutable_hi()->set_longitude(-730000000);
    std::cout << "Looking for features between 40, -75 and 42, -73"
              << std::endl;

    Reader reader(stub_.get(), kCoordFactor_, rect);
    Status status = reader.Await();
    if (status.ok()) {
      std::cout << "ListFeatures rpc succeeded." << std::endl;
    } else {
      std::cout << "ListFeatures rpc failed." << std::endl;
    }

クライアントサイドストリーミングRPC

前のセクションでサーバーサイドストリーミング RPC のアイデアを理解したら、クライアントサイドストリーミング RPC も簡単に学べるはずです。

RecordRoute は、ここで説明するクライアントサイドストリーミング RPC です。クライアントは Point のシーケンスをサーバーに送信し、クライアントが Point の送信を完了した後、サーバーは RouteSummary を返します。

サーバー

クライアントサイドストリーミング RPC の RPC ハンドラのインターフェースには入力パラメータはなく、戻り値の型は ServerReadReactor というサーバーリアクターです。

ServerReadReactor には 2 つのテンプレートパラメータがあります。Point はクライアントからの各リクエストメッセージの型、RouteSummary はサーバーからのレスポンスの型です。

ServerWriteReactor と同様に、ServerReadReactor は RPC を処理するクラスです。

grpc::ServerReadReactor<Point>* RecordRoute(CallbackServerContext* context,
                                              RouteSummary* summary) override {
    class Recorder : public grpc::ServerReadReactor<Point> {
     public:
      Recorder(RouteSummary* summary, const std::vector<Feature>* feature_list)
          : start_time_(system_clock::now()),
            summary_(summary),
            feature_list_(feature_list) {
        StartRead(&point_);
      }

      void OnReadDone(bool ok) override {
        if (ok) {
          point_count_++;
          if (!GetFeatureName(point_, *feature_list_).empty()) {
            feature_count_++;
          }
          if (point_count_ != 1) {
            distance_ += GetDistance(previous_, point_);
          }
          previous_ = point_;
          StartRead(&point_);
        } else {
          summary_->set_point_count(point_count_);
          summary_->set_feature_count(feature_count_);
          summary_->set_distance(static_cast<long>(distance_));
          auto secs = std::chrono::duration_cast<std::chrono::seconds>(
              system_clock::now() - start_time_);
          summary_->set_elapsed_time(secs.count());
          Finish(Status::OK);
        }
      }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      system_clock::time_point start_time_;
      RouteSummary* summary_;
      const std::vector<Feature>* feature_list_;
      Point point_;
      int point_count_ = 0;
      int feature_count_ = 0;
      float distance_ = 0.0;
      Point previous_;
    };
    return new Recorder(summary, &feature_list_);
  }

ServerReadReactor のコンストラクタは、RecordRoute() が RPC を開始した応答としてリアクターを構築して提供する際に呼び出されます。コンストラクタは、後でレスポンスを返すために RouteSummary* を格納し、StartRead(&point_) を呼び出して読み込み操作を開始します。

OnReadDone() は読み込み完了に応答します。読み込みが正常に完了した場合(oktrue の場合)、新しい Point で統計情報を更新し、次の Point の読み込みを続行します。読み込みに失敗した場合(okfalse)、サーバーはレスポンスを summary_ に設定し、Finish(Status::OK) を呼び出して RPC を完了します。

OnDone() は RPC の完了に応答します。OnDone() で最終的なクリーンアップを行います。

OnCancel() は RPC のキャンセルに応答します。このメソッドでキャンセルが発生したことをログに記録します。

クライアント

予想通り、クライアント側ではクライアントリアクターを実装する必要があります。そのクライアントリアクターは ClientWriteReactor と呼ばれます。

    class Recorder : public grpc::ClientWriteReactor<Point> {
     public:
      Recorder(RouteGuide::Stub* stub, float coord_factor,
               const std::vector<Feature>* feature_list)
          : coord_factor_(coord_factor),
            feature_list_(feature_list),
            generator_(
                std::chrono::system_clock::now().time_since_epoch().count()),
            feature_distribution_(0, feature_list->size() - 1),
            delay_distribution_(500, 1500) {
        stub->async()->RecordRoute(&context_, &stats_, this);
        // Use a hold since some StartWrites are invoked indirectly from a
        // delayed lambda in OnWriteDone rather than directly from the reaction
        // itself
        AddHold();
        NextWrite();
        StartCall();
      }
      void OnWriteDone(bool ok) override {
        // Delay and then do the next write or WritesDone
        alarm_.Set(
            std::chrono::system_clock::now() +
                std::chrono::milliseconds(delay_distribution_(generator_)),
            [this](bool /*ok*/) { NextWrite(); });
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await(RouteSummary* stats) {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        *stats = stats_;
        return std::move(status_);
      }

     private:
      void NextWrite() {
        if (points_remaining_ != 0) {
          const Feature& f =
              (*feature_list_)[feature_distribution_(generator_)];
          std::cout << "Visiting point "
                    << f.location().latitude() / coord_factor_ << ", "
                    << f.location().longitude() / coord_factor_ << std::endl;
          StartWrite(&f.location());
          points_remaining_--;
        } else {
          StartWritesDone();
          RemoveHold();
        }
      }
      ClientContext context_;
      float coord_factor_;
      int points_remaining_ = 10;
      Point point_;
      RouteSummary stats_;
      const std::vector<Feature>* feature_list_;
      std::default_random_engine generator_;
      std::uniform_int_distribution<int> feature_distribution_;
      std::uniform_int_distribution<int> delay_distribution_;
      grpc::Alarm alarm_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientWriteReactor は、クライアントからのリクエストメッセージの型である 1 つのパラメータでテンプレート化されています。

Recorder のコンストラクタでは、ClientContext&stats_(レスポンスオブジェクト)、および Recorder を RPC メソッド stub->async()->RecordRoute() に渡します。次に、points_to_send_ にある最初の Point を送信する操作を追加します。RPC 開始時に送信するものが何もない場合は、StartWritesDone() を呼び出してサーバーに書き込みが完了したことを通知する必要があることに注意してください。最後に、StartCall() を呼び出して RPC をアクティブ化します。

OnWriteDone() は書き込み完了に応答します。書き込みが正常に完了した場合、points_to_send_ が空になるまで次の Point の書き込みを続行します。送信する最後の Point の場合、StartWriteLast() を呼び出して書き込み完了のシグナルをバンドルします。StartWriteLast() は、StartWrite()StartWritesDone() を組み合わせたものと実質的に同じですが、より効率的です。

OnDone() は RPC の完了に応答します。RPC ステータスの結果と stats_ のレスポンスを確認し、OnDone() を待機している条件変数に通知します。

Await()ClientWriteReactor のメソッドではありません。Await() を追加したのは、呼び出し元が RPC が完了するまで待機できるようにするためです。

RPC を開始するには、クライアントは単に Recorder をインスタンス化し、RPC の完了を待機します。

    Recorder recorder(stub_.get(), kCoordFactor_, &feature_list_);
    RouteSummary stats;
    Status status = recorder.Await(&stats);
    if (status.ok()) {
      std::cout << "Finished trip with " << stats.point_count() << " points\n"
                << "Passed " << stats.feature_count() << " features\n"
                << "Travelled " << stats.distance() << " meters\n"
                << "It took " << stats.elapsed_time() << " seconds"
                << std::endl;
    } else {
      std::cout << "RecordRoute rpc failed." << std::endl;
    }

双方向ストリーミングRPC

最後に、双方向ストリーミング RPC である RouteChat を見てみましょう。この場合、クライアントは RouteNote のシーケンスをサーバーに送信します。Point 上の RouteNote が送信されるたびに、サーバーは、それ以前にすべてのクライアントが送信した同じ Point 上の RouteNote のシーケンスを返します。

サーバー

ここでも、双方向ストリーミング RPC の RPC ハンドラのインターフェースには入力パラメータはなく、戻り値の型は ServerBidiReactor というサーバーリアクターです。

ServerBidiReactor には 2 つのテンプレートパラメータがあります。どちらも RouteNote です。これは、RouteNote がリクエストとレスポンスの両方のメッセージ型であるためです。結局、RouteChat はクライアント同士がチャットしたり、情報を共有したりできるようにすることを意味します。

ServerWriteReactorServerReadReactor についてはすでに説明したので、ServerBidiReactor も非常にわかりやすいはずです。

  grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
      CallbackServerContext* context) override {
    class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
     public:
      Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
          : mu_(mu), received_notes_(received_notes) {
        StartRead(&note_);
      }

      void OnReadDone(bool ok) override {
        if (ok) {
          // Unlike the other example in this directory that's not using
          // the reactor pattern, we can't grab a local lock to secure the
          // access to the notes vector, because the reactor will most likely
          // make us jump threads, so we'll have to use a different locking
          // strategy. We'll grab the lock locally to build a copy of the
          // list of nodes we're going to send, then we'll grab the lock
          // again to append the received note to the existing vector.
          mu_->Lock();
          std::copy_if(received_notes_->begin(), received_notes_->end(),
                       std::back_inserter(to_send_notes_),
                       [this](const RouteNote& note) {
                         return note.location().latitude() ==
                                    note_.location().latitude() &&
                                note.location().longitude() ==
                                    note_.location().longitude();
                       });
          mu_->Unlock();
          notes_iterator_ = to_send_notes_.begin();
          NextWrite();
        } else {
          Finish(Status::OK);
        }
      }
      void OnWriteDone(bool /*ok*/) override { NextWrite(); }

      void OnDone() override {
        LOG(INFO) << "RPC Completed";
        delete this;
      }

      void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }

     private:
      void NextWrite() {
        if (notes_iterator_ != to_send_notes_.end()) {
          StartWrite(&*notes_iterator_);
          notes_iterator_++;
        } else {
          mu_->Lock();
          received_notes_->push_back(note_);
          mu_->Unlock();
          StartRead(&note_);
        }
      }

      RouteNote note_;
      absl::Mutex* mu_;
      std::vector<RouteNote>* received_notes_ ABSL_GUARDED_BY(mu_);
      std::vector<RouteNote> to_send_notes_;
      std::vector<RouteNote>::iterator notes_iterator_;
    };
    return new Chatter(&mu_, &received_notes_);
  }

ServerBidiReactor のコンストラクタは、RouteChat() が RPC を開始した応答としてリアクターを構築して提供する際に呼び出されます。コンストラクタは StartRead(&received_note_) を呼び出して読み込み操作を開始します。

OnReadDone() は読み込み完了に応答します。読み込みが正常に完了した場合(つまり oktrue の場合)、次の RouteNote の読み込みを続行します。そうでない場合は、読み込みがすべて完了したことを記録し、RPC を完了します。成功した読み込みで新しく受信した RouteNote については、received_notes_ に追加し、同じ Point 上で以前に受信したノートを to_send_notes_ に追加します。to_send_notes_ が空でなくなった場合は、to_send_notes_ 内の RouteNote を送信し始めます。

OnWriteDone() は書き込み完了に応答します。書き込みが正常に完了した場合、to_send_notes_ が空になるまで次の RouteNote の送信を続行します。その時点で、次の RouteNote の読み込みを続行するか、読み込みも完了している場合は RPC を完了します。

OnDone() は RPC の完了に応答します。OnDone() で最終的なクリーンアップを行います。

OnCancel() は RPC のキャンセルに応答します。このメソッドでキャンセルが発生したことをログに記録します。

クライアント

はい、双方向ストリーミング RPC のクライアントリアクターは ClientBidiReactor です。

    class Chatter : public grpc::ClientBidiReactor<RouteNote, RouteNote> {
     public:
      explicit Chatter(RouteGuide::Stub* stub)
          : notes_{MakeRouteNote("First message", 0, 0),
                   MakeRouteNote("Second message", 0, 1),
                   MakeRouteNote("Third message", 1, 0),
                   MakeRouteNote("Fourth message", 0, 0)},
            notes_iterator_(notes_.begin()) {
        stub->async()->RouteChat(&context_, this);
        NextWrite();
        StartRead(&server_note_);
        StartCall();
      }
      void OnWriteDone(bool ok) override {
        if (ok) {
          NextWrite();
        }
      }
      void OnReadDone(bool ok) override {
        if (ok) {
          std::cout << "Got message " << server_note_.message() << " at "
                    << server_note_.location().latitude() << ", "
                    << server_note_.location().longitude() << std::endl;
          StartRead(&server_note_);
        }
      }
      void OnDone(const Status& s) override {
        std::unique_lock<std::mutex> l(mu_);
        status_ = s;
        done_ = true;
        cv_.notify_one();
      }
      Status Await() {
        std::unique_lock<std::mutex> l(mu_);
        cv_.wait(l, [this] { return done_; });
        return std::move(status_);
      }

     private:
      void NextWrite() {
        if (notes_iterator_ != notes_.end()) {
          const auto& note = *notes_iterator_;
          std::cout << "Sending message " << note.message() << " at "
                    << note.location().latitude() << ", "
                    << note.location().longitude() << std::endl;
          StartWrite(&note);
          notes_iterator_++;
        } else {
          StartWritesDone();
        }
      }
      ClientContext context_;
      const std::vector<RouteNote> notes_;
      std::vector<RouteNote>::const_iterator notes_iterator_;
      RouteNote server_note_;
      std::mutex mu_;
      std::condition_variable cv_;
      Status status_;
      bool done_ = false;
    };

ClientBidiReactor は、リクエストとレスポンスのメッセージ型である 2 つのパラメータでテンプレート化されており、RPC RouteChat の場合はどちらも RouteNote です。

Chatter のコンストラクタでは、ClientContextChatter を RPC メソッド stub->async()->RouteChat() に渡します。次に、notes_ にある最初の RouteNote を送信する操作を追加します。RPC 開始時に送信するものが何もない場合は、StartWritesDone() を呼び出してサーバーに書き込みが完了したことを通知する必要があることに注意してください。また、StartRead() を呼び出して読み込み操作を追加します。最後に、StartCall() を呼び出して RPC をアクティブ化します。

OnReadDone() は読み込み完了に応答します。読み込みが正常に完了した場合(oktrue の場合)、次の RouteNote の読み込みを続行します。そうでない場合は、読み込みが失敗したことを示します。

OnWriteDone() は書き込み完了に応答します。書き込みが正常に完了した場合、notes_ が空になるまで次の RouteNote の書き込みを続行します。送信する最後の RouteNote の場合、StartWriteLast() を呼び出して書き込み完了のシグナルをバンドルします。StartWriteLast() は、StartWrite()StartWritesDone() を組み合わせたものと実質的に同じですが、より効率的です。

OnDone() は RPC の完了に応答します。RPC ステータスの結果とメッセージ統計を確認し、OnDone() を待機している条件変数に通知します。

Await()ClientBidiReactor のメソッドではありません。Await() を追加したのは、呼び出し元が RPC が完了するまで待機できるようにするためです。

RPC を開始するには、クライアントは単に Chatter をインスタンス化し、RPC の完了を待機します。

    Chatter chatter(stub_.get());
    Status status = chatter.Await();
    if (!status.ok()) {
      std::cout << "RouteChat rpc failed." << std::endl;
    }