From b209651b5bd71b30f20952696cc0c775ad744c60 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Thu, 22 Aug 2024 08:47:16 +0500 Subject: [PATCH] Fix possible write session frozing (#8075) --- .../deprecated/persqueue_v0/grpc_pq_session.h | 6 ++-- .../deprecated/persqueue_v0/grpc_pq_write.cpp | 34 +++++++++++++++---- .../deprecated/persqueue_v0/grpc_pq_write.h | 3 +- .../persqueue_v0/grpc_pq_write_actor.cpp | 3 +- 4 files changed, 34 insertions(+), 12 deletions(-) diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_session.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_session.h index 922903d73ef1..3d96e52b11e1 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_session.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_session.h @@ -306,10 +306,10 @@ class ISession : public ISessionHandler protected: grpc::ServerCompletionQueue* const CQ; grpc::ServerContext Context; - grpc::ServerAsyncReaderWriter - Stream; -private: + grpc::ServerAsyncReaderWriter Stream; + TSpinLock Lock; +private: bool HaveWriteInflight; bool NeedFinish; std::atomic ClientIsDone; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp index 36ba3fa8f6fd..20bc5fe41ff8 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp @@ -25,20 +25,23 @@ void TPQWriteServiceImpl::TSession::OnCreated() { // Start waiting fo ReplyWithError("proxy overloaded", NPersQueue::NErrorCode::OVERLOAD); return; } - TMaybe localCluster = Proxy->AvailableLocalCluster(); if (NeedDiscoverClusters) { + TMaybe localCluster = Proxy->AvailableLocalCluster(); if (!localCluster.Defined()) { ReplyWithError("initializing", NPersQueue::NErrorCode::INITIALIZING); return; } else if (localCluster->empty()) { ReplyWithError("cluster disabled", NPersQueue::NErrorCode::CLUSTER_DISABLED); return; - } else { - CreateActor(*localCluster); + } else if (!CreateActor(*localCluster)) { + Proxy->ReleaseSession(this); + return; } - } else { - CreateActor(TString()); + } else if (!CreateActor(TString())) { + Proxy->ReleaseSession(this); + return; } + ReadyForNextRead(); } @@ -61,9 +64,14 @@ void TPQWriteServiceImpl::TSession::OnRead(const TWriteRequest& request) { } void TPQWriteServiceImpl::TSession::OnDone() { + { + TGuard lock(Lock); + IsDone = true; + } SendEvent(new TEvPQProxy::TEvDone()); } + TPQWriteServiceImpl::TSession::TSession(std::shared_ptr proxy, grpc::ServerCompletionQueue* cq, ui64 cookie, const TActorId& schemeCache, TIntrusivePtr counters, bool needDiscoverClusters) @@ -97,7 +105,12 @@ bool TPQWriteServiceImpl::TSession::IsShuttingDown() const { return Proxy->IsShuttingDown(); } -void TPQWriteServiceImpl::TSession::CreateActor(const TString &localCluster) { +bool TPQWriteServiceImpl::TSession::CreateActor(const TString &localCluster) { + TGuard lock(Lock); + if (IsDone) { + ReplyWithError("is done", NPersQueue::NErrorCode::INITIALIZING); + return false; + } auto classifier = Proxy->GetClassifier(); ActorId = Proxy->ActorSystem->Register( @@ -105,10 +118,17 @@ void TPQWriteServiceImpl::TSession::CreateActor(const TString &localCluster) { classifier ? classifier->ClassifyAddress(GetPeerName()) : "unknown"), TMailboxType::Simple, 0 ); + return true; } void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) { - Proxy->ActorSystem->Send(ActorId, ev); + std::unique_ptr e; + e.reset(ev); + + TGuard lock(Lock); + if (ActorId) { + Proxy->ActorSystem->Send(ActorId, e.release()); + } } /////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write.h b/ydb/services/deprecated/persqueue_v0/grpc_pq_write.h index a3cfe7413767..aa6a9b109d3b 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write.h +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write.h @@ -38,7 +38,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl bool IsShuttingDown() const override; private: - void CreateActor(const TString& localCluster); + [[nodiscard]] bool CreateActor(const TString& localCluster); void SendEvent(NActors::IEventBase* ev); private: @@ -52,6 +52,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl TIntrusivePtr Counters; bool NeedDiscoverClusters; + bool IsDone = false; }; using TSessionRef = TIntrusivePtr; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp index 43a864214ba8..79a288688d1a 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp @@ -160,6 +160,7 @@ void TWriteSessionActor::CheckFinish(const TActorContext& ctx) { } void TWriteSessionActor::Handle(TEvPQProxy::TEvDone::TPtr&, const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " got TEvDone"); WritesDone = true; CheckFinish(ctx); } @@ -339,7 +340,7 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo errorReason = Sprintf("topic '%s' describe error, Status# %s, Marker# PQ1", path.back().c_str(), ToString(entry.Status).c_str()); CloseSession(errorReason, NPersQueue::NErrorCode::ERROR, ctx); - break; + return; } } if (!entry.PQGroupInfo) {