Skip to content

Commit

Permalink
Add few auto partitioning fields to describe and SDK. Add important c…
Browse files Browse the repository at this point in the history
…onsumer to CLI (ydb-platform#6118)
  • Loading branch information
niksaveliev authored and nshestakov committed Aug 1, 2024
1 parent 0698d8b commit 2957170
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 7 deletions.
36 changes: 36 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,42 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
auto describe = client.DescribeTopic(TEST_TOPIC).GetValueSync();
UNIT_ASSERT_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 3);

bool firstPartitionFound = false;
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
if (partition.GetPartitionId() == 0) {
firstPartitionFound = true;
UNIT_ASSERT(!partition.GetActive());
UNIT_ASSERT_EQUAL(partition.GetChildPartitionIds().size(), 2);
auto childIds = partition.GetChildPartitionIds();
std::sort(childIds.begin(), childIds.end());
UNIT_ASSERT_EQUAL(childIds[0], 1);
UNIT_ASSERT_EQUAL(childIds[1], 2);
}
}

UNIT_ASSERT(firstPartitionFound);

TString secondPartitionTo = "";
TString thirdPartitionFrom = "";
for (const auto& partition : describe.GetTopicDescription().GetPartitions()) {
if (partition.GetPartitionId() == 1 || partition.GetPartitionId() == 2) {
UNIT_ASSERT(partition.GetActive());
if (partition.GetPartitionId() == 1) {
UNIT_ASSERT(partition.GetToBound().Defined() && !partition.GetToBound()->Empty());
secondPartitionTo = *partition.GetToBound();
}
if (partition.GetPartitionId() == 2) {
UNIT_ASSERT(partition.GetFromBound().Defined() && !partition.GetFromBound()->Empty());
thirdPartitionFrom = *partition.GetFromBound();
}
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds().size(), 1);
UNIT_ASSERT_EQUAL(partition.GetParentPartitionIds()[0], 0);
}
}

UNIT_ASSERT(!secondPartitionTo.Empty());
UNIT_ASSERT(!thirdPartitionFrom.Empty());

auto writeSession2 = CreateWriteSession(client, "producer-1", 1, TEST_TOPIC, false);
UNIT_ASSERT(writeSession2->Write(Msg(msg, 3)));
UNIT_ASSERT(writeSession2->Write(Msg(msg, 4)));
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class TAlterPQ: public TSubOperation {
return nullptr;
}

if (!alterConfig.HasPartitionStrategy() && tabletConfig->HasPartitionStrategy()) {
alterConfig.MutablePartitionStrategy()->CopyFrom(tabletConfig->GetPartitionStrategy());
}

if (alterConfig.GetPartitionConfig().HasLifetimeSeconds()) {
const auto lifetimeSeconds = alterConfig.GetPartitionConfig().GetLifetimeSeconds();
if (lifetimeSeconds <= 0 || (ui32)lifetimeSeconds > TSchemeShard::MaxPQLifetimeSeconds) {
Expand Down
9 changes: 9 additions & 0 deletions ydb/public/api/protos/ydb_topic.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,13 @@ message DescribeTopicResponse {
Ydb.Operations.Operation operation = 1;
}

message PartitionKeyRange {
// Inclusive left border. Emptiness means -inf.
optional bytes from_bound = 1;
// Exclusive right border. Emptiness means +inf.
optional bytes to_bound = 2;
}

// Describe topic result message that will be inside DescribeTopicResponse.operation.
message DescribeTopicResult {
// Description of scheme object.
Expand Down Expand Up @@ -1089,6 +1096,8 @@ message DescribeTopicResult {

// Partition location, filled only when include_location in request is true.
PartitionLocation partition_location = 6;

PartitionKeyRange key_range = 7;
}

message TopicStats {
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ namespace {
config.Opts->AddLongOption("starting-message-timestamp", "Unix timestamp starting from '1970-01-01 00:00:00' from which read is allowed")
.Optional()
.StoreResult(&StartingMessageTimestamp_);
config.Opts->AddLongOption("important", "Is consumer important")
.Optional()
.DefaultValue(false)
.StoreResult(&IsImportant_);
config.Opts->SetFreeArgsNum(1);
SetFreeArgTitle(0, "<topic-path>", "Topic path");
AddAllowedCodecs(config, AllowedCodecs);
Expand Down Expand Up @@ -539,6 +543,7 @@ namespace {
codecs.push_back(NTopic::ECodec::RAW);
}
consumerSettings.SetSupportedCodecs(codecs);
consumerSettings.SetImportant(IsImportant_);

readRuleSettings.AppendAddConsumers(consumerSettings);

Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ namespace NYdb::NConsoleClient {

private:
TString ConsumerName_;
bool IsImportant_;
TMaybe<ui64> StartingMessageTimestamp_;
};

Expand Down
25 changes: 25 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,13 +395,22 @@ TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeTopicResult::PartitionI
for (const auto& partId : partitionInfo.parent_partition_ids()) {
ParentPartitionIds_.push_back(partId);
}

if (partitionInfo.has_partition_stats()) {
PartitionStats_ = TPartitionStats{partitionInfo.partition_stats()};
}

if (partitionInfo.has_partition_location()) {
PartitionLocation_ = TPartitionLocation{partitionInfo.partition_location()};
}

if (partitionInfo.has_key_range() && partitionInfo.key_range().has_from_bound()) {
FromBound_ = TString(partitionInfo.key_range().from_bound());
}

if (partitionInfo.has_key_range() && partitionInfo.key_range().has_to_bound()) {
ToBound_ = TString(partitionInfo.key_range().to_bound());
}
}

TPartitionInfo::TPartitionInfo(const Ydb::Topic::DescribeConsumerResult::PartitionInfo& partitionInfo)
Expand Down Expand Up @@ -437,6 +446,14 @@ const TMaybe<TPartitionLocation>& TPartitionInfo::GetPartitionLocation() const {
return PartitionLocation_;
}

const TVector<ui64> TPartitionInfo::GetChildPartitionIds() const {
return ChildPartitionIds_;
}

const TVector<ui64> TPartitionInfo::GetParentPartitionIds() const {
return ParentPartitionIds_;
}

bool TPartitionInfo::GetActive() const {
return Active_;
}
Expand All @@ -445,6 +462,14 @@ ui64 TPartitionInfo::GetPartitionId() const {
return PartitionId_;
}

const TMaybe<TString>& TPartitionInfo::GetFromBound() const {
return FromBound_;
}

const TMaybe<TString>& TPartitionInfo::GetToBound() const {
return ToBound_;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TTopicClient

Expand Down
16 changes: 14 additions & 2 deletions ydb/public/sdk/cpp/client/ydb_topic/include/control_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,21 @@ class TPartitionInfo {
const TMaybe<TPartitionConsumerStats>& GetPartitionConsumerStats() const;
const TMaybe<TPartitionLocation>& GetPartitionLocation() const;

const TMaybe<TString>& GetFromBound() const;
const TMaybe<TString>& GetToBound() const;

private:
ui64 PartitionId_;
bool Active_;
TVector<ui64> ChildPartitionIds_;
TVector<ui64> ParentPartitionIds_;

TMaybe<TPartitionStats> PartitionStats_;
TMaybe<TPartitionConsumerStats> PartitionConsumerStats_;
TMaybe<TPartitionLocation> PartitionLocation_;

TMaybe<TString> FromBound_;
TMaybe<TString> ToBound_;
};

struct TAlterPartitioningSettings;
Expand Down Expand Up @@ -207,11 +214,11 @@ class TPartitioningSettings {
public:
TPartitioningSettings() : MinActivePartitions_(0), MaxActivePartitions_(0), PartitionCountLimit_(0), AutoPartitioningSettings_(){}
TPartitioningSettings(const Ydb::Topic::PartitioningSettings& settings);
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoscalingSettings = {})
TPartitioningSettings(ui64 minActivePartitions, ui64 maxActivePartitions, TAutoPartitioningSettings autoPartitioning = {})
: MinActivePartitions_(minActivePartitions)
, MaxActivePartitions_(maxActivePartitions)
, PartitionCountLimit_(0)
, AutoPartitioningSettings_(autoscalingSettings)
, AutoPartitioningSettings_(autoPartitioning)
{
}

Expand Down Expand Up @@ -460,6 +467,11 @@ struct TConsumerSettings {
return *this;
}

TConsumerSettings& SetImportant(bool isImportant) {
Important_ = isImportant;
return *this;
}

TSettings& EndAddConsumer() { return Parent_; };

private:
Expand Down
25 changes: 20 additions & 5 deletions ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1070,10 +1070,26 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv

if (response.PQGroupInfo) {
const auto& pqDescr = response.PQGroupInfo->Description;
for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) {
auto part = Result.add_partitions();
part->set_partition_id(i);
part->set_active(true);
for (auto& sourcePart: pqDescr.GetPartitions()) {
auto destPart = Result.add_partitions();
destPart->set_partition_id(sourcePart.GetPartitionId());
destPart->set_active(sourcePart.GetStatus() == ::NKikimrPQ::ETopicPartitionStatus::Active);
if (sourcePart.HasKeyRange()) {
if (sourcePart.GetKeyRange().HasFromBound()) {
destPart->mutable_key_range()->set_from_bound(sourcePart.GetKeyRange().GetFromBound());
}
if (sourcePart.GetKeyRange().HasToBound()) {
destPart->mutable_key_range()->set_to_bound(sourcePart.GetKeyRange().GetToBound());
}
}

for (size_t i = 0; i < sourcePart.ChildPartitionIdsSize(); ++i) {
destPart->add_child_partition_ids(static_cast<int64_t>(sourcePart.GetChildPartitionIds(i)));
}

for (size_t i = 0; i < sourcePart.ParentPartitionIdsSize(); ++i) {
destPart->add_parent_partition_ids(static_cast<int64_t>(sourcePart.GetParentPartitionIds(i)));
}
}

const auto &config = pqDescr.GetPQTabletConfig();
Expand Down Expand Up @@ -1404,7 +1420,6 @@ void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TE
for (auto partData : record.GetPartResult()) {
if ((ui32)partData.GetPartition() != Settings.Partitions[0])
continue;

Y_ABORT_UNLESS((ui32)(partData.GetPartition()) == Settings.Partitions[0]);
partResult->set_partition_id(partData.GetPartition());
partResult->set_active(true);
Expand Down

0 comments on commit 2957170

Please sign in to comment.