Skip to content

Commit

Permalink
The value of the WriteInflightSize in the main partition (ydb-platf…
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Aug 22, 2024
1 parent b209651 commit 214bb2c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 0 deletions.
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2229,6 +2229,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
}, std::nullopt};
msg.Internal = true;

WriteInflightSize += msg.Msg.Data.size();
ExecRequest(msg, *Parameters, PersistRequest.Get());

auto& info = TxSourceIdForPostPersist[blob.SourceId];
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,10 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
auto& sourceIdBatch = parameters.SourceIdBatch;
auto sourceId = sourceIdBatch.GetSource(p.Msg.SourceId);

Y_DEBUG_ABORT_UNLESS(WriteInflightSize >= p.Msg.Data.size(),
"PQ %" PRIu64 ", Partition {%" PRIu32 ", %" PRIu32 "}, WriteInflightSize=%" PRIu64 ", p.Msg.Data.size=%" PRISZT,
TabletID, Partition.OriginalPartitionId, Partition.InternalPartitionId,
WriteInflightSize, p.Msg.Data.size());
WriteInflightSize -= p.Msg.Data.size();

TabletCounters.Percentile()[COUNTER_LATENCY_PQ_RECEIVE_QUEUE].IncrementFor(ctx.Now().MilliSeconds() - p.Msg.ReceiveTimestamp);
Expand Down Expand Up @@ -1538,6 +1542,10 @@ void TPartition::FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue&

TabletCounters.Cumulative()[COUNTER_PQ_WRITE_ERROR].Increment(1);
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_BYTES_ERROR].Increment(msg.Data.size() + msg.SourceId.size());
Y_DEBUG_ABORT_UNLESS(WriteInflightSize >= msg.Data.size(),
"PQ %" PRIu64 ", Partition {%" PRIu32 ", %" PRIu32 "}, WriteInflightSize=%" PRIu64 ", msg.Data.size=%" PRISZT,
TabletID, Partition.OriginalPartitionId, Partition.InternalPartitionId,
WriteInflightSize, msg.Data.size());
WriteInflightSize -= msg.Data.size();
}

Expand Down
23 changes: 23 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1880,6 +1880,29 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
}
}

Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
{
// The test verifies that the `WriteInflightSize` is correctly considered for the main partition.
// Writing to the service partition does not change the `WriteInflightSize` of the main one.
CreateTopic("topic_A", TEST_CONSUMER);

NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);

TString message(16'000, 'a');

WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, TString(16'000, 'a'), &tx, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);

CommitTx(tx, EStatus::SUCCESS);

WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, TString(20'000, 'b'), nullptr, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_2);

auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0);
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
}

}

}

0 comments on commit 214bb2c

Please sign in to comment.