From 732b4ccd6d081966c5fba52e151583bda845db18 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 29 May 2024 12:14:45 +0000 Subject: [PATCH 1/4] autoscaling logs --- .../persqueue/partition_scale_manager.cpp | 15 +++++++++++--- ydb/core/persqueue/partition_scale_manager.h | 2 +- .../persqueue/partition_scale_request.cpp | 12 +++++++---- ydb/core/persqueue/partition_scale_request.h | 4 ++-- ydb/core/persqueue/partition_write.cpp | 20 ++++++++++++++++--- ydb/core/persqueue/read_balancer.cpp | 2 +- ydb/core/persqueue/ut/autoscaling_ut.cpp | 1 - 7 files changed, 41 insertions(+), 15 deletions(-) diff --git a/ydb/core/persqueue/partition_scale_manager.cpp b/ydb/core/persqueue/partition_scale_manager.cpp index 7c83d53c2f0c..ef90ae9508e7 100644 --- a/ydb/core/persqueue/partition_scale_manager.cpp +++ b/ydb/core/persqueue/partition_scale_manager.cpp @@ -17,6 +17,8 @@ TPartitionScaleManager::TPartitionScaleManager( void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) { if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange " + << "need to split partition " << partition); PartitionsToSplit.emplace(partition.Id, partition); TrySendScaleRequest(ctx); } else { @@ -30,12 +32,14 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) { return; } - auto splitMergePair = BuildScaleRequest(); + auto splitMergePair = BuildScaleRequest(ctx); if (splitMergePair.first.empty() && splitMergePair.second.empty()) { return; } RequestInflight = true; + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange " + << "send split request"); CurrentScaleRequest = ctx.Register(new TPartitionScaleRequest( TopicName, DatabasePath, @@ -51,7 +55,7 @@ void TPartitionScaleManager::TrySendScaleRequest(const TActorContext& ctx) { using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit; using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge; -std::pair, std::vector> TPartitionScaleManager::BuildScaleRequest() { +std::pair, std::vector> TPartitionScaleManager::BuildScaleRequest(const TActorContext& ctx) { std::vector splitsToApply; std::vector mergesToApply; @@ -62,11 +66,15 @@ std::pair, std::vector> TPartition const auto& partition = itSplit->second; if (BalancerConfig.PartitionGraph.GetPartition(partitionId)->Children.empty()) { - auto mid = GetRangeMid(partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : "", partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : ""); + auto from = partition.KeyRange.FromBound ? *partition.KeyRange.FromBound : ""; + auto to = partition.KeyRange.ToBound ?*partition.KeyRange.ToBound : ""; + auto mid = GetRangeMid(from, to); if (mid.empty()) { itSplit = PartitionsToSplit.erase(itSplit); + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest wrong partition key range. Can't get mid. Topic# " << TopicName << ", partition# " << partitionId); continue; } + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::BuildScaleRequest partition split ranges. From# '" << from << "'. To# '" << to << "'. Mid# '" << mid <<"'. Topic# " << TopicName << ". Partition# " << partitionId); TPartitionSplit split; split.set_partition(partition.Id); @@ -87,6 +95,7 @@ void TPartitionScaleManager::HandleScaleRequestResult(TPartitionScaleRequest::TE RequestInflight = false; LastResponseTime = ctx.Now(); auto result = ev->Get(); + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleRequestResult scale request result: " << result->Status << ". Topic# " << TopicName); if (result->Status == TEvTxUserProxy::TResultStatus::ExecComplete) { TrySendScaleRequest(ctx); } else { diff --git a/ydb/core/persqueue/partition_scale_manager.h b/ydb/core/persqueue/partition_scale_manager.h index 30d75ea39a47..8d76b1fe97ae 100644 --- a/ydb/core/persqueue/partition_scale_manager.h +++ b/ydb/core/persqueue/partition_scale_manager.h @@ -66,7 +66,7 @@ class TPartitionScaleManager { using TPartitionSplit = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionSplit; using TPartitionMerge = NKikimrSchemeOp::TPersQueueGroupDescription_TPartitionMerge; - std::pair, std::vector> BuildScaleRequest(); + std::pair, std::vector> BuildScaleRequest(const TActorContext& ctx); public: static const ui64 TRY_SCALE_REQUEST_WAKE_UP_TAG = 10; diff --git a/ydb/core/persqueue/partition_scale_request.cpp b/ydb/core/persqueue/partition_scale_request.cpp index 9bb1e013bac7..a24b263e2368 100644 --- a/ydb/core/persqueue/partition_scale_request.cpp +++ b/ydb/core/persqueue/partition_scale_request.cpp @@ -30,11 +30,11 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) { void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) { auto proposal = std::make_unique(); proposal->Record.SetDatabaseName(CanonizePath(DatabasePath)); - FillProposeRequest(*proposal, DatabasePath, Topic); + FillProposeRequest(ctx,*proposal, DatabasePath, Topic); ctx.Send(MakeTxProxyID(), proposal.release()); } -void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) { +void TPartitionScaleRequest::FillProposeRequest(const NActors::TActorContext &ctx, TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) { auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme(); modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup); modifyScheme.SetWorkingDir(workingDir); @@ -46,11 +46,14 @@ void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransa NKikimrSchemeOp::TPersQueueGroupDescription groupDescription; groupDescription.SetName(topicName); - + TStringBuilder logMessage; + logMessage << "TPartitionScaleRequest::FillProposeRequest trying to scale partitions. Spilts: "; for(const auto& split: Splits) { auto* newSplit = groupDescription.AddSplit(); + logMessage << "partition: " << split.GetPartition() << " boundary: '" << split.GetSplitBoundary() << "' "; *newSplit = split; } + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, logMessage); for(const auto& merge: Merges) { auto* newMerge = groupDescription.AddMerge(); @@ -98,7 +101,8 @@ void TPartitionScaleRequest::Handle(TEvTxUserProxy::TEvProposeTransactionStatus: for (auto& issue : ev->Get()->Record.GetIssues()) { issues << issue.ShortDebugString() + ", "; } - Cerr << "\n SAVDGB " << issues << "\n"; + LOG_ERROR_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleRequest " + << "SchemaShard error when trying to execute a split request: " << issues); Send(ParentActorId, scaleRequestResult.release()); Die(ctx); } else { diff --git a/ydb/core/persqueue/partition_scale_request.h b/ydb/core/persqueue/partition_scale_request.h index 736b5196e35b..f1614d37d18d 100644 --- a/ydb/core/persqueue/partition_scale_request.h +++ b/ydb/core/persqueue/partition_scale_request.h @@ -12,7 +12,7 @@ namespace NKikimr { namespace NPQ { - + class TPartitionScaleRequest: public NActors::TActorBootstrapped { using TBase = NActors::TActorBootstrapped; @@ -48,7 +48,7 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped SplitPath(const TString& path); void SendProposeRequest(const NActors::TActorContext &ctx); - void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName); + void FillProposeRequest(const NActors::TActorContext &ctx, TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName); private: const TString Topic; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 1295038e1e51..285e61c005d6 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -541,17 +541,31 @@ void TPartition::HandleWriteResponse(const TActorContext& ctx) { ProcessTimestampsForNewData(prevEndOffset, ctx); } -NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& /*ctx*/) { +NKikimrPQ::EScaleStatus TPartition::CheckScaleStatus(const TActorContext& ctx) { auto const writeSpeedUsagePercent = SplitMergeAvgWriteBytes->GetValue() * 100.0 / Config.GetPartitionStrategy().GetScaleThresholdSeconds() / TotalPartitionWriteSpeed; - + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "TPartition::CheckScaleStatus writeSpeedUsagePercent# " << writeSpeedUsagePercent << " Topic: \"" << TopicName() << "\"." << + " Partition: " << Partition + ); auto splitEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT || Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE; - + auto mergeEnabled = Config.GetPartitionStrategy().GetPartitionStrategyType() == ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_CAN_SPLIT_AND_MERGE; if (splitEnabled && writeSpeedUsagePercent >= Config.GetPartitionStrategy().GetScaleUpPartitionWriteSpeedThresholdPercent()) { + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "TPartition::CheckScaleStatus NEED_SPLIT" << " Topic: \"" << TopicName() << "\"." << + " Partition: " << Partition + ); return NKikimrPQ::EScaleStatus::NEED_SPLIT; } else if (mergeEnabled && writeSpeedUsagePercent <= Config.GetPartitionStrategy().GetScaleDownPartitionWriteSpeedThresholdPercent()) { + LOG_DEBUG_S( + ctx, NKikimrServices::PERSQUEUE, + "TPartition::CheckScaleStatus NEED_MERGE" << " Topic: \"" << TopicName() << "\"." << + " Partition: " << Partition + ); return NKikimrPQ::EScaleStatus::NEED_MERGE; } return NKikimrPQ::EScaleStatus::NORMAL; diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 5ad1f4c6931a..11785ef95b69 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -511,7 +511,7 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr Y_ABORT_UNLESS(p.GetPartition() >= prevNextPartitionId && p.GetPartition() < NextPartitionId || NextPartitionId == 0); partitionsInfo[p.GetPartition()] = {p.GetTabletId(), {}}; - if (SplitMergeEnabled(TabletConfig)) { + if (SplitMergeEnabled(TabletConfig) && p.HasKeyRange()) { partitionsInfo[p.GetPartition()].KeyRange.DeserializeFromProto(p.GetKeyRange()); } diff --git a/ydb/core/persqueue/ut/autoscaling_ut.cpp b/ydb/core/persqueue/ut/autoscaling_ut.cpp index 896daa44278e..2ddf2bb96e44 100644 --- a/ydb/core/persqueue/ut/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/autoscaling_ut.cpp @@ -624,7 +624,6 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { a = "a"; b = {}; res = NKikimr::NPQ::TPartitionScaleManager::GetRangeMid(a,b); - Cerr << "\n SAVDBG " << res << "\n"; UNIT_ASSERT(a < res); UNIT_ASSERT(b != res); From 0b95af16083e59babad0e888fc075ce74a8845a8 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 29 May 2024 12:39:39 +0000 Subject: [PATCH 2/4] Fix --- ydb/core/persqueue/partition_scale_request.cpp | 4 ++-- ydb/core/persqueue/partition_scale_request.h | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/core/persqueue/partition_scale_request.cpp b/ydb/core/persqueue/partition_scale_request.cpp index a24b263e2368..28e7d8f7a595 100644 --- a/ydb/core/persqueue/partition_scale_request.cpp +++ b/ydb/core/persqueue/partition_scale_request.cpp @@ -30,11 +30,11 @@ void TPartitionScaleRequest::Bootstrap(const NActors::TActorContext &ctx) { void TPartitionScaleRequest::SendProposeRequest(const NActors::TActorContext &ctx) { auto proposal = std::make_unique(); proposal->Record.SetDatabaseName(CanonizePath(DatabasePath)); - FillProposeRequest(ctx,*proposal, DatabasePath, Topic); + FillProposeRequest(*proposal, DatabasePath, Topic, ctx); ctx.Send(MakeTxProxyID(), proposal.release()); } -void TPartitionScaleRequest::FillProposeRequest(const NActors::TActorContext &ctx, TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName) { +void TPartitionScaleRequest::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx) { auto& modifyScheme = *proposal.Record.MutableTransaction()->MutableModifyScheme(); modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterPersQueueGroup); modifyScheme.SetWorkingDir(workingDir); diff --git a/ydb/core/persqueue/partition_scale_request.h b/ydb/core/persqueue/partition_scale_request.h index f1614d37d18d..017825e78d82 100644 --- a/ydb/core/persqueue/partition_scale_request.h +++ b/ydb/core/persqueue/partition_scale_request.h @@ -48,7 +48,7 @@ class TPartitionScaleRequest: public NActors::TActorBootstrapped SplitPath(const TString& path); void SendProposeRequest(const NActors::TActorContext &ctx); - void FillProposeRequest(const NActors::TActorContext &ctx, TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName); + void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TString& workingDir, const TString& topicName, const NActors::TActorContext &ctx); private: const TString Topic; From 4878a535a00c05fcb25a0e60a8a8b7235b1a1876 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Wed, 29 May 2024 12:50:13 +0000 Subject: [PATCH 3/4] Fix --- ydb/core/persqueue/partition_scale_manager.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/persqueue/partition_scale_manager.cpp b/ydb/core/persqueue/partition_scale_manager.cpp index ef90ae9508e7..58964b441e73 100644 --- a/ydb/core/persqueue/partition_scale_manager.cpp +++ b/ydb/core/persqueue/partition_scale_manager.cpp @@ -18,7 +18,7 @@ TPartitionScaleManager::TPartitionScaleManager( void TPartitionScaleManager::HandleScaleStatusChange(const TPartitionInfo& partition, NKikimrPQ::EScaleStatus scaleStatus, const TActorContext& ctx) { if (scaleStatus == NKikimrPQ::EScaleStatus::NEED_SPLIT) { LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER, "TPartitionScaleManager::HandleScaleStatusChange " - << "need to split partition " << partition); + << "need to split partition " << partition.Id); PartitionsToSplit.emplace(partition.Id, partition); TrySendScaleRequest(ctx); } else { From 3edf63f4805fb18f3698f4dffa0dfec1ca3e015f Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Thu, 6 Jun 2024 09:45:48 +0000 Subject: [PATCH 4/4] Mute split by load ut --- .github/config/muted_ya.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index cff306a46a66..e9bd667b8ecf 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -32,6 +32,7 @@ ydb/core/kqp/ut/service [38/50]* ydb/core/persqueue/ut TPartitionTests.TestBatchingWithProposeConfig ydb/core/persqueue/ut TPQTest.*DirectRead* ydb/core/persqueue/ut TopicAutoscaling.PartitionSplit_ManySession_NewSDK +ydb/core/persqueue/ut TopicAutoscaling.PartitionSplit_AutosplitByLoad ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables @@ -83,8 +84,8 @@ ydb/tests/functional/kqp/kqp_indexes ConsistentIndexRead.InteractiveTx ydb/tests/functional/kqp/kqp_query_session KqpQuerySession.NoLocalAttach ydb/tests/functional/restarts test_restarts.py.* ydb/tests/functional/serverless test_serverless.py.test_database_with_disk_quotas[enable_alter_database_create_hive_first--*] -ydb/tests/functional/sqs/multinode/test_multinode_cluster.py.TestSqsMultinodeCluster.test_has_messages_counters[kick_tablets-*] -ydb/tests/functional/sqs/multinode/test_multinode_cluster.py.TestSqsMultinodeCluster.test_has_messages_counters[stop_node*] +ydb/tests/functional/sqs/multinode/test_multinode_cluster.py.TestSqsMultinodeCluster.test_has_messages_counters[kick_tablets-*] +ydb/tests/functional/sqs/multinode/test_multinode_cluster.py.TestSqsMultinodeCluster.test_has_messages_counters[stop_node*] ydb/tests/functional/tenants test_dynamic_tenants.py.* ydb/tests/functional/tenants test_storage_config.py.TestStorageConfig.* ydb/tests/functional/tenants test_tenants.py.*