Skip to content

Commit

Permalink
Fix describe and topics alter (#9713)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Sep 25, 2024
1 parent f433625 commit c1fc810
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 14 deletions.
37 changes: 37 additions & 0 deletions ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,43 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
}
}

Y_UNIT_TEST(ControlPlane_BackCompatibility) {
auto topicName = "back-compatibility-test";

TTopicSdkTestSetup setup = CreateSetup();
TTopicClient client = setup.MakeClient();

{
TCreateTopicSettings createSettings;
createSettings
.BeginConfigurePartitioningSettings()
.MinActivePartitions(3)
.EndConfigurePartitioningSettings();
client.CreateTopic(topicName, createSettings).Wait();
}

{
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 3);
}

{
TAlterTopicSettings alterSettings;
alterSettings
.BeginAlterPartitioningSettings()
.MinActivePartitions(5)
.EndAlterTopicPartitioningSettings();
client.AlterTopic(topicName, alterSettings).Wait();
}

{
auto describeAfterAlter = client.DescribeTopic(topicName).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL(describeAfterAlter.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 5);
}
}

Y_UNIT_TEST(ControlPlane_PauseAutoPartitioning) {
auto topicName = "autoscalit-topic";

Expand Down
21 changes: 13 additions & 8 deletions ydb/services/datastreams/datastreams_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,7 @@ namespace NKikimr::NDataStreams::V1 {
Y_UNUSED(selfInfo);

TString error;
if (!GetProtoRequest()->has_partitioning_settings()) {
if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
{
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
}

groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());
}
switch (GetProtoRequest()->retention_case()) {
case Ydb::DataStreams::V1::UpdateStreamRequest::RetentionCase::kRetentionPeriodHours:
groupConfig.MutablePQTabletConfig()->MutablePartitionConfig()->SetLifetimeSeconds(
Expand Down Expand Up @@ -520,7 +513,19 @@ namespace NKikimr::NDataStreams::V1 {
}
}

if (GetProtoRequest()->has_partitioning_settings()) {
if (!GetProtoRequest()->has_partitioning_settings() ||
(GetProtoRequest()->partitioning_settings().has_auto_partitioning_settings() &&
(GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED) ||
(GetProtoRequest()->partitioning_settings().auto_partitioning_settings().strategy() == Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_UNSPECIFIED))) {

if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error))
{
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::BAD_REQUEST), error);
}

groupConfig.SetTotalGroupCount(GetProtoRequest()->target_shard_count());

} else {
auto r = ValidatePartitioningSettings(GetProtoRequest()->partitioning_settings());
if (!r.empty()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), r);
Expand Down
122 changes: 121 additions & 1 deletion ydb/services/datastreams/datastreams_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2706,7 +2706,6 @@ Y_UNIT_TEST_SUITE(DataStreams) {

TString streamName = "test-topic";
TString streamName2 = "test-topic-2";

{
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
auto settings = NYdb::NTopic::TCreateTopicSettings()
Expand Down Expand Up @@ -2807,6 +2806,127 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(description.shards(4).parent_shard_id(), "shard-000001");
}

auto streamForAlterTest = "stream-alter-test";
{
auto result = testServer.DataStreamsClient->CreateStream(streamForAlterTest,
NYDS_V1::TCreateStreamSettings()
.ShardCount(3)
).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
if (result.GetStatus() != EStatus::SUCCESS) {
result.GetIssues().PrintTo(Cerr);
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
Cerr << result.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto& d = result.GetResult().stream_description();
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 3);
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);

UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
}

{
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
NYDS_V1::TUpdateStreamSettings()
.TargetShardCount(5)
.BeginConfigurePartitioningSettings()
.BeginConfigureAutoPartitioningSettings()
.Strategy(NYdb::NDataStreams::V1::EAutoPartitioningStrategy::Disabled)
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings()
).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
if (result.GetStatus() != EStatus::SUCCESS) {
result.GetIssues().PrintTo(Cerr);
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
Cerr << result.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto& d = result.GetResult().stream_description();
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 5);
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);

UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
}

{
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
NYDS_V1::TUpdateStreamSettings()
.TargetShardCount(10)
).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
if (result.GetStatus() != EStatus::SUCCESS) {
result.GetIssues().PrintTo(Cerr);
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
Cerr << result.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto& d = result.GetResult().stream_description();
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 10);
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);

UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
}

{
auto result = testServer.DataStreamsClient->UpdateStream(streamForAlterTest,
NYDS_V1::TUpdateStreamSettings()
.TargetShardCount(15)
.BeginConfigurePartitioningSettings()
.BeginConfigureAutoPartitioningSettings()
.EndConfigureAutoPartitioningSettings()
.EndConfigurePartitioningSettings()
).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
if (result.GetStatus() != EStatus::SUCCESS) {
result.GetIssues().PrintTo(Cerr);
}
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
auto result = testServer.DataStreamsClient->DescribeStream(streamForAlterTest).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
Cerr << result.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

auto& d = result.GetResult().stream_description();
UNIT_ASSERT_VALUES_EQUAL(d.shards().size(), 15);
UNIT_ASSERT_VALUES_EQUAL(d.stream_status(), YDS_V1::StreamDescription::ACTIVE);
UNIT_ASSERT_VALUES_EQUAL(d.stream_name(), streamForAlterTest);
UNIT_ASSERT_VALUES_EQUAL(d.stream_arn(), streamForAlterTest);

UNIT_ASSERT_VALUES_EQUAL(d.partitioning_settings().auto_partitioning_settings().strategy(), ::Ydb::DataStreams::V1::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_DISABLED);
}

{
auto result = testServer.DataStreamsClient->CreateStream(streamName2,
NYDS_V1::TCreateStreamSettings()
Expand Down
30 changes: 26 additions & 4 deletions ydb/services/lib/actors/pq_schema_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,29 @@ namespace NKikimr::NGRpcProxy::V1 {
auto pqTabletConfig = pqDescr.MutablePQTabletConfig();
NPQ::Migrate(*pqTabletConfig);
auto partConfig = pqTabletConfig->MutablePartitionConfig();
auto splitMergeFeatureEnabled = appData->FeatureFlags.GetEnableTopicSplitMerge();

auto needHandleAutoPartitioning = false;
if (appData->FeatureFlags.GetEnableTopicSplitMerge()) {

auto reqHasAutoPartitioningStrategyChange = request.has_alter_partitioning_settings() &&
request.alter_partitioning_settings().has_alter_auto_partitioning_settings() &&
request.alter_partitioning_settings().alter_auto_partitioning_settings().has_set_strategy();

auto pqConfigHasAutoPartitioningStrategy = pqTabletConfig->HasPartitionStrategy() &&
pqTabletConfig->GetPartitionStrategy().HasPartitionStrategyType() &&
pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType();

if (pqConfigHasAutoPartitioningStrategy && pqTabletConfig->GetPartitionStrategy().GetPartitionStrategyType() != ::NKikimrPQ::TPQTabletConfig_TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED) {
needHandleAutoPartitioning = true;
} else if (reqHasAutoPartitioningStrategyChange) {
auto strategy = request.alter_partitioning_settings().alter_auto_partitioning_settings().set_strategy();
needHandleAutoPartitioning = strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_PAUSED ||
strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP ||
strategy == ::Ydb::Topic::AutoPartitioningStrategy::AUTO_PARTITIONING_STRATEGY_SCALE_UP_AND_DOWN;
}

}


if (request.has_set_retention_storage_mb()) {
CHECK_CDC;
Expand All @@ -1178,12 +1200,12 @@ namespace NKikimr::NGRpcProxy::V1 {
if (settings.has_set_min_active_partitions()) {
auto minParts = IfEqualThenDefault<i64>(settings.set_min_active_partitions(), 0L, 1L);
pqDescr.SetTotalGroupCount(minParts);
if (splitMergeFeatureEnabled) {
if (needHandleAutoPartitioning) {
pqTabletConfig->MutablePartitionStrategy()->SetMinPartitionCount(minParts);
}
}

if (splitMergeFeatureEnabled) {
if (needHandleAutoPartitioning) {
if (settings.has_set_max_active_partitions()) {
pqTabletConfig->MutablePartitionStrategy()->SetMaxPartitionCount(settings.set_max_active_partitions());
}
Expand Down Expand Up @@ -1219,7 +1241,7 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}

if (splitMergeFeatureEnabled) {
if (needHandleAutoPartitioning) {
auto code = ValidatePartitionStrategy(*pqTabletConfig, error);
if (code) return code->YdbCode;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
}

const auto &config = pqDescr.GetPQTabletConfig();
if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge()) {
if (AppData(TActivationContext::ActorContextFor(SelfId()))->FeatureFlags.GetEnableTopicSplitMerge() && NPQ::SplitMergeEnabled(config)) {
Result.mutable_partitioning_settings()->set_min_active_partitions(config.GetPartitionStrategy().GetMinPartitionCount());
} else {
Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount());
Expand Down

0 comments on commit c1fc810

Please sign in to comment.