From 05c59bc82554ef37f2e083eaeb0fb7421f07a489 Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Thu, 3 Oct 2024 12:17:54 -0400 Subject: [PATCH 1/2] Refactor stream to notify first and next --- Firestore/core/src/remote/stream.cc | 6 +- Firestore/core/src/remote/stream.h | 5 +- Firestore/core/src/remote/watch_stream.cc | 6 +- Firestore/core/src/remote/watch_stream.h | 5 +- Firestore/core/src/remote/write_stream.cc | 62 ++++++++++++------- Firestore/core/src/remote/write_stream.h | 7 ++- .../core/test/unit/remote/stream_test.cc | 23 +++++-- 7 files changed, 82 insertions(+), 32 deletions(-) diff --git a/Firestore/core/src/remote/stream.cc b/Firestore/core/src/remote/stream.cc index ee6774fbf8a..d9b85899770 100644 --- a/Firestore/core/src/remote/stream.cc +++ b/Firestore/core/src/remote/stream.cc @@ -94,6 +94,7 @@ bool Stream::IsStarted() const { void Stream::Start() { EnsureOnQueue(); + response_count_ = 0; if (state_ == State::Error) { BackoffAndTryRestarting(); @@ -258,7 +259,10 @@ void Stream::OnStreamRead(const grpc::ByteBuffer& message) { grpc_stream_->GetResponseHeaders())); } - Status read_status = NotifyStreamResponse(message); + Status read_status = (++response_count_ == 1) + ? NotifyFirstStreamResponse(message) + : NotifyNextStreamResponse(message); + if (!read_status.ok()) { grpc_stream_->FinishImmediately(); // Don't expect gRPC to produce status -- since the error happened on the diff --git a/Firestore/core/src/remote/stream.h b/Firestore/core/src/remote/stream.h index d9c4a6465f4..c5e305b57b3 100644 --- a/Firestore/core/src/remote/stream.h +++ b/Firestore/core/src/remote/stream.h @@ -226,7 +226,9 @@ class Stream : public GrpcStreamObserver, const std::string& app_check_token) = 0; virtual void TearDown(GrpcStream* stream) = 0; virtual void NotifyStreamOpen() = 0; - virtual util::Status NotifyStreamResponse( + virtual util::Status NotifyFirstStreamResponse( + const grpc::ByteBuffer& message) = 0; + virtual util::Status NotifyNextStreamResponse( const grpc::ByteBuffer& message) = 0; virtual void NotifyStreamClose(const util::Status& status) = 0; // PORTING NOTE: C++ cannot rely on RTTI, unlike other platforms. @@ -260,6 +262,7 @@ class Stream : public GrpcStreamObserver, // Used to prevent auth if the stream happens to be restarted before token is // received. int close_count_ = 0; + int response_count_ = 0; }; } // namespace remote diff --git a/Firestore/core/src/remote/watch_stream.cc b/Firestore/core/src/remote/watch_stream.cc index 61f158554b8..37b372696bb 100644 --- a/Firestore/core/src/remote/watch_stream.cc +++ b/Firestore/core/src/remote/watch_stream.cc @@ -92,7 +92,11 @@ void WatchStream::NotifyStreamOpen() { callback_->OnWatchStreamOpen(); } -Status WatchStream::NotifyStreamResponse(const grpc::ByteBuffer& message) { +Status WatchStream::NotifyFirstStreamResponse(const grpc::ByteBuffer& message) { + return NotifyNextStreamResponse(message); +} + +Status WatchStream::NotifyNextStreamResponse(const grpc::ByteBuffer& message) { ByteBufferReader reader{message}; auto response = watch_serializer_.ParseResponse(&reader); if (!reader.ok()) { diff --git a/Firestore/core/src/remote/watch_stream.h b/Firestore/core/src/remote/watch_stream.h index 02de5f85e78..be761fbd773 100644 --- a/Firestore/core/src/remote/watch_stream.h +++ b/Firestore/core/src/remote/watch_stream.h @@ -116,7 +116,10 @@ class WatchStream : public Stream { void TearDown(GrpcStream* grpc_stream) override; void NotifyStreamOpen() override; - util::Status NotifyStreamResponse(const grpc::ByteBuffer& message) override; + util::Status NotifyFirstStreamResponse( + const grpc::ByteBuffer& message) override; + util::Status NotifyNextStreamResponse( + const grpc::ByteBuffer& message) override; void NotifyStreamClose(const util::Status& status) override; std::string GetDebugName() const override { diff --git a/Firestore/core/src/remote/write_stream.cc b/Firestore/core/src/remote/write_stream.cc index d9b35da8e16..a0848f91ae6 100644 --- a/Firestore/core/src/remote/write_stream.cc +++ b/Firestore/core/src/remote/write_stream.cc @@ -68,6 +68,11 @@ const ByteString& WriteStream::last_stream_token() const { return last_stream_token_; } +void WriteStream::Start() { + handshake_complete_ = false; + Stream::Start(); +} + void WriteStream::WriteHandshake() { EnsureOnQueue(); HARD_ASSERT(IsOpen(), "Writing handshake requires an opened stream"); @@ -121,12 +126,9 @@ void WriteStream::NotifyStreamOpen() { void WriteStream::NotifyStreamClose(const Status& status) { callback_->OnWriteStreamClose(status); - // Delegate's logic might depend on whether handshake was completed, so only - // reset it after notifying. - handshake_complete_ = false; } -Status WriteStream::NotifyStreamResponse(const grpc::ByteBuffer& message) { +Status WriteStream::NotifyFirstStreamResponse(const grpc::ByteBuffer& message) { ByteBufferReader reader{message}; Message response = write_serializer_.ParseResponse(&reader); @@ -134,31 +136,47 @@ Status WriteStream::NotifyStreamResponse(const grpc::ByteBuffer& message) { return reader.status(); } - LOG_DEBUG("%s response: %s", GetDebugDescription(), response.ToString()); + LOG_DEBUG("%s first response: %s", GetDebugDescription(), + response.ToString()); // Always capture the last stream token. set_last_stream_token(ByteString::Take(response->stream_token)); response->stream_token = nullptr; - if (!handshake_complete()) { - // The first response is the handshake response - handshake_complete_ = true; - callback_->OnWriteStreamHandshakeComplete(); - } else { - // A successful first write response means the stream is healthy. - // Note that we could consider a successful handshake healthy, however, the - // write itself might be causing an error we want to back off from. - backoff_.Reset(); - - auto version = write_serializer_.DecodeCommitVersion(&reader, *response); - auto results = write_serializer_.DecodeMutationResults(&reader, *response); - if (!reader.ok()) { - return reader.status(); - } - - callback_->OnWriteStreamMutationResult(version, std::move(results)); + // The first response is the handshake response + handshake_complete_ = true; + callback_->OnWriteStreamHandshakeComplete(); + + return Status::OK(); +} + +Status WriteStream::NotifyNextStreamResponse(const grpc::ByteBuffer& message) { + ByteBufferReader reader{message}; + Message response = + write_serializer_.ParseResponse(&reader); + if (!reader.ok()) { + return reader.status(); + } + + LOG_DEBUG("%s next response: %s", GetDebugDescription(), response.ToString()); + + // Always capture the last stream token. + set_last_stream_token(ByteString::Take(response->stream_token)); + response->stream_token = nullptr; + + // A successful first write response means the stream is healthy. + // Note that we could consider a successful handshake healthy, however, the + // write itself might be causing an error we want to back off from. + backoff_.Reset(); + + auto version = write_serializer_.DecodeCommitVersion(&reader, *response); + auto results = write_serializer_.DecodeMutationResults(&reader, *response); + if (!reader.ok()) { + return reader.status(); } + callback_->OnWriteStreamMutationResult(version, std::move(results)); + return Status::OK(); } diff --git a/Firestore/core/src/remote/write_stream.h b/Firestore/core/src/remote/write_stream.h index cf01926b94b..97288c9c549 100644 --- a/Firestore/core/src/remote/write_stream.h +++ b/Firestore/core/src/remote/write_stream.h @@ -120,6 +120,8 @@ class WriteStream : public Stream { return handshake_complete_; } + void Start() override; + /** * Sends an initial stream token to the server, performing the handshake * required to make the StreamingWrite RPC work. @@ -143,7 +145,10 @@ class WriteStream : public Stream { void TearDown(GrpcStream* grpc_stream) override; void NotifyStreamOpen() override; - util::Status NotifyStreamResponse(const grpc::ByteBuffer& message) override; + util::Status NotifyFirstStreamResponse( + const grpc::ByteBuffer& message) override; + util::Status NotifyNextStreamResponse( + const grpc::ByteBuffer& message) override; void NotifyStreamClose(const util::Status& status) override; std::string GetDebugName() const override { diff --git a/Firestore/core/test/unit/remote/stream_test.cc b/Firestore/core/test/unit/remote/stream_test.cc index face4e70f78..b6fc85a4e88 100644 --- a/Firestore/core/test/unit/remote/stream_test.cc +++ b/Firestore/core/test/unit/remote/stream_test.cc @@ -108,14 +108,27 @@ class TestStream : public Stream { observed_states_.push_back("NotifyStreamOpen"); } - util::Status NotifyStreamResponse(const grpc::ByteBuffer& message) override { + util::Status NotifyFirstStreamResponse(const grpc::ByteBuffer& message) override { std::string str = ByteBufferToString(message); if (str.empty()) { - observed_states_.push_back("NotifyStreamResponse"); + observed_states_.push_back("NotifyFirstStreamResponse"); } else { - observed_states_.push_back(StringFormat("NotifyStreamResponse(%s)", str)); + observed_states_.push_back(StringFormat("NotifyFirstStreamResponse(%s)", str)); } + return ResolveStreamResponse(); + } + + util::Status NotifyNextStreamResponse(const grpc::ByteBuffer& message) override { + std::string str = ByteBufferToString(message); + if (str.empty()) { + observed_states_.push_back("NotifyNextStreamResponse"); + } else { + observed_states_.push_back(StringFormat("NotifyNextStreamResponse(%s)", str)); + } + return ResolveStreamResponse(); + } + util::Status ResolveStreamResponse() { if (fail_next_stream_read_) { fail_next_stream_read_ = false; // The parent stream will issue a finish operation and block until it's @@ -294,8 +307,8 @@ TEST_F(StreamTest, ObserverReceivesStreamRead) { EXPECT_TRUE(firestore_stream->IsStarted()); EXPECT_TRUE(firestore_stream->IsOpen()); EXPECT_EQ(observed_states(), - States({"NotifyStreamOpen", "NotifyStreamResponse(foo)", - "NotifyStreamResponse(bar)"})); + States({"NotifyStreamOpen", "NotifyFirstStreamResponse(foo)", + "NotifyNextStreamResponse(bar)"})); }); } From ab073f49a22accda342fbadcf9b9a411162ef02d Mon Sep 17 00:00:00 2001 From: Tom Andersen Date: Thu, 3 Oct 2024 12:29:18 -0400 Subject: [PATCH 2/2] Fix CI --- Firestore/CHANGELOG.md | 3 +++ Firestore/core/test/unit/remote/stream_test.cc | 12 ++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/Firestore/CHANGELOG.md b/Firestore/CHANGELOG.md index 092e66027f8..dbac1cabf2d 100644 --- a/Firestore/CHANGELOG.md +++ b/Firestore/CHANGELOG.md @@ -1,3 +1,6 @@ +# Unreleased +- [changed] Internal change to stream implementation, to prepare for handshake. (#13792) + # 11.3.0 - [changed] Improve efficiency of memory persistence when processing a large number of writes. (#13572) diff --git a/Firestore/core/test/unit/remote/stream_test.cc b/Firestore/core/test/unit/remote/stream_test.cc index b6fc85a4e88..0dbf31fda04 100644 --- a/Firestore/core/test/unit/remote/stream_test.cc +++ b/Firestore/core/test/unit/remote/stream_test.cc @@ -108,22 +108,26 @@ class TestStream : public Stream { observed_states_.push_back("NotifyStreamOpen"); } - util::Status NotifyFirstStreamResponse(const grpc::ByteBuffer& message) override { + util::Status NotifyFirstStreamResponse( + const grpc::ByteBuffer& message) override { std::string str = ByteBufferToString(message); if (str.empty()) { observed_states_.push_back("NotifyFirstStreamResponse"); } else { - observed_states_.push_back(StringFormat("NotifyFirstStreamResponse(%s)", str)); + observed_states_.push_back( + StringFormat("NotifyFirstStreamResponse(%s)", str)); } return ResolveStreamResponse(); } - util::Status NotifyNextStreamResponse(const grpc::ByteBuffer& message) override { + util::Status NotifyNextStreamResponse( + const grpc::ByteBuffer& message) override { std::string str = ByteBufferToString(message); if (str.empty()) { observed_states_.push_back("NotifyNextStreamResponse"); } else { - observed_states_.push_back(StringFormat("NotifyNextStreamResponse(%s)", str)); + observed_states_.push_back( + StringFormat("NotifyNextStreamResponse(%s)", str)); } return ResolveStreamResponse(); }