Skip to content

Commit

Permalink
extract code which checks if optional is present and sets field in ma…
Browse files Browse the repository at this point in the history
…cros
  • Loading branch information
siarheivesialou committed Aug 30, 2024
1 parent ea67416 commit 7d8a5b2
Showing 1 changed file with 31 additions and 49 deletions.
80 changes: 31 additions & 49 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ using grpc::Status;

namespace NKikimr::NYmq::V1 {

#define COPY_FIELD_IF_PRESENT(from, to) \
if (GetProtoRequest()->Has##from()) { \
result->Set##to(GetProtoRequest()->Get##from()); \
}

#define COPY_FIELD_IF_PRESENT_IN_ENTRY(from, to) \
if (requestEntry.Has##from()) { \
entry->Set##to(requestEntry.Get##from()); \
}

using namespace NGRpcService;
using namespace NGRpcProxy::V1;

Expand Down Expand Up @@ -276,17 +286,9 @@ namespace NKikimr::NYmq::V1 {
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
}

if (GetProtoRequest()->Hasdelay_seconds()) {
result->SetDelaySeconds(GetProtoRequest()->Getdelay_seconds());
}

if (GetProtoRequest()->Hasmessage_deduplication_id()) {
result->SetMessageDeduplicationId(GetProtoRequest()->Getmessage_deduplication_id());
}

if (GetProtoRequest()->Hasmessage_group_id()) {
result->SetMessageGroupId(GetProtoRequest()->message_group_id());
}
COPY_FIELD_IF_PRESENT(delay_seconds, DelaySeconds);
COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId);
COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId);

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);

Expand Down Expand Up @@ -376,6 +378,14 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableReceiveMessage();

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);

COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages);
COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId);
COPY_FIELD_IF_PRESENT(visibility_timeout, VisibilityTimeout);
COPY_FIELD_IF_PRESENT(wait_time_seconds, WaitTimeSeconds);

auto systemAttributeNames = GetProtoRequest()->Getmessage_system_attribute_names();
// We ignore AttributeNames if SystemAttributeNames is present,
// because AttributeNames is deprecated in favour of SystemAttributeNames
Expand All @@ -390,28 +400,10 @@ namespace NKikimr::NYmq::V1 {
}
}

if (GetProtoRequest()->Hasmax_number_of_messages()) {
result->SetMaxNumberOfMessages(GetProtoRequest()->Getmax_number_of_messages());
}

for (int i = 0; i < GetProtoRequest()->Getmessage_attribute_names().size(); i++) {
result->SetMessageAttributeName(i, GetProtoRequest()->Getmessage_attribute_names()[i]);
}

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);

if (GetProtoRequest()->Hasreceive_request_attempt_id()) {
result->SetReceiveRequestAttemptId(GetProtoRequest()->Getreceive_request_attempt_id());
}

if (GetProtoRequest()->Hasvisibility_timeout()) {
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
}

if (GetProtoRequest()->Haswait_time_seconds()) {
result->SetWaitTimeSeconds(GetProtoRequest()->Getwait_time_seconds());
}

return result;
}

Expand Down Expand Up @@ -587,9 +579,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TListQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableListQueues();
if (GetProtoRequest()->Hasqueue_name_prefix()) {
result->SetQueueNamePrefix(GetProtoRequest()->Getqueue_name_prefix());
}
COPY_FIELD_IF_PRESENT(queue_name_prefix, QueueNamePrefix);
return result;
}
};
Expand Down Expand Up @@ -854,11 +844,12 @@ namespace NKikimr::NYmq::V1 {
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add();

if (requestEntry.Hasdelay_seconds()) {
entry->SetDelaySeconds(requestEntry.Getdelay_seconds());
}

entry->SetId(requestEntry.Getid());
entry->SetMessageBody(requestEntry.Getmessage_body());

COPY_FIELD_IF_PRESENT_IN_ENTRY(delay_seconds, DelaySeconds);
COPY_FIELD_IF_PRESENT_IN_ENTRY(message_deduplication_id, MessageDeduplicationId);
COPY_FIELD_IF_PRESENT_IN_ENTRY(message_group_id, MessageGroupId);

for (auto& srcAttribute: requestEntry.Getmessage_attributes()) {
auto dstAttribute = entry->MutableMessageAttributes()->Add();
Expand All @@ -867,16 +858,6 @@ namespace NKikimr::NYmq::V1 {
dstAttribute->SetBinaryValue(srcAttribute.second.Getbinary_value());
dstAttribute->SetDataType(srcAttribute.second.Getdata_type());
}

if (requestEntry.Hasmessage_deduplication_id()) {
entry->SetMessageDeduplicationId(requestEntry.Getmessage_deduplication_id());
}

if (requestEntry.Hasmessage_group_id()) {
entry->SetMessageGroupId(requestEntry.Getmessage_group_id());
}

entry->SetMessageBody(requestEntry.Getmessage_body());
}
return result;
}
Expand Down Expand Up @@ -980,14 +961,15 @@ namespace NKikimr::NYmq::V1 {
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
entry->SetId(requestEntry.Getid());
if (requestEntry.Hasvisibility_timeout()) {
entry->SetVisibilityTimeout(requestEntry.Getvisibility_timeout());
}
entry->SetReceiptHandle(requestEntry.Getreceipt_handle());
COPY_FIELD_IF_PRESENT_IN_ENTRY(visibility_timeout, VisibilityTimeout)
}
return result;
}
};

#undef COPY_FIELD_IF_PRESENT
#undef COPY_FIELD_IF_PRESENT_IN_ENTRY
}

namespace NKikimr::NGRpcService {
Expand Down

0 comments on commit 7d8a5b2

Please sign in to comment.