Skip to content

Commit

Permalink
LOGBROKER 8891 list dead letter source queues (ydb-platform#8330)
Browse files Browse the repository at this point in the history
  • Loading branch information
siarheivesialou authored and stanislav-shchetinin committed Aug 30, 2024
1 parent 70fcdeb commit cd02f09
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 0 deletions.
1 change: 1 addition & 0 deletions ydb/core/grpc_services/service_ymq.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ void DoYmqSetQueueAttributesRequest(std::unique_ptr<IRequestOpCtx> p, const IFac
void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqListDeadLetterSourceQueuesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
}
}
3 changes: 3 additions & 0 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ namespace NKikimr::NHttpProxy {
action = NSQS::EAction::DeleteMessageBatch;
} else if (Method == "ChangeMessageVisibilityBatch") {
action = NSQS::EAction::ChangeMessageVisibilityBatch;
} else if (Method == "ListDeadLetterSourceQueues") {
action = NSQS::EAction::ListDeadLetterSourceQueues;
}

requestHolder->SetRequestId(HttpContext.RequestId);
Expand Down Expand Up @@ -1075,6 +1077,7 @@ namespace NKikimr::NHttpProxy {
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessageBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ListDeadLetterSourceQueues);
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/http_proxy/ut/datastreams_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {

appConfig.MutableSqsConfig()->SetEnableSqs(true);
appConfig.MutableSqsConfig()->SetYandexCloudMode(true);
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);

auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
limit->SetMinPeriodSeconds(0);
Expand Down
46 changes: 46 additions & 0 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1996,4 +1996,50 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {

}

Y_UNIT_TEST_F(TestListDeadLetterSourceQueues, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));

TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");

auto createDlqReq = CreateSqsCreateQueueRequest();
createQueueReq["QueueName"] = "DlqName";
res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));

TString dlqUrl = GetByPath<TString>(json, "QueueUrl");

NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = dlqUrl;
NJson::TJsonArray attributeNames = {"QueueArn"};
getQueueAttributes["AttributeNames"] = attributeNames;
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));

TString dlqArn = GetByPath<TString>(json["Attributes"], "QueueArn");

NJson::TJsonValue setQueueAttributes;
setQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonValue attributes = {};
auto redrivePolicy = TStringBuilder()
<< "{\"deadLetterTargetArn\" : \"" << dlqArn << "\", \"maxReceiveCount\" : 100}";
attributes["RedrivePolicy"] = redrivePolicy;
setQueueAttributes["Attributes"] = attributes;

res = SendHttpRequest("/Root", "AmazonSQS.SetQueueAttributes", std::move(setQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);

NJson::TJsonValue listDeadLetterSourceQueues;
listDeadLetterSourceQueues["QueueUrl"] = dlqUrl;
res = SendHttpRequest("/Root", "AmazonSQS.ListDeadLetterSourceQueues", std::move(listDeadLetterSourceQueues), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT_VALUES_EQUAL(json["QueueUrls"][0], resultQueueUrl);
}

} // Y_UNIT_TEST_SUITE(TestHttpProxy)
1 change: 1 addition & 0 deletions ydb/public/api/grpc/draft/ydb_ymq_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ service YmqService {
rpc SendMessageBatch(SendMessageBatchRequest) returns (SendMessageBatchResponse);
rpc DeleteMessageBatch(DeleteMessageBatchRequest) returns (DeleteMessageBatchResponse);
rpc ChangeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest) returns (ChangeMessageVisibilityBatchResponse);
rpc ListDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest) returns (ListDeadLetterSourceQueuesResponse);
}
1 change: 1 addition & 0 deletions ydb/services/ymq/grpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ void TGRpcYmqService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger)
ADD_REQUEST(SendMessageBatch, DoYmqSendMessageBatchRequest, nullptr, Off)
ADD_REQUEST(DeleteMessageBatch, DoYmqDeleteMessageBatchRequest, nullptr, Off)
ADD_REQUEST(ChangeMessageVisibilityBatch, DoYmqChangeMessageVisibilityBatchRequest, nullptr, Off)
ADD_REQUEST(ListDeadLetterSourceQueues, DoYmqListDeadLetterSourceQueuesRequest, nullptr, Off)

#undef ADD_REQUEST
}
Expand Down
36 changes: 36 additions & 0 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -730,9 +730,44 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableSetQueueAttributes();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
for (auto& [name, value]: GetProtoRequest()->Getattributes()) {
AddAttribute(requestHolder, name, value);
}
return result;
}
};

class TListDeadLetterSourceQueuesReplyCallback : public TReplyCallback<
NKikimr::NSQS::TListDeadLetterSourceQueuesResponse,
Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult> {
public:
using TReplyCallback::TReplyCallback;

private:
const NKikimr::NSQS::TListDeadLetterSourceQueuesResponse& GetResponse(const NKikimrClient::TSqsResponse& resp) override {
return resp.GetListDeadLetterSourceQueues();
}

Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult GetResult(const NKikimrClient::TSqsResponse& response) override {
Ydb::Ymq::V1::ListDeadLetterSourceQueuesResult result;
for (const auto& queue : response.GetListDeadLetterSourceQueues().GetQueues()) {
result.Mutablequeue_urls()->Add()->assign(queue.GetQueueUrl());
}
return result;
}
};

class TListDeadLetterSourceQueuesActor : public TRpcRequestActor<
TEvYmqListDeadLetterSourceQueuesRequest,
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest,
TListDeadLetterSourceQueuesReplyCallback> {
public:
using TRpcRequestActor::TRpcRequestActor;

private:
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(THolder<TSqsRequest>& requestHolder) override {
auto result = requestHolder->MutableListDeadLetterSourceQueues();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
return result;
}
Expand Down Expand Up @@ -933,5 +968,6 @@ DECLARE_RPC(SetQueueAttributes);
DECLARE_RPC(SendMessageBatch);
DECLARE_RPC(DeleteMessageBatch);
DECLARE_RPC(ChangeMessageVisibilityBatch);
DECLARE_RPC(ListDeadLetterSourceQueues);

}
1 change: 1 addition & 0 deletions ydb/services/ymq/ymq_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ using TEvYmqSetQueueAttributesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::
using TEvYmqSendMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::SendMessageBatchRequest, Ydb::Ymq::V1::SendMessageBatchResponse>;
using TEvYmqDeleteMessageBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::DeleteMessageBatchRequest, Ydb::Ymq::V1::DeleteMessageBatchResponse>;
using TEvYmqChangeMessageVisibilityBatchRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ChangeMessageVisibilityBatchRequest, Ydb::Ymq::V1::ChangeMessageVisibilityBatchResponse>;
using TEvYmqListDeadLetterSourceQueuesRequest = TGrpcRequestOperationCall<Ydb::Ymq::V1::ListDeadLetterSourceQueuesRequest, Ydb::Ymq::V1::ListDeadLetterSourceQueuesResponse>;

}
}

0 comments on commit cd02f09

Please sign in to comment.