diff --git a/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/lifecycle/InstanceTerminationLifecycleAgent.java b/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/lifecycle/InstanceTerminationLifecycleAgent.java index ff75aebc0f8..d4631d7efc7 100644 --- a/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/lifecycle/InstanceTerminationLifecycleAgent.java +++ b/clouddriver-aws/src/main/groovy/com/netflix/spinnaker/clouddriver/aws/lifecycle/InstanceTerminationLifecycleAgent.java @@ -126,6 +126,7 @@ public void run() { ensureTopicExists(amazonSNS, topicARN, allAccountIds, queueARN); AtomicInteger messagesProcessed = new AtomicInteger(0); + AtomicInteger messagesSkipped = new AtomicInteger(0); while (messagesProcessed.get() < properties.getMaxMessagesPerCycle()) { ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage( new ReceiveMessageRequest(queueId) @@ -140,22 +141,26 @@ public void run() { } receiveMessageResult.getMessages().forEach(message -> { - try { - LifecycleMessage lifecycleMessage = objectMapper.readValue(message.getBody(), LifecycleMessage.class); - - if (SUPPORTED_LIFECYCLE_TRANSITION.equalsIgnoreCase(lifecycleMessage.lifecycleTransition)) { - Task originalTask = TaskRepository.threadLocalTask.get(); - try { - TaskRepository.threadLocalTask.set( - Optional.ofNullable(originalTask).orElse(new DefaultTask(InstanceTerminationLifecycleAgent.class.getSimpleName())) - ); - handleMessage(lifecycleMessage, TaskRepository.threadLocalTask.get()); - } finally { - TaskRepository.threadLocalTask.set(originalTask); - } + LifecycleMessage lifecycleMessage = unmarshalLifecycleMessage(message.getBody()); + + if (lifecycleMessage != null) { + if (!SUPPORTED_LIFECYCLE_TRANSITION.equalsIgnoreCase(lifecycleMessage.lifecycleTransition)) { + log.info("Ignoring unsupported lifecycle transition: " + lifecycleMessage.lifecycleTransition); + messagesSkipped.incrementAndGet(); + return; + } + + Task originalTask = TaskRepository.threadLocalTask.get(); + try { + TaskRepository.threadLocalTask.set( + Optional.ofNullable(originalTask).orElse(new DefaultTask(InstanceTerminationLifecycleAgent.class.getSimpleName())) + ); + handleMessage(lifecycleMessage, TaskRepository.threadLocalTask.get()); + } finally { + TaskRepository.threadLocalTask.set(originalTask); } - } catch (IOException e) { - log.error("Unable to convert NotificationMessage (body: {})", message.getBody(), e); + } else { + messagesSkipped.incrementAndGet(); } deleteMessage(amazonSQS, queueId, message); @@ -163,7 +168,30 @@ public void run() { }); } - log.info("Processed {} messages (queueARN: {})", messagesProcessed.get(), queueARN.arn); + log.info("Processed {} messages, {} skipped (queueARN: {})", messagesProcessed.get(), messagesSkipped.get(), queueARN.arn); + } + + private LifecycleMessage unmarshalLifecycleMessage(String messageBody) { + String body = messageBody; + try { + NotificationMessageWrapper wrapper = objectMapper.readValue(messageBody, NotificationMessageWrapper.class); + if (wrapper != null && wrapper.message != null) { + body = wrapper.message; + } + } catch (IOException e) { + // Try to unwrap a notification message; if that doesn't work, + // assume that we're dealing with a message directly from SQS. + log.debug("Unable unmarshal NotificationMessageWrapper. Assuming SQS message. (body: {})", messageBody, e); + } + + LifecycleMessage lifecycleMessage = null; + try { + lifecycleMessage = objectMapper.readValue(body, LifecycleMessage.class); + } catch (IOException e) { + log.error("Unable to unmarshal LifecycleMessage (body: {})", body, e); + } + + return lifecycleMessage; } private void handleMessage(LifecycleMessage message, Task task) { diff --git a/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/lifecycle/InstanceTerminationLifecycleAgentSpec.groovy b/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/lifecycle/InstanceTerminationLifecycleAgentSpec.groovy index 5cfec1562b1..26852f57577 100644 --- a/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/lifecycle/InstanceTerminationLifecycleAgentSpec.groovy +++ b/clouddriver-aws/src/test/groovy/com/netflix/spinnaker/clouddriver/aws/lifecycle/InstanceTerminationLifecycleAgentSpec.groovy @@ -29,6 +29,7 @@ import com.netflix.spinnaker.clouddriver.data.task.DefaultTask import com.netflix.spinnaker.clouddriver.data.task.Task import com.netflix.spinnaker.clouddriver.eureka.deploy.ops.AbstractEurekaSupport.DiscoveryStatus import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider +import spock.lang.Shared import spock.lang.Specification import spock.lang.Subject @@ -56,9 +57,12 @@ class InstanceTerminationLifecycleAgentSpec extends Specification { def queueARN = new ARN([mgmtCredentials, testCredentials], "arn:aws:sqs:us-west-2:100:queueName") def topicARN = new ARN([mgmtCredentials, testCredentials], "arn:aws:sns:us-west-2:100:topicName") + @Shared + def objectMapper = new ObjectMapper() + @Subject def subject = new InstanceTerminationLifecycleAgent( - Mock(ObjectMapper), + objectMapper, Mock(AmazonClientProvider), accountCredentialsProvider, new InstanceTerminationConfigurationProperties( @@ -135,4 +139,40 @@ class InstanceTerminationLifecycleAgentSpec extends Specification { ['i-1234'] ) } + + def 'should process both sns and sqs messages'() { + given: + LifecycleMessage lifecycleMessage = new LifecycleMessage( + accountId: '1234', + autoScalingGroupName: 'clouddriver-main-v000', + ec2InstanceId: 'i-1324', + lifecycleTransition: 'autoscaling:EC2_INSTANCE_TERMINATING' + ) + String sqsMessage = objectMapper.writeValueAsString(lifecycleMessage) + String snsMessage = objectMapper.writeValueAsString(new NotificationMessageWrapper( + subject: 'lifecycle message', + message: sqsMessage + )) + + when: + LifecycleMessage result = subject.unmarshalLifecycleMessage(snsMessage) + + then: + result.accountId == lifecycleMessage.accountId + result.autoScalingGroupName == lifecycleMessage.autoScalingGroupName + result.ec2InstanceId == lifecycleMessage.ec2InstanceId + result.lifecycleTransition == lifecycleMessage.lifecycleTransition + + when: + result = subject.unmarshalLifecycleMessage(sqsMessage) + + then: + result.accountId == lifecycleMessage.accountId + result.autoScalingGroupName == lifecycleMessage.autoScalingGroupName + result.ec2InstanceId == lifecycleMessage.ec2InstanceId + result.lifecycleTransition == lifecycleMessage.lifecycleTransition + } + + + }