Skip to content

Commit

Permalink
Fix possible write session frozing (ydb-platform#8075)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Aug 22, 2024
1 parent 44f362f commit b209651
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 12 deletions.
6 changes: 3 additions & 3 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,10 @@ class ISession : public ISessionHandler<TResponse>
protected:
grpc::ServerCompletionQueue* const CQ;
grpc::ServerContext Context;
grpc::ServerAsyncReaderWriter<TResponse, TRequest>
Stream;
private:
grpc::ServerAsyncReaderWriter<TResponse, TRequest> Stream;

TSpinLock Lock;
private:
bool HaveWriteInflight;
bool NeedFinish;
std::atomic<bool> ClientIsDone;
Expand Down
34 changes: 27 additions & 7 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,23 @@ void TPQWriteServiceImpl::TSession::OnCreated() { // Start waiting fo
ReplyWithError("proxy overloaded", NPersQueue::NErrorCode::OVERLOAD);
return;
}
TMaybe<TString> localCluster = Proxy->AvailableLocalCluster();
if (NeedDiscoverClusters) {
TMaybe<TString> localCluster = Proxy->AvailableLocalCluster();
if (!localCluster.Defined()) {
ReplyWithError("initializing", NPersQueue::NErrorCode::INITIALIZING);
return;
} else if (localCluster->empty()) {
ReplyWithError("cluster disabled", NPersQueue::NErrorCode::CLUSTER_DISABLED);
return;
} else {
CreateActor(*localCluster);
} else if (!CreateActor(*localCluster)) {
Proxy->ReleaseSession(this);
return;
}
} else {
CreateActor(TString());
} else if (!CreateActor(TString())) {
Proxy->ReleaseSession(this);
return;
}

ReadyForNextRead();
}

Expand All @@ -61,9 +64,14 @@ void TPQWriteServiceImpl::TSession::OnRead(const TWriteRequest& request) {
}

void TPQWriteServiceImpl::TSession::OnDone() {
{
TGuard<TSpinLock> lock(Lock);
IsDone = true;
}
SendEvent(new TEvPQProxy::TEvDone());
}


TPQWriteServiceImpl::TSession::TSession(std::shared_ptr<TPQWriteServiceImpl> proxy,
grpc::ServerCompletionQueue* cq, ui64 cookie, const TActorId& schemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, bool needDiscoverClusters)
Expand Down Expand Up @@ -97,18 +105,30 @@ bool TPQWriteServiceImpl::TSession::IsShuttingDown() const {
return Proxy->IsShuttingDown();
}

void TPQWriteServiceImpl::TSession::CreateActor(const TString &localCluster) {
bool TPQWriteServiceImpl::TSession::CreateActor(const TString &localCluster) {
TGuard<TSpinLock> lock(Lock);
if (IsDone) {
ReplyWithError("is done", NPersQueue::NErrorCode::INITIALIZING);
return false;
}

auto classifier = Proxy->GetClassifier();
ActorId = Proxy->ActorSystem->Register(
new TWriteSessionActor(this, Cookie, SchemeCache, Counters, localCluster,
classifier ? classifier->ClassifyAddress(GetPeerName())
: "unknown"), TMailboxType::Simple, 0
);
return true;
}

void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) {
Proxy->ActorSystem->Send(ActorId, ev);
std::unique_ptr<IEventBase> e;
e.reset(ev);

TGuard<TSpinLock> lock(Lock);
if (ActorId) {
Proxy->ActorSystem->Send(ActorId, e.release());
}
}

///////////////////////////////////////////////////////////////////////////////
Expand Down
3 changes: 2 additions & 1 deletion ydb/services/deprecated/persqueue_v0/grpc_pq_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
bool IsShuttingDown() const override;

private:
void CreateActor(const TString& localCluster);
[[nodiscard]] bool CreateActor(const TString& localCluster);
void SendEvent(NActors::IEventBase* ev);

private:
Expand All @@ -52,6 +52,7 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;

bool NeedDiscoverClusters;
bool IsDone = false;
};
using TSessionRef = TIntrusivePtr<TSession>;

Expand Down
3 changes: 2 additions & 1 deletion ydb/services/deprecated/persqueue_v0/grpc_pq_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ void TWriteSessionActor::CheckFinish(const TActorContext& ctx) {
}

void TWriteSessionActor::Handle(TEvPQProxy::TEvDone::TPtr&, const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session cookie: " << Cookie << " sessionId: " << OwnerCookie << " got TEvDone");
WritesDone = true;
CheckFinish(ctx);
}
Expand Down Expand Up @@ -339,7 +340,7 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo
errorReason = Sprintf("topic '%s' describe error, Status# %s, Marker# PQ1", path.back().c_str(),
ToString(entry.Status).c_str());
CloseSession(errorReason, NPersQueue::NErrorCode::ERROR, ctx);
break;
return;
}
}
if (!entry.PQGroupInfo) {
Expand Down

0 comments on commit b209651

Please sign in to comment.