Skip to content

Commit

Permalink
YQ-2884 fixed fq cancel operation race (ydb-platform#2055)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Feb 20, 2024
1 parent bcc1040 commit 5792387
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,14 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
}

void Handle(const TEvYdbCompute::TEvStatusTrackerResponse::TPtr& ev) {
if (CancelOperationIsRunning("StatusTrackerResponse (aborting). ")) {
return;
}

auto& response = *ev->Get();
if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM
LOG_I("StatusTrackerResponse (not found). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
CreateFinalizer(Params.Status);
return;
}

Expand All @@ -116,19 +120,23 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release());
} else {
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
CreateResourcesCleaner();
}
}

void Handle(const TEvYdbCompute::TEvResultWriterResponse::TPtr& ev) {
if (CancelOperationIsRunning("ResultWriterResponse (aborting). ")) {
return;
}

auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
ResignAndPassAway(response.Issues);
return;
}
LOG_I("ResultWriterResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
CreateResourcesCleaner();
}

void Handle(const TEvYdbCompute::TEvResourcesCleanerResponse::TPtr& ev) {
Expand All @@ -139,20 +147,21 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
return;
}
LOG_I("ResourcesCleanerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status).release());
CreateFinalizer(IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status);
}

void Handle(const TEvYdbCompute::TEvFinalizerResponse::TPtr ev) {
// Pinger is no longer available at this place.
// The query can be restarted only after the expiration of lease in case of error
auto& response = *ev->Get();
LOG_I("FinalizerResponse ( " << (response.Status == NYdb::EStatus::SUCCESS ? "success" : "failed") << ") " << response.Status << " Issues: " << response.Issues.ToOneLineString());
LOG_I("FinalizerResponse ( " << (response.Status == NYdb::EStatus::SUCCESS ? "success" : "failed") << " ) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
FinishAndPassAway();
}

void Handle(TEvents::TEvQueryActionResult::TPtr& ev) {
LOG_I("QueryActionResult: " << FederatedQuery::QueryAction_Name(ev->Get()->Action));
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted) {
// Start cancel operation only when StatusTracker or ResultWriter is running
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted && !FinalizationStarted) {
IsAborted = true;
Register(ActorFactory->CreateStopper(SelfId(), Connector, Params.OperationId).release());
}
Expand All @@ -166,7 +175,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
return;
}
LOG_I("StopperResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
CreateResourcesCleaner();
}

void Run() { // recover points
Expand All @@ -185,7 +194,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release());
} else {
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
CreateFinalizer(Params.Status);
}
break;
case FederatedQuery::QueryMeta::FAILING:
Expand All @@ -194,7 +203,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
} else {
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
CreateFinalizer(Params.Status);
}
break;
default:
Expand All @@ -220,8 +229,28 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
PassAway();
}

void CreateResourcesCleaner() {
FinalizationStarted = true;
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
}

void CreateFinalizer(FederatedQuery::QueryMeta::ComputeStatus status) {
FinalizationStarted = true;
Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, status).release());
}

bool CancelOperationIsRunning(const TString& stage) const {
if (!IsAborted) {
return false;
}

LOG_I(stage << "Stop task execution, cancel operation now is running");
return true;
}

private:
bool IsAborted = false;
bool FinalizationStarted = false;
TActorId FetcherId;
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
TRunActorParams Params;
Expand Down

0 comments on commit 5792387

Please sign in to comment.