diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h index a594f5eafee4..7efa47407050 100644 --- a/ydb/core/ymq/actor/action.h +++ b/ydb/core/ymq/actor/action.h @@ -252,6 +252,8 @@ class TActionActor auto* detailedCounters = UserCounters_ ? UserCounters_->GetDetailedCounters() : nullptr; const size_t errors = ErrorsCount(Response_, detailedCounters ? &detailedCounters->APIStatuses : nullptr); + FinishTs_ = TActivationContext::Now(); + const TDuration duration = GetRequestDuration(); const TDuration workingDuration = GetRequestWorkingDuration(); if (QueueLeader_ && (IsActionForQueue(Action_) || IsActionForQueueYMQ(Action_))) { @@ -287,7 +289,6 @@ class TActionActor } } - FinishTs_ = TActivationContext::Now(); if (IsRequestSlow()) { PrintSlowRequestWarning(); } diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index 67fe38b66f47..ee5cca542196 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -1189,12 +1189,13 @@ void TQueueLeader::WaitAddMessagesToInflyOrTryAnotherShard(TReceiveMessageBatchR void TQueueLeader::Reply(TReceiveMessageBatchRequestProcessing& reqInfo) { const ui64 shard = reqInfo.GetCurrentShard(); if (!reqInfo.Answer->Failed && !reqInfo.Answer->OverLimit) { - int receiveCount = 0; int messageCount = 0; ui64 bytesRead = 0; for (auto& message : reqInfo.Answer->Messages) { - receiveCount += message.ReceiveCount; + COLLECT_HISTOGRAM_COUNTER(Counters_, MessageReceiveAttempts, message.ReceiveCount); + COLLECT_HISTOGRAM_COUNTER(Counters_, receive_attempts_count_rate, message.ReceiveCount); + messageCount++; bytesRead += message.Data.size(); @@ -1204,8 +1205,6 @@ void TQueueLeader::Reply(TReceiveMessageBatchRequestProcessing& reqInfo) { } if (messageCount > 0) { - COLLECT_HISTOGRAM_COUNTER(Counters_, MessageReceiveAttempts, receiveCount); - COLLECT_HISTOGRAM_COUNTER(Counters_, receive_attempts_count_rate, receiveCount); ADD_COUNTER_COUPLE(Counters_, ReceiveMessage_Count, received_count_per_second, messageCount); ADD_COUNTER_COUPLE(Counters_, ReceiveMessage_BytesRead, received_bytes_per_second, bytesRead); } diff --git a/ydb/tests/functional/sqs/common/test_queue_counters.py b/ydb/tests/functional/sqs/common/test_queue_counters.py index f4e133bc19fb..7e2fc97c1204 100644 --- a/ydb/tests/functional/sqs/common/test_queue_counters.py +++ b/ydb/tests/functional/sqs/common/test_queue_counters.py @@ -2,6 +2,8 @@ # -*- coding: utf-8 -*- from ydb.tests.library.sqs.test_base import KikimrSqsTestBase +from ydb.tests.library.sqs.requests_client import SqsSendMessageParams + class TestSqsGettingCounters(KikimrSqsTestBase): @@ -153,3 +155,32 @@ def test_purge_queue_counters(self): 'sensor': 'MessagesPurged', }) assert purged_derivative > 0 + + def test_action_duration_being_not_immediate(self): + queue_url = self._create_queue_and_assert(self.queue_name, False, True) + + for i in range(100): + message_payload = "foobar" + str(i) + self._sqs_api.send_message(queue_url, message_payload) + self._read_while_not_empty(queue_url, 1) + + sqs_counters = self._get_sqs_counters() + + durations = self._get_counter(sqs_counters, { + 'queue': self.queue_name, + 'sensor': 'ReceiveMessage_Duration', + }) + buckets_longer_than_5ms = durations['hist']['buckets'][1:] + assert any(map(lambda x: x > 0, buckets_longer_than_5ms)) + + def test_receive_attempts_are_counted_separately_for_messages_in_one_batch(self): + queue_url = self._create_queue_and_assert(self.queue_name, False, True) + self._sqs_api.send_message_batch(queue_url, [SqsSendMessageParams('data0'), SqsSendMessageParams('data1')]) + self._read_while_not_empty(queue_url, 2) + + sqs_counters = self._get_sqs_counters() + message_receive_attempts = self._get_counter(sqs_counters, { + 'queue': self.queue_name, + 'sensor': 'MessageReceiveAttempts', + }) + assert message_receive_attempts['hist']['buckets'][0] == 2