Skip to content

Commit

Permalink
fix memory leak in ymq proxy (#8706)
Browse files Browse the repository at this point in the history
  • Loading branch information
siarheivesialou authored Sep 4, 2024
1 parent 53df48f commit 6191791
Showing 1 changed file with 40 additions and 40 deletions.
80 changes: 40 additions & 40 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ namespace NKikimr::NYmq::V1 {
<< ", UserSid: " << UserSid
<< ", RequestId: " << RequestId;
);
auto requestHolder = MakeHolder<TSqsRequest>();
TSqsRequest sqsRequest;

requestHolder->SetRequestId(RequestId);
sqsRequest.SetRequestId(RequestId);

auto request = GetRequest(requestHolder);
auto request = GetRequest(sqsRequest);

request->MutableAuth()->SetUserName(CloudId);
request->MutableAuth()->SetFolderId(FolderId);
Expand All @@ -148,14 +148,14 @@ namespace NKikimr::NYmq::V1 {
request->MutableCredentials()->SetOAuthToken(SecurityToken);
}

auto actor = CreateProxyActionActor(*requestHolder.Release(), CreateReplyCallback(), true);
auto actor = CreateProxyActionActor(sqsRequest, CreateReplyCallback(), true);
ctx.RegisterWithSameMailbox(actor);

TBase::Die(ctx);
}

protected:
virtual TRequest* GetRequest(THolder<TSqsRequest>&) = 0;
virtual TRequest* GetRequest(TSqsRequest&) = 0;
virtual THolder<TReplyCallback> CreateReplyCallback() = 0;
private:
const TString FolderId;
Expand Down Expand Up @@ -200,8 +200,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TGetQueueUrlRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableGetQueueUrl();
NKikimr::NSQS::TGetQueueUrlRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableGetQueueUrl();
result->SetQueueName(GetProtoRequest()->queue_name());
return result;
}
Expand Down Expand Up @@ -233,8 +233,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TCreateQueueRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableCreateQueue();
NKikimr::NSQS::TCreateQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableCreateQueue();
result->SetQueueName(GetProtoRequest()->queue_name());

for (auto &srcAttribute : GetProtoRequest()->attributes()) {
Expand Down Expand Up @@ -275,8 +275,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TSendMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableSendMessage();
NKikimr::NSQS::TSendMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableSendMessage();

for (auto& srcAttribute: GetProtoRequest()->Getmessage_attributes()) {
auto dstAttribute = result->MutableMessageAttributes()->Add();
Expand Down Expand Up @@ -376,8 +376,8 @@ namespace NKikimr::NYmq::V1 {
using TBaseRpcRequestActor::TBaseRpcRequestActor;

private:
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableReceiveMessage();
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableReceiveMessage();

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);

Expand Down Expand Up @@ -531,8 +531,8 @@ namespace NKikimr::NYmq::V1 {
using TBaseRpcRequestActor::TBaseRpcRequestActor;

private:
NKikimr::NSQS::TGetQueueAttributesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableGetQueueAttributes();
NKikimr::NSQS::TGetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableGetQueueAttributes();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
for (const auto& attributeName : GetProtoRequest()->Getattribute_names()) {
result->MutableNames()->Add()->assign(attributeName);
Expand Down Expand Up @@ -577,8 +577,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TListQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableListQueues();
NKikimr::NSQS::TListQueuesRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableListQueues();
COPY_FIELD_IF_PRESENT(queue_name_prefix, QueueNamePrefix);
return result;
}
Expand Down Expand Up @@ -609,8 +609,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TDeleteMessageRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableDeleteMessage();
NKikimr::NSQS::TDeleteMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableDeleteMessage();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetReceiptHandle(GetProtoRequest()->receipt_handle());
return result;
Expand Down Expand Up @@ -642,8 +642,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TPurgeQueueRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutablePurgeQueue();
NKikimr::NSQS::TPurgeQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutablePurgeQueue();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
return result;
}
Expand Down Expand Up @@ -674,8 +674,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TDeleteQueueRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableDeleteQueue();
NKikimr::NSQS::TDeleteQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableDeleteQueue();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
return result;
}
Expand Down Expand Up @@ -706,8 +706,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TChangeMessageVisibilityRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableChangeMessageVisibility();
NKikimr::NSQS::TChangeMessageVisibilityRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableChangeMessageVisibility();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetReceiptHandle(GetProtoRequest()->Getreceipt_handle());
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
Expand All @@ -734,8 +734,8 @@ namespace NKikimr::NYmq::V1 {
};


void AddAttribute(THolder<TSqsRequest>& requestHolder, const TString& name, TString value) {
auto attribute = requestHolder->MutableSetQueueAttributes()->MutableAttributes()->Add();
void AddAttribute(TSqsRequest& requestHolder, const TString& name, TString value) {
auto attribute = requestHolder.MutableSetQueueAttributes()->MutableAttributes()->Add();
attribute->SetName(name);
attribute->SetValue(value);
};
Expand All @@ -748,8 +748,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableSetQueueAttributes();
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableSetQueueAttributes();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
for (auto& [name, value]: GetProtoRequest()->Getattributes()) {
AddAttribute(requestHolder, name, value);
Expand Down Expand Up @@ -786,8 +786,8 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableListDeadLetterSourceQueues();
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableListDeadLetterSourceQueues();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
return result;
}
Expand Down Expand Up @@ -837,13 +837,13 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableSendMessageBatch();
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableSendMessageBatch();

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);

for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder->MutableSendMessageBatch()->MutableEntries()->Add();
auto entry = requestHolder.MutableSendMessageBatch()->MutableEntries()->Add();

entry->SetId(requestEntry.Getid());
entry->SetMessageBody(requestEntry.Getmessage_body());
Expand Down Expand Up @@ -905,11 +905,11 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableDeleteMessageBatch();
NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableDeleteMessageBatch();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder->MutableDeleteMessageBatch()->AddEntries();
auto entry = requestHolder.MutableDeleteMessageBatch()->AddEntries();
entry->SetId(requestEntry.Getid());
entry->SetReceiptHandle(requestEntry.Getreceipt_handle());
}
Expand Down Expand Up @@ -956,11 +956,11 @@ namespace NKikimr::NYmq::V1 {
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableChangeMessageVisibilityBatch();
NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableChangeMessageVisibilityBatch();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder->MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
auto entry = requestHolder.MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
entry->SetId(requestEntry.Getid());
entry->SetReceiptHandle(requestEntry.Getreceipt_handle());
COPY_FIELD_IF_PRESENT_IN_ENTRY(visibility_timeout, VisibilityTimeout)
Expand Down

0 comments on commit 6191791

Please sign in to comment.