Skip to content

Commit

Permalink
Set ending_sequence_number for inactive partitions of datashard (#9636)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Sep 24, 2024
1 parent 1002b94 commit cde1582
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 42 deletions.
90 changes: 48 additions & 42 deletions ydb/services/datastreams/datastreams_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,47 @@ namespace NKikimr::NDataStreams::V1 {

return {};
}

void SetShardProperties(::Ydb::DataStreams::V1::Shard* shard,
const ::NKikimrSchemeOp::TPersQueueGroupDescription_TPartition& partition,
const bool autoPartitioningEnabled,
const size_t allShardsCount,
const std::map<ui64, std::pair<ui64, ui64>>& offsets) {
shard->set_shard_id(GetShardName(partition.GetPartitionId()));


const auto& parents = partition.GetParentPartitionIds();
if (parents.size() > 0) {
shard->set_parent_shard_id(GetShardName(parents[0]));
}
if (parents.size() > 1) {
shard->set_adjacent_parent_shard_id(GetShardName(parents[1]));
}

auto* rangeProto = shard->mutable_hash_key_range();
if (autoPartitioningEnabled) {
NYql::NDecimal::TUint128 from = partition.HasKeyRange() && partition.GetKeyRange().HasFromBound()
? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetFromBound()) + 1: 0;
NYql::NDecimal::TUint128 to = partition.HasKeyRange() && partition.GetKeyRange().HasToBound()
? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetToBound()): -1;
rangeProto->set_starting_hash_key(Uint128ToDecimalString(from));
rangeProto->set_ending_hash_key(Uint128ToDecimalString(to));
} else {
auto range = RangeFromShardNumber(partition.GetPartitionId(), allShardsCount);
rangeProto->set_starting_hash_key(Uint128ToDecimalString(range.Start));
rangeProto->set_ending_hash_key(Uint128ToDecimalString(range.End));
}

auto it = offsets.find(partition.GetPartitionId());
if (it != offsets.end()) {
auto* rangeProto = shard->mutable_sequence_number_range();
rangeProto->set_starting_sequence_number(TStringBuilder() << it->second.first);

if (::NKikimrPQ::ETopicPartitionStatus::Active != partition.GetStatus()) {
rangeProto->set_ending_sequence_number(TStringBuilder() << it->second.second);
}
}
}
}


Expand Down Expand Up @@ -845,32 +886,7 @@ namespace NKikimr::NDataStreams::V1 {
break;
} else {
auto* shard = description.add_shards();
shard->set_shard_id(shardName);

const auto& parents = partition.GetParentPartitionIds();
if (parents.size() > 0) {
shard->set_parent_shard_id(GetShardName(parents[0]));
}
if (parents.size() > 1) {
shard->set_adjacent_parent_shard_id(GetShardName(parents[1]));
}

auto* rangeProto = shard->mutable_hash_key_range();
if (NPQ::SplitMergeEnabled(pqConfig)) {
NYql::NDecimal::TUint128 from = partition.HasKeyRange() && partition.GetKeyRange().HasFromBound() ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetFromBound()) + 1: 0;
NYql::NDecimal::TUint128 to = partition.HasKeyRange() && partition.GetKeyRange().HasToBound() ? NPQ::AsInt<NYql::NDecimal::TUint128>(partition.GetKeyRange().GetToBound()): -1;
rangeProto->set_starting_hash_key(Uint128ToDecimalString(from));
rangeProto->set_ending_hash_key(Uint128ToDecimalString(to));
} else {
auto range = RangeFromShardNumber(partitionId, PQGroup.GetPartitions().size());
rangeProto->set_starting_hash_key(Uint128ToDecimalString(range.Start));
rangeProto->set_ending_hash_key(Uint128ToDecimalString(range.End));
}
auto it = StartEndOffsetsPerPartition.find(partitionId);
if (it != StartEndOffsetsPerPartition.end()) {
auto* rangeProto = shard->mutable_sequence_number_range();
rangeProto->set_starting_sequence_number(TStringBuilder() << it->second.first);
}
SetShardProperties(shard, partition, NPQ::SplitMergeEnabled(pqConfig), PQGroup.GetPartitions().size(), StartEndOffsetsPerPartition);
}
}
}
Expand Down Expand Up @@ -1754,6 +1770,7 @@ namespace NKikimr::NDataStreams::V1 {
std::vector<NKikimrSchemeOp::TPersQueueGroupDescription::TPartition> Shards;
ui32 LeftToRead = 0;
ui32 AllShardsCount = 0;
bool AutoPartitioningEnabled = false;
std::atomic<ui32> GotOffsetResponds;
std::vector<TActorId> Pipes;
};
Expand Down Expand Up @@ -1847,7 +1864,8 @@ namespace NKikimr::NDataStreams::V1 {
}

using TPartition = NKikimrSchemeOp::TPersQueueGroupDescription::TPartition;
const auto& partitions = topicInfo.PQGroupInfo->Description.GetPartitions();
const auto& description = topicInfo.PQGroupInfo->Description;
const auto& partitions = description.GetPartitions();
TString startingShardId = this->GetProtoRequest()->Getexclusive_start_shard_id();
ui64 startingTimepoint{0};
bool onlyOpenShards{true};
Expand Down Expand Up @@ -1895,6 +1913,8 @@ namespace NKikimr::NDataStreams::V1 {
}}
};

AutoPartitioningEnabled = NPQ::SplitMergeEnabled(description.GetPQTabletConfig());

const auto alreadyRead = NextToken.GetAlreadyRead();
if (alreadyRead > (ui32)partitions.size()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
Expand Down Expand Up @@ -1970,21 +1990,7 @@ namespace NKikimr::NDataStreams::V1 {
void TListShardsActor::SendResponse(const TActorContext& ctx) {
Ydb::DataStreams::V1::ListShardsResult result;
for (auto& shard : Shards) {
auto awsShard = result.Addshards();
// TODO:
// awsShard->set_parent_shard_id("");
// awsShard->set_adjacent_parent_shard_id(prevShardName);
auto range = RangeFromShardNumber(shard.GetPartitionId(), AllShardsCount);
awsShard->mutable_hash_key_range()->set_starting_hash_key(
Uint128ToDecimalString(range.Start));
awsShard->mutable_hash_key_range()->set_ending_hash_key(
Uint128ToDecimalString(range.End));
awsShard->mutable_sequence_number_range()->set_starting_sequence_number(
std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].first));
//TODO: fill it only for closed partitions
//awsShard->mutable_sequence_number_range()->set_ending_sequence_number(
// std::to_string(StartEndOffsetsPerPartition[shard.GetPartitionId()].second));
awsShard->set_shard_id(GetShardName(shard.GetPartitionId()));
SetShardProperties(result.Addshards(), shard, AutoPartitioningEnabled, AllShardsCount, StartEndOffsetsPerPartition);
}
if (LeftToRead > 0) {
TNextToken token(StreamName, NextToken.GetAlreadyRead() + Shards.size(), MaxResults, TInstant::Now().MilliSeconds());
Expand Down
15 changes: 15 additions & 0 deletions ydb/services/datastreams/datastreams_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,18 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(description.partitioning_settings().auto_partitioning_settings().partition_write_speed().down_utilization_percent(), 13);
}

{
std::vector<NYDS_V1::TDataRecord> records;
for (ui32 i = 1; i <= 30; ++i) {
TString data = Sprintf("%04u", i);
records.push_back({data, data, ""});
}
auto result = testServer.DataStreamsClient->PutRecords(streamName, records).ExtractValueSync();
Cerr << result.GetResult().DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
ui64 txId = 107;
NPQ::NTest::SplitPartition(*kikimr->GetRuntime(), txId, 1, "a");
Expand All @@ -2776,8 +2788,11 @@ Y_UNIT_TEST_SUITE(DataStreams) {

UNIT_ASSERT_VALUES_EQUAL(description.shards().size(), 5);
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).sequence_number_range().starting_sequence_number(), "0");
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).sequence_number_range().ending_sequence_number(), "");
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).hash_key_range().starting_hash_key(), "0");
UNIT_ASSERT_VALUES_EQUAL(description.shards(0).hash_key_range().ending_hash_key(), "113427455640312821154458202477256070484");
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).sequence_number_range().starting_sequence_number(), "0");
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).sequence_number_range().ending_sequence_number(), "8");
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).hash_key_range().starting_hash_key(), "113427455640312821154458202477256070485");
UNIT_ASSERT_VALUES_EQUAL(description.shards(1).hash_key_range().ending_hash_key(), "226854911280625642308916404954512140969");
UNIT_ASSERT_VALUES_EQUAL(description.shards(2).hash_key_range().starting_hash_key(), "226854911280625642308916404954512140970");
Expand Down

0 comments on commit cde1582

Please sign in to comment.