Skip to content

Commit

Permalink
fix broken counters in YMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
siarheivesialou committed May 2, 2024
1 parent eae74a3 commit ec27fa5
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
3 changes: 2 additions & 1 deletion ydb/core/ymq/actor/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ class TActionActor
auto* detailedCounters = UserCounters_ ? UserCounters_->GetDetailedCounters() : nullptr;
const size_t errors = ErrorsCount(Response_, detailedCounters ? &detailedCounters->APIStatuses : nullptr);

FinishTs_ = TActivationContext::Now();

const TDuration duration = GetRequestDuration();
const TDuration workingDuration = GetRequestWorkingDuration();
if (QueueLeader_ && (IsActionForQueue(Action_) || IsActionForQueueYMQ(Action_))) {
Expand Down Expand Up @@ -287,7 +289,6 @@ class TActionActor
}
}

FinishTs_ = TActivationContext::Now();
if (IsRequestSlow()) {
PrintSlowRequestWarning();
}
Expand Down
7 changes: 3 additions & 4 deletions ydb/core/ymq/actor/queue_leader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1189,12 +1189,13 @@ void TQueueLeader::WaitAddMessagesToInflyOrTryAnotherShard(TReceiveMessageBatchR
void TQueueLeader::Reply(TReceiveMessageBatchRequestProcessing& reqInfo) {
const ui64 shard = reqInfo.GetCurrentShard();
if (!reqInfo.Answer->Failed && !reqInfo.Answer->OverLimit) {
int receiveCount = 0;
int messageCount = 0;
ui64 bytesRead = 0;

for (auto& message : reqInfo.Answer->Messages) {
receiveCount += message.ReceiveCount;
COLLECT_HISTOGRAM_COUNTER(Counters_, MessageReceiveAttempts, message.ReceiveCount);
COLLECT_HISTOGRAM_COUNTER(Counters_, receive_attempts_count_rate, message.ReceiveCount);

messageCount++;
bytesRead += message.Data.size();

Expand All @@ -1204,8 +1205,6 @@ void TQueueLeader::Reply(TReceiveMessageBatchRequestProcessing& reqInfo) {
}

if (messageCount > 0) {
COLLECT_HISTOGRAM_COUNTER(Counters_, MessageReceiveAttempts, receiveCount);
COLLECT_HISTOGRAM_COUNTER(Counters_, receive_attempts_count_rate, receiveCount);
ADD_COUNTER_COUPLE(Counters_, ReceiveMessage_Count, received_count_per_second, messageCount);
ADD_COUNTER_COUPLE(Counters_, ReceiveMessage_BytesRead, received_bytes_per_second, bytesRead);
}
Expand Down
31 changes: 31 additions & 0 deletions ydb/tests/functional/sqs/common/test_queue_counters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# -*- coding: utf-8 -*-
from ydb.tests.library.sqs.test_base import KikimrSqsTestBase

from ydb.tests.library.sqs.requests_client import SqsSendMessageParams


class TestSqsGettingCounters(KikimrSqsTestBase):

Expand Down Expand Up @@ -153,3 +155,32 @@ def test_purge_queue_counters(self):
'sensor': 'MessagesPurged',
})
assert purged_derivative > 0

def test_action_duration_being_not_immediate(self):
queue_url = self._create_queue_and_assert(self.queue_name, False, True)

for i in range(100):
message_payload = "foobar" + str(i)
self._sqs_api.send_message(queue_url, message_payload)
self._read_while_not_empty(queue_url, 1)

sqs_counters = self._get_sqs_counters()

durations = self._get_counter(sqs_counters, {
'queue': self.queue_name,
'sensor': 'ReceiveMessage_Duration',
})
buckets_longer_than_5ms = durations['hist']['buckets'][1:]
assert any(map(lambda x: x > 0, buckets_longer_than_5ms))

def test_receive_attempts_are_counted_separately_for_messages_in_one_batch(self):
queue_url = self._create_queue_and_assert(self.queue_name, False, True)
self._sqs_api.send_message_batch(queue_url, [SqsSendMessageParams('data0'), SqsSendMessageParams('data1')])
self._read_while_not_empty(queue_url, 2)

sqs_counters = self._get_sqs_counters()
message_receive_attempts = self._get_counter(sqs_counters, {
'queue': self.queue_name,
'sensor': 'MessageReceiveAttempts',
})
assert message_receive_attempts['hist']['buckets'][0] == 2

0 comments on commit ec27fa5

Please sign in to comment.