From cfff8b0c7410b5049587693701d580873b88f429 Mon Sep 17 00:00:00 2001 From: Dwayne Jeng Date: Tue, 7 Feb 2023 16:57:44 -0800 Subject: [PATCH] Fix S3EventNotificationCallback --- .../S3EventNotificationCallback.java | 28 ++++++- .../S3EventNotificationCallbackTest.java | 75 ++++++++++++++++--- 2 files changed, 92 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallback.java b/src/main/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallback.java index 3460f6c..08a9b62 100644 --- a/src/main/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallback.java +++ b/src/main/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallback.java @@ -4,6 +4,8 @@ import com.amazonaws.services.s3.event.S3EventNotification; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -21,6 +23,7 @@ public class S3EventNotificationCallback implements PollSqsCallback { private static final Logger LOG = LoggerFactory.getLogger(S3EventNotificationCallback.class); private static final String S3_EVENT_SOURCE = "aws:s3"; private static final String S3_OBJECT_CREATED_EVENT_PREFIX = "ObjectCreated:"; + private static final String SNS_KEY_MESSAGE = "Message"; private static final ObjectMapper OBJECT_MAPPER; @@ -38,8 +41,29 @@ public final void setBridgeHelper(BridgeHelper bridgeHelper) { } @Override - public void callback(String messageBody) throws Exception { - S3EventNotification notification = OBJECT_MAPPER.readValue(messageBody, S3EventNotification.class); + public void callback(String messageBody) { + JsonNode wrapperNode; + try { + wrapperNode = OBJECT_MAPPER.readTree(messageBody); + } catch (JsonProcessingException ex) { + // Malformed JSON. Log a warning and squelch. + LOG.warn("SNS notification is malformed JSON: " + messageBody); + return; + } + if (wrapperNode == null || !wrapperNode.hasNonNull(SNS_KEY_MESSAGE)) { + // Wrapper node doesn't exist or doesn't have a Message field. Log a warning and squelch. + LOG.warn("SNS notification doesn't contain an S3 notification: " + messageBody); + return; + } + + S3EventNotification notification; + try { + notification = OBJECT_MAPPER.convertValue(wrapperNode.get(SNS_KEY_MESSAGE), S3EventNotification.class); + } catch (IllegalArgumentException ex) { + // Malformed S3 notification. Log a warning and squelch. + LOG.warn("S3 notification is malformed: " + messageBody); + return; + } List recordList = notification.getRecords(); if (recordList == null || recordList.isEmpty()) { // Notification w/o record list is not actionable. Log a warning and squelch. diff --git a/src/test/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallbackTest.java b/src/test/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallbackTest.java index 61e86dc..04dc0f0 100644 --- a/src/test/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallbackTest.java +++ b/src/test/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallbackTest.java @@ -27,7 +27,9 @@ public class S3EventNotificationCallbackTest { - private static final String UPLOAD_COMPLETE_MESSAGE="{\"Records\":[{\"eventVersion\":\"2.0\",\"eventSource\":\"aws:s3\"," + + private static final String UPLOAD_COMPLETE_MESSAGE= "{" + + " \"Message\":" + + "{\"Records\":[{\"eventVersion\":\"2.0\",\"eventSource\":\"aws:s3\"," + "\"awsRegion\":\"us-east-1\"," + "\"eventTime\":\"2016-07-12T22:06:54.454Z\",\"eventName\":\"ObjectCreated:Put\"," + "\"userIdentity\":{\"principalId\":\"AWS:AIDAJCSQZ35H7B4BFOVAW\"},\"requestParameters\":{\"sourceIPAddress\":\"54.87.180" + @@ -36,7 +38,8 @@ public class S3EventNotificationCallbackTest { "\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"Bridge Upload Complete Notification UAT\"," + "\"bucket\":{\"name\":\"org-sagebridge-upload-uat\",\"ownerIdentity\":{\"principalId\":\"AZ9HQM5UC903F\"}," + "\"arn\":\"arn:aws:s3:::org-sagebridge-upload-uat\"},\"object\":{\"key\":\"89b40dab-4982-4d5c-ae21-d74b072d02cd\"," + - "\"size\":1488,\"eTag\":\"e40df5cfa5874ab353947eb48ec0cfa4\",\"sequencer\":\"00578569FE6792370C\"}}}]}"; + "\"size\":1488,\"eTag\":\"e40df5cfa5874ab353947eb48ec0cfa4\",\"sequencer\":\"00578569FE6792370C\"}}}]}" + + "}"; private static final String UPLOAD_ID = "89b40dab-4982-4d5c-ae21-d74b072d02cd"; private BridgeHelper mockBridgeHelper; @@ -52,21 +55,75 @@ public void before() { } @Test - public void testCallback_StringMessage() throws Exception { + public void testCallback_StringMessage() { callback.callback(UPLOAD_COMPLETE_MESSAGE); verify(mockBridgeHelper, times(1)).completeUpload(UPLOAD_ID); } @Test - public void testCallback_NullList() throws Exception { + public void testCallback_MalformedMessage() { + callback.callback("malformed \" message"); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_BlankWrapper() { + callback.callback(""); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_NullWrapper() { + callback.callback("null"); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_WrapperInvalidType() { + callback.callback("\"wrong type\""); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_NoMessage() { callback.callback("{}"); verify(mockBridgeHelper, never()).completeUpload(anyString()); } @Test - public void testCallback_EmptyList() throws Exception { - callback.callback("{\"Records\":[]}"); + public void testCallback_NullMessage() { + callback.callback("{\"Message\":null}"); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_MessageWrongType() { + callback.callback("{\"Message\":\"wrong type\"}"); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_NoRecordList() { + callback.callback("{\"Message\":{}}"); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_NullRecordList() { + callback.callback("{\"Message\":{\"Records\":null}}"); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_RecordListWrongType() { + callback.callback("{\"Message\":{\"Records\":\"wrong type\"}}"); + verify(mockBridgeHelper, never()).completeUpload(anyString()); + } + + @Test + public void testCallback_EmptyList() { + callback.callback("{\"Message\":{\"Records\":[]}}"); verify(mockBridgeHelper, never()).completeUpload(anyString()); } @@ -81,7 +138,7 @@ public Object[][] propagatedExceptionDataProvider() { } @Test(dataProvider = "propagatedExceptionDataProvider") - public void testCallback_PropagatesExceptions(int status) throws Exception { + public void testCallback_PropagatesExceptions(int status) { doThrow(new BridgeSDKException("test exception", status)).when(mockBridgeHelper).completeUpload(UPLOAD_ID); try { @@ -104,14 +161,14 @@ public Object[][] suppressedExceptionDataProvider() { } @Test(dataProvider = "suppressedExceptionDataProvider") - public void testCallback_SuppressesExceptions(int status) throws Exception { + public void testCallback_SuppressesExceptions(int status) { doThrow(new BridgeSDKException("test exception", status)).when(mockBridgeHelper).completeUpload(UPLOAD_ID); callback.callback(UPLOAD_COMPLETE_MESSAGE); verify(mockBridgeHelper, times(1)).completeUpload(UPLOAD_ID); } @Test - public void testCallback_CompleteUploadForShouldProcessRecords() throws Exception { + public void testCallback_CompleteUploadForShouldProcessRecords() { String key1 = "key1"; String key2 = "key2"; String key3 = "key3";