diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp index 80de3bd5b9c5..42b70000c766 100644 --- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp +++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp @@ -800,6 +800,42 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) { auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync(); UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3); + bool firstPartitionFound = false; + for (const auto& partition : describe.GetTopicDescription().GetPartitions()) { + if (partition.GetPartitionId() == 0) { + firstPartitionFound = true; + UNIT_ASSERT(!partition.GetActive()); + UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2); + auto childIds = partition.GetChildPartitionIds(); + std::sort(childIds.begin(), childIds.end()); + UNIT_ASSERT_EQUAL(childIds[0], 1); + UNIT_ASSERT_EQUAL(childIds[1], 2); + } + } + + UNIT_ASSERT(firstPartitionFound); + + TString secondPartitionTo = ""; + TString thirdPartitionFrom = ""; + for (const auto& partition : describe.GetTopicDescription().GetPartitions()) { + if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) { + UNIT_ASSERT(partition.GetActive()); + if (partition.GetPartitionId() == 1) { + UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty()); + secondPartitionTo = *partition.GetToBound(); + } + if (partition.GetPartitionId() == 2) { + UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty()); + thirdPartitionFrom = *partition.GetFromBound(); + } + UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1); + UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0); + } + } + + UNIT_ASSERT(!secondPartitionTo.Empty()); + UNIT_ASSERT(!thirdPartitionFrom.Empty()); + auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false); UNIT_ASSERT(writeSession2->Write(Msg(msg, 3))); UNIT_ASSERT(writeSession2->Write(Msg(msg, 4))); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index dac4b58742ce..5246284f82c4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -136,6 +136,10 @@ class TAlterPQ: public TSubOperation { return nullptr; } + if (!alterConfig.HasPartitionStrategy() && tabletConfig->HasPartitionStrategy()) { + alterConfig.MutablePartitionStrategy()->CopyFrom(tabletConfig->GetPartitionStrategy()); + } + if (alterConfig.GetPartitionConfig().HasLifetimeSeconds()) { const auto lifetimeSeconds = alterConfig.GetPartitionConfig().GetLifetimeSeconds(); if (lifetimeSeconds <= 0 || (ui32)lifetimeSeconds > TSchemeShard::MaxPQLifetimeSeconds) { diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index ccccbd59d811..d94b6ec3a462 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -1019,6 +1019,13 @@ message DescribeTopicResponse { Ydb.Operations.Operation operation = 1; } +message PartitionKeyRange { + // Inclusive left border. Emptiness means -inf. + optional bytes from_bound = 1; + // Exclusive right border. Emptiness means +inf. + optional bytes to_bound = 2; +} + // Describe topic result message that will be inside DescribeTopicResponse.operation. message DescribeTopicResult { // Description of scheme object. @@ -1087,6 +1094,8 @@ message DescribeTopicResult { // Partition location, filled only when include_location in request is true. PartitionLocation partition_location = 6; + + PartitionKeyRange key_range = 7; } message TopicStats { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index c9caba3825ba..15ea3748aa6d 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -507,6 +507,10 @@ namespace { config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed") .Optional() .StoreResult(&StartingMessageTimestamp_); + config.Opts->AddLongOption("important", "Is consumer important") + .Optional() + .DefaultValue(false) + .StoreResult(&IsImportant_); config.Opts->SetFreeArgsNum(1); SetFreeArgTitle(0, "", "Topic path"); AddAllowedCodecs(config, AllowedCodecs); @@ -537,6 +541,7 @@ namespace { codecs.push_back(NTopic::ECodec::RAW); } consumerSettings.SetSupportedCodecs(codecs); + consumerSettings.SetImportant(IsImportant_); readRuleSettings.AppendAddConsumers(consumerSettings); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h index cebefe86f6af..029ae5d4e07a 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.h @@ -122,6 +122,7 @@ namespace NYdb::NConsoleClient { private: TString ConsumerName_; + bool IsImportant_; TMaybe StartingMessageTimestamp_; }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp index 486e6a747d39..4eec57ed050e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp @@ -395,6 +395,7 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI for (const auto& partId : partitionInfo.parent_partition_ids()) { ParentPartitionIds_.push_back(partId); } + if (partitionInfo.has_partition_stats()) { PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()}; } @@ -402,6 +403,14 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI if (partitionInfo.has_partition_location()) { PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()}; } + + if (partitionInfo.has_key_range() && partitionInfo.key_range().has_from_bound()) { + FromBound_ = TString(partitionInfo.key_range().from_bound()); + } + + if (partitionInfo.has_key_range() && partitionInfo.key_range().has_to_bound()) { + ToBound_ = TString(partitionInfo.key_range().to_bound()); + } } TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo) @@ -437,6 +446,14 @@ const TMaybe& TPartitionInfo::GetPartitionLocation() const { return PartitionLocation_; } +const TVector TPartitionInfo::GetChildPartitionIds() const { + return ChildPartitionIds_; +} + +const TVector TPartitionInfo::GetParentPartitionIds() const { + return ParentPartitionIds_; +} + bool TPartitionInfo::GetActive() const { return Active_; } @@ -445,6 +462,14 @@ ui64 TPartitionInfo::GetPartitionId() const { return PartitionId_; } +const TMaybe& TPartitionInfo::GetFromBound() const { + return FromBound_; +} + +const TMaybe& TPartitionInfo::GetToBound() const { + return ToBound_; +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TTopicClient diff --git a/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h b/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h index e84c54dd3135..8c5cede81c18 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h @@ -144,14 +144,21 @@ class TPartitionInfo { const TMaybe& GetPartitionConsumerStats() const; const TMaybe& GetPartitionLocation() const; + const TMaybe& GetFromBound() const; + const TMaybe& GetToBound() const; + private: ui64 PartitionId_; bool Active_; TVector ChildPartitionIds_; TVector ParentPartitionIds_; + TMaybe PartitionStats_; TMaybe PartitionConsumerStats_; TMaybe PartitionLocation_; + + TMaybe FromBound_; + TMaybe ToBound_; }; struct TAlterPartitioningSettings; @@ -206,11 +213,11 @@ class TPartitioningSettings { public: TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoPartitioningSettings_(){} TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings); - TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {}) + TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoPartitioning = {}) : MinActivePartitions_(minActivePartitions) , MaxActivePartitions_(maxActivePartitions) , PartitionCountLimit_(0) - , AutoPartitioningSettings_(autoscalingSettings) + , AutoPartitioningSettings_(autoPartitioning) { } @@ -459,6 +466,11 @@ struct TConsumerSettings { return *this; } + TConsumerSettings& SetImportant(bool isImportant) { + Important_ = isImportant; + return *this; + } + TSettings& EndAddConsumer() { return Parent_; }; private: diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index a0e4d3e1bad8..f6a8519fc561 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1070,10 +1070,26 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv if (response.PQGroupInfo) { const auto& pqDescr = response.PQGroupInfo->Description; - for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) { - auto part = Result.add_partitions(); - part->set_partition_id(i); - part->set_active(true); + for (auto& sourcePart: pqDescr.GetPartitions()) { + auto destPart = Result.add_partitions(); + destPart->set_partition_id(sourcePart.GetPartitionId()); + destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active); + if (sourcePart.HasKeyRange()) { + if (sourcePart.GetKeyRange().HasFromBound()) { + destPart->mutable_key_range()->set_from_bound(sourcePart.GetKeyRange().GetFromBound()); + } + if (sourcePart.GetKeyRange().HasToBound()) { + destPart->mutable_key_range()->set_to_bound(sourcePart.GetKeyRange().GetToBound()); + } + } + + for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) { + destPart->add_child_partition_ids(static_cast(sourcePart.GetChildPartitionIds(i))); + } + + for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) { + destPart->add_parent_partition_ids(static_cast(sourcePart.GetParentPartitionIds(i))); + } } const auto &config = pqDescr.GetPQTabletConfig(); @@ -1401,7 +1417,6 @@ void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TE for (auto partData : record.GetPartResult()) { if ((ui32)partData.GetPartition() != Settings.Partitions[0]) continue; - Y_ABORT_UNLESS((ui32)(partData.GetPartition()) == Settings.Partitions[0]); partResult->set_partition_id(partData.GetPartition()); partResult->set_active(true);