Skip to content

Commit

Permalink
Merge pull request #1428 from robzienert/termination-lifecycle-sns-un…
Browse files Browse the repository at this point in the history
…wrap

Updating termination lifecycle agent to unwrap sns messages
  • Loading branch information
robzienert authored Feb 15, 2017
2 parents 1d69688 + 39f5efc commit 59986d1
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void run() {
ensureTopicExists(amazonSNS, topicARN, allAccountIds, queueARN);

AtomicInteger messagesProcessed = new AtomicInteger(0);
AtomicInteger messagesSkipped = new AtomicInteger(0);
while (messagesProcessed.get() < properties.getMaxMessagesPerCycle()) {
ReceiveMessageResult receiveMessageResult = amazonSQS.receiveMessage(
new ReceiveMessageRequest(queueId)
Expand All @@ -140,30 +141,57 @@ public void run() {
}

receiveMessageResult.getMessages().forEach(message -> {
try {
LifecycleMessage lifecycleMessage = objectMapper.readValue(message.getBody(), LifecycleMessage.class);

if (SUPPORTED_LIFECYCLE_TRANSITION.equalsIgnoreCase(lifecycleMessage.lifecycleTransition)) {
Task originalTask = TaskRepository.threadLocalTask.get();
try {
TaskRepository.threadLocalTask.set(
Optional.ofNullable(originalTask).orElse(new DefaultTask(InstanceTerminationLifecycleAgent.class.getSimpleName()))
);
handleMessage(lifecycleMessage, TaskRepository.threadLocalTask.get());
} finally {
TaskRepository.threadLocalTask.set(originalTask);
}
LifecycleMessage lifecycleMessage = unmarshalLifecycleMessage(message.getBody());

if (lifecycleMessage != null) {
if (!SUPPORTED_LIFECYCLE_TRANSITION.equalsIgnoreCase(lifecycleMessage.lifecycleTransition)) {
log.info("Ignoring unsupported lifecycle transition: " + lifecycleMessage.lifecycleTransition);
messagesSkipped.incrementAndGet();
return;
}

Task originalTask = TaskRepository.threadLocalTask.get();
try {
TaskRepository.threadLocalTask.set(
Optional.ofNullable(originalTask).orElse(new DefaultTask(InstanceTerminationLifecycleAgent.class.getSimpleName()))
);
handleMessage(lifecycleMessage, TaskRepository.threadLocalTask.get());
} finally {
TaskRepository.threadLocalTask.set(originalTask);
}
} catch (IOException e) {
log.error("Unable to convert NotificationMessage (body: {})", message.getBody(), e);
} else {
messagesSkipped.incrementAndGet();
}

deleteMessage(amazonSQS, queueId, message);
messagesProcessed.incrementAndGet();
});
}

log.info("Processed {} messages (queueARN: {})", messagesProcessed.get(), queueARN.arn);
log.info("Processed {} messages, {} skipped (queueARN: {})", messagesProcessed.get(), messagesSkipped.get(), queueARN.arn);
}

private LifecycleMessage unmarshalLifecycleMessage(String messageBody) {
String body = messageBody;
try {
NotificationMessageWrapper wrapper = objectMapper.readValue(messageBody, NotificationMessageWrapper.class);
if (wrapper != null && wrapper.message != null) {
body = wrapper.message;
}
} catch (IOException e) {
// Try to unwrap a notification message; if that doesn't work,
// assume that we're dealing with a message directly from SQS.
log.debug("Unable unmarshal NotificationMessageWrapper. Assuming SQS message. (body: {})", messageBody, e);
}

LifecycleMessage lifecycleMessage = null;
try {
lifecycleMessage = objectMapper.readValue(body, LifecycleMessage.class);
} catch (IOException e) {
log.error("Unable to unmarshal LifecycleMessage (body: {})", body, e);
}

return lifecycleMessage;
}

private void handleMessage(LifecycleMessage message, Task task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.netflix.spinnaker.clouddriver.data.task.DefaultTask
import com.netflix.spinnaker.clouddriver.data.task.Task
import com.netflix.spinnaker.clouddriver.eureka.deploy.ops.AbstractEurekaSupport.DiscoveryStatus
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject

Expand Down Expand Up @@ -56,9 +57,12 @@ class InstanceTerminationLifecycleAgentSpec extends Specification {
def queueARN = new ARN([mgmtCredentials, testCredentials], "arn:aws:sqs:us-west-2:100:queueName")
def topicARN = new ARN([mgmtCredentials, testCredentials], "arn:aws:sns:us-west-2:100:topicName")

@Shared
def objectMapper = new ObjectMapper()

@Subject
def subject = new InstanceTerminationLifecycleAgent(
Mock(ObjectMapper),
objectMapper,
Mock(AmazonClientProvider),
accountCredentialsProvider,
new InstanceTerminationConfigurationProperties(
Expand Down Expand Up @@ -135,4 +139,40 @@ class InstanceTerminationLifecycleAgentSpec extends Specification {
['i-1234']
)
}

def 'should process both sns and sqs messages'() {
given:
LifecycleMessage lifecycleMessage = new LifecycleMessage(
accountId: '1234',
autoScalingGroupName: 'clouddriver-main-v000',
ec2InstanceId: 'i-1324',
lifecycleTransition: 'autoscaling:EC2_INSTANCE_TERMINATING'
)
String sqsMessage = objectMapper.writeValueAsString(lifecycleMessage)
String snsMessage = objectMapper.writeValueAsString(new NotificationMessageWrapper(
subject: 'lifecycle message',
message: sqsMessage
))

when:
LifecycleMessage result = subject.unmarshalLifecycleMessage(snsMessage)

then:
result.accountId == lifecycleMessage.accountId
result.autoScalingGroupName == lifecycleMessage.autoScalingGroupName
result.ec2InstanceId == lifecycleMessage.ec2InstanceId
result.lifecycleTransition == lifecycleMessage.lifecycleTransition

when:
result = subject.unmarshalLifecycleMessage(sqsMessage)

then:
result.accountId == lifecycleMessage.accountId
result.autoScalingGroupName == lifecycleMessage.autoScalingGroupName
result.ec2InstanceId == lifecycleMessage.ec2InstanceId
result.lifecycleTransition == lifecycleMessage.lifecycleTransition
}



}

0 comments on commit 59986d1

Please sign in to comment.