Skip to content
This repository has been archived by the owner on Sep 6, 2024. It is now read-only.

Commit

Permalink
Fix tests after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Juozas Skarbalius committed Aug 23, 2024
1 parent fff4453 commit f03cb48
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,35 @@ class DefaultErrorHandlingStrategy<I> implements ErrorHandlingStrategy<I> {

@Override
@SneakyThrows
public void handleWorkerException(Exception e, Message<I> message) {
String messageId = String.valueOf(message.getHeaders().get("id", UUID.class));
log.error("error while handling message " + messageId + ": " + message.getPayload(), e);
public void handleWorkerException(Exception e, MessageWrapper<I> message) {
String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class));
log.error("error while handling message " + messageId + ": " + message.getMessage().getPayload(), e);
throw e;
}

@Override
@SneakyThrows
public void handleWorkerThrowable(Throwable t, Message<I> message) {
String messageId = String.valueOf(message.getHeaders().get("id", UUID.class));
log.error("error while handling message " + messageId + ": " + message.getPayload(), t);
public void handleWorkerThrowable(Throwable t, MessageWrapper<I> message) {
String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class));
log.error("error while handling message " + messageId + ": " + message.getMessage().getPayload(), t);
throw t;
}

@Override
public void handleExtendVisibilityTimeoutException(AwsServiceException e,
Message<?> message) {
MessageWrapper<?> message) {

String msg = "error while extending message visibility for " + Objects.requireNonNull(
message.getHeaders().get("id",
message.getMessage().getHeaders().get("id",
UUID.class));
log.error(msg, e);
throw e;

}

@Override
public void handleAcknowledgeMessageException(AwsServiceException e, Message<I> message) {
String messageId = String.valueOf(message.getHeaders().get("id", UUID.class));
public void handleAcknowledgeMessageException(AwsServiceException e, MessageWrapper<I> message) {
String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class));
log.error("could not acknowledge " + messageId, e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public interface ErrorHandlingStrategy<I> {
* @param message
* that was incorrectly processed
*/
void handleWorkerException(Exception e, Message<I> message);
void handleWorkerException(Exception e, MessageWrapper<I> message);

/**
* Defines how a throwable, that is thrown by the worker are handled. If a
Expand All @@ -40,7 +40,7 @@ public interface ErrorHandlingStrategy<I> {
* @param message
* that was incorrectly processed
*/
void handleWorkerThrowable(Throwable t, Message<I> message);
void handleWorkerThrowable(Throwable t, MessageWrapper<I> message);

/**
* Defines how exceptions, that are thrown by the timeout extension are handled.
Expand All @@ -51,7 +51,7 @@ public interface ErrorHandlingStrategy<I> {
* @param message that was tried to extend
*/

void handleExtendVisibilityTimeoutException(AwsServiceException e, Message<?> message);
void handleExtendVisibilityTimeoutException(AwsServiceException e, MessageWrapper<?> message);

/**
* Defines how exceptions, that are thrown by the message acknowledgement are handled.
Expand All @@ -62,7 +62,7 @@ public interface ErrorHandlingStrategy<I> {
* @param message that was tried to extend
*/

void handleAcknowledgeMessageException(AwsServiceException e, Message<I> message);
void handleAcknowledgeMessageException(AwsServiceException e, MessageWrapper<I> message);



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public class LongRunningMessageHandler<I, O> {
* @param message
* the message to be processed
*/
public void handleMessage(@NonNull Message<I> message) {
String messageId = String.valueOf(message.getHeaders().get("id", UUID.class));
public void handleMessage(@NonNull MessageWrapper<I> message) {
String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class));

if (messagesInProcessing.contains(messageId)) {
return;
Expand Down Expand Up @@ -174,15 +174,15 @@ public int getFreeWorkerCapacity() {
return messagesInProcessing.free();
}

private void scheduleNewMessageTask(@NonNull Message<I> message,
private void scheduleNewMessageTask(@NonNull MessageWrapper<I> message,
ScheduledFuture<?> visibilityTimeoutExtender) {
MessageHandlingRunnable<I, O> messageTask = messageHandlingRunnableFactory.get(worker,
message, finishedMessageCallback, messagesInProcessing, visibilityTimeoutExtender, errorHandlingStrategy);

messageProcessingExecutor.submit(messageTask);
}

private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull Message<I> message) {
private ScheduledFuture<?> scheduleNewVisibilityTimeoutExtender(@NonNull MessageWrapper<I> message) {
VisibilityTimeoutExtender timeoutExtender = timeoutExtenderFactory.get(message, queue, errorHandlingStrategy);
return timeoutExtensionExecutor.scheduleAtFixedRate(timeoutExtender,
timeUntilVisibilityTimeoutExtension.toMillis(), timeUntilVisibilityTimeoutExtension
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class MessageHandlingRunnable<I, O> implements Runnable {

private final MessageWorkerWithHeaders<I, O> worker;

private final Message<I> message;
private final MessageWrapper<I> message;

private final FinishedMessageCallback<I, O> finishedMessageCallback;

Expand All @@ -42,32 +42,31 @@ public class MessageHandlingRunnable<I, O> implements Runnable {
private final ErrorHandlingStrategy<I> errorHandlingStrategy;

MessageHandlingRunnable(@NonNull MessageWorkerWithHeaders<I, O> worker,
@NonNull Message<I> message,
@NonNull MessageWrapper<I> message,
@NonNull FinishedMessageCallback<I, O> finishedMessageCallback,
@NonNull SetWithUpperBound<String> messages,
@NonNull ScheduledFuture<?> visibilityTimeoutExtender,
@NonNull ErrorHandlingStrategy<I> errorHandlingStrategy) {

this.worker = worker;
this.message = message;
this.finishedMessageCallback = finishedMessageCallback;
this.messages = messages;
this.message = message;
this.visibilityTimeoutExtender = visibilityTimeoutExtender;
this.errorHandlingStrategy = errorHandlingStrategy;

}

@Override
public void run() {
String messageId = String.valueOf(message.getHeaders().get("id", UUID.class));
Acknowledgement acknowledgment = message.getHeaders().get("Acknowledgment",
String messageId = String.valueOf(message.getMessage().getHeaders().get("id", UUID.class));
Acknowledgement acknowledgment = message.getMessage().getHeaders().get("Acknowledgment",
Acknowledgement.class);
try {
log.info("starting processing of message " + messageId);

O outcome = worker.work(message.getPayload(), message.getHeaders());
O outcome = worker.work(message.getMessage().getPayload(), message.getMessage().getHeaders());

finishedMessageCallback.call(message.getPayload(), outcome);
finishedMessageCallback.call(message.getMessage().getPayload(), outcome);
acknowledge(messageId, acknowledgment);
log.info("message task successfully processed and message acknowledged: " + messageId);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package com.mercateo.sqs.utils.message.handling;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest;

import io.awspring.cloud.messaging.listener.Acknowledgment;
import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement;

import java.util.concurrent.TimeUnit;

Expand All @@ -13,6 +10,8 @@
import lombok.SneakyThrows;

import org.springframework.messaging.Message;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;

@RequiredArgsConstructor
public class MessageWrapper<I> {
Expand All @@ -33,18 +32,19 @@ public String getReceiptHandle() {

@SneakyThrows
public synchronized void acknowledge() {
Acknowledgment acknowledgment = message.getHeaders().get("Acknowledgment", Acknowledgment.class);
Acknowledgement acknowledgment = message.getHeaders().get("Acknowledgment", Acknowledgement.class);
if (acknowledgment == null) {
throw new NullPointerException("there is no \"Acknowledgment\" in the message headers");
}
acknowledgment.acknowledge().get(2, TimeUnit.MINUTES);
acknowledgment.acknowledgeAsync().get(2, TimeUnit.MINUTES);
acknowledged = true;
}

public synchronized void changeMessageVisibility(AmazonSQS sqsClient, ChangeMessageVisibilityRequest request) {
@SneakyThrows
public synchronized void changeMessageVisibility(SqsAsyncClient sqsClient, ChangeMessageVisibilityRequest request) {
if (acknowledged) {
return;
}
sqsClient.changeMessageVisibility(request);
sqsClient.changeMessageVisibility(request).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.mercateo.sqs.utils.message.handling.ErrorHandlingStrategy;
import com.mercateo.sqs.utils.message.handling.MessageWrapper;

import java.net.UnknownHostException;
import java.time.Duration;
Expand All @@ -39,14 +40,14 @@ public class VisibilityTimeoutExtender implements Runnable {

private final ChangeMessageVisibilityRequest request;

private final Message<?> message;
private final MessageWrapper<?> message;

private final ErrorHandlingStrategy<?> errorHandlingStrategy;

private final Retryer<ChangeMessageVisibilityResponse> retryer;

VisibilityTimeoutExtender(@NonNull SqsAsyncClient sqsClient, @NonNull Duration newVisibilityTimeout,
@NonNull Message<?> message, @NonNull String queueUrl,
@NonNull MessageWrapper<?> message, @NonNull String queueUrl,
@NonNull ErrorHandlingStrategy<?> errorHandlingStrategy,
@NonNull RetryStrategy retryStrategy) {
this.sqsClient = sqsClient;
Expand All @@ -61,7 +62,7 @@ public class VisibilityTimeoutExtender implements Runnable {

request = ChangeMessageVisibilityRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.getHeaders().get("ReceiptHandle", String.class))
.receiptHandle(message.getMessage().getHeaders().get("ReceiptHandle", String.class))
.visibilityTimeout(timeoutInSeconds(newVisibilityTimeout))
.build();
}
Expand All @@ -78,7 +79,7 @@ public void run() {
} catch (AwsServiceException e) {
errorHandlingStrategy.handleExtendVisibilityTimeoutException(e, message);
} catch (Exception e) {
log.error("error while extending message visibility for " + message.getHeaders().get("MessageId",
log.error("error while extending message visibility for " + message.getMessage().getHeaders().get("MessageId",
String.class), e);
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void setUp() throws Exception {
public void handle_throws_exception() {
// Given
Exception e = new IllegalArgumentException();
Message<Integer> message = createMessage();
MessageWrapper<Integer> message = createMessage();

// When
Throwable throwable = catchThrowable(() -> uut.handleWorkerException(e, message));
Expand All @@ -41,11 +41,11 @@ public void handle_throws_exception() {

}

private Message<Integer> createMessage() {
private MessageWrapper<Integer> createMessage() {
HashMap<String, Object> headerMap = new HashMap<>();
headerMap.put("id", "mid");
headerMap.put("Acknowledgment", acknowledgment);
return new GenericMessage<>(3, new MessageHeaders(headerMap));
return new MessageWrapper<>(new GenericMessage<>(3, new MessageHeaders(headerMap)));
}

}
Loading

0 comments on commit f03cb48

Please sign in to comment.