From 7619bc88bb767c869d670a781579602d0f7eaf2b Mon Sep 17 00:00:00 2001 From: Dwayne Jeng Date: Wed, 15 Feb 2023 01:50:41 -0800 Subject: [PATCH] Move S3EventNotificationCallback from Exporter to Worker --- .../bridge/exporter/config/SpringConfig.java | 14 - .../bridge/exporter/helper/BridgeHelper.java | 16 -- .../S3EventNotificationCallback.java | 113 -------- src/main/resources/BridgeExporter.conf | 6 - .../exporter/helper/BridgeHelperTest.java | 12 - .../S3EventNotificationCallbackTest.java | 249 ------------------ 6 files changed, 410 deletions(-) delete mode 100644 src/main/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallback.java delete mode 100644 src/test/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallbackTest.java diff --git a/src/main/java/org/sagebionetworks/bridge/exporter/config/SpringConfig.java b/src/main/java/org/sagebionetworks/bridge/exporter/config/SpringConfig.java index 39fc084..b62c03d 100644 --- a/src/main/java/org/sagebionetworks/bridge/exporter/config/SpringConfig.java +++ b/src/main/java/org/sagebionetworks/bridge/exporter/config/SpringConfig.java @@ -30,7 +30,6 @@ import org.sagebionetworks.bridge.config.PropertiesConfig; import org.sagebionetworks.bridge.dynamodb.DynamoQueryHelper; import org.sagebionetworks.bridge.dynamodb.DynamoScanHelper; -import org.sagebionetworks.bridge.exporter.notification.S3EventNotificationCallback; import org.sagebionetworks.bridge.exporter.request.BridgeExporterSqsCallback; import org.sagebionetworks.bridge.exporter.synapse.ColumnDefinition; import org.sagebionetworks.bridge.exporter.util.BridgeExporterUtil; @@ -181,19 +180,6 @@ public PollSqsWorker exporterSqsWorker(BridgeExporterSqsCallback exporterSqsCall return sqsWorker; } - @Bean - @Autowired - public PollSqsWorker s3NotificationSqsWorker(S3EventNotificationCallback s3NotificationCallback) { - Config config = bridgeConfig(); - - PollSqsWorker sqsWorker = new PollSqsWorker(); - sqsWorker.setCallback(s3NotificationCallback); - sqsWorker.setQueueUrl(config.get("s3.notification.sqs.queue.url")); - sqsWorker.setSleepTimeMillis(config.getInt("s3.notification.sqs.sleep.time.millis")); - sqsWorker.setSqsHelper(sqsHelper()); - return sqsWorker; - } - @Bean public SynapseClient synapseClient() { Config config = bridgeConfig(); diff --git a/src/main/java/org/sagebionetworks/bridge/exporter/helper/BridgeHelper.java b/src/main/java/org/sagebionetworks/bridge/exporter/helper/BridgeHelper.java index b301a22..4b06e1c 100644 --- a/src/main/java/org/sagebionetworks/bridge/exporter/helper/BridgeHelper.java +++ b/src/main/java/org/sagebionetworks/bridge/exporter/helper/BridgeHelper.java @@ -41,22 +41,6 @@ public final void setBridgeClientManager(ClientManager bridgeClientManager) { this.bridgeClientManager = bridgeClientManager; } - /** - * Signals Bridge Server that the upload is completed and to begin processing the upload. Used by Upload - * Auto-Complete. - * - * @param uploadId - * upload to mark completed and begin processing - */ - public void completeUpload(String uploadId) { - try { - bridgeClientManager.getClient(ForWorkersApi.class).completeUploadSession(uploadId, null) - .execute(); - } catch (IOException ex) { - throw new BridgeSDKException("Error completing upload to Bridge: " + ex.getMessage(), ex); - } - } - /** Gets the participant from Bridge for the specified study and health code. */ @Cacheable(lifetime = 5, unit = TimeUnit.MINUTES) public StudyParticipant getParticipantByHealthCode(String studyId, String healthCode) { diff --git a/src/main/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallback.java b/src/main/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallback.java deleted file mode 100644 index 08a9b62..0000000 --- a/src/main/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallback.java +++ /dev/null @@ -1,113 +0,0 @@ -package org.sagebionetworks.bridge.exporter.notification; - -import java.util.List; - -import com.amazonaws.services.s3.event.S3EventNotification; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.sagebionetworks.bridge.exporter.helper.BridgeHelper; -import org.sagebionetworks.bridge.rest.exceptions.BridgeSDKException; -import org.sagebionetworks.bridge.sqs.PollSqsCallback; - -@Component -public class S3EventNotificationCallback implements PollSqsCallback { - private static final Logger LOG = LoggerFactory.getLogger(S3EventNotificationCallback.class); - private static final String S3_EVENT_SOURCE = "aws:s3"; - private static final String S3_OBJECT_CREATED_EVENT_PREFIX = "ObjectCreated:"; - private static final String SNS_KEY_MESSAGE = "Message"; - - private static final ObjectMapper OBJECT_MAPPER; - - static { - OBJECT_MAPPER = new ObjectMapper(); - OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private BridgeHelper bridgeHelper; - - /** Bridge helper, used to call Bridge Server to complete the upload. */ - @Autowired - public final void setBridgeHelper(BridgeHelper bridgeHelper) { - this.bridgeHelper = bridgeHelper; - } - - @Override - public void callback(String messageBody) { - JsonNode wrapperNode; - try { - wrapperNode = OBJECT_MAPPER.readTree(messageBody); - } catch (JsonProcessingException ex) { - // Malformed JSON. Log a warning and squelch. - LOG.warn("SNS notification is malformed JSON: " + messageBody); - return; - } - if (wrapperNode == null || !wrapperNode.hasNonNull(SNS_KEY_MESSAGE)) { - // Wrapper node doesn't exist or doesn't have a Message field. Log a warning and squelch. - LOG.warn("SNS notification doesn't contain an S3 notification: " + messageBody); - return; - } - - S3EventNotification notification; - try { - notification = OBJECT_MAPPER.convertValue(wrapperNode.get(SNS_KEY_MESSAGE), S3EventNotification.class); - } catch (IllegalArgumentException ex) { - // Malformed S3 notification. Log a warning and squelch. - LOG.warn("S3 notification is malformed: " + messageBody); - return; - } - List recordList = notification.getRecords(); - if (recordList == null || recordList.isEmpty()) { - // Notification w/o record list is not actionable. Log a warning and squelch. - LOG.warn("S3 notification without record list: " + messageBody); - return; - } - - callback(notification); - } - - // package-scoped to enable mocking/testing - void callback(S3EventNotification notification) { - notification.getRecords().stream().filter(this::shouldProcessRecord).forEach(record -> { - String uploadId = record.getS3().getObject().getKey(); - - try { - bridgeHelper.completeUpload(uploadId); - LOG.info("Completed upload, id=" + uploadId); - } catch (BridgeSDKException ex) { - String errorMsg = "Error completing upload id " + uploadId + ": " + ex.getMessage(); - int status = ex.getStatusCode(); - if (status == 400 || 404 <= status && status <= 499) { - // HTTP 4XX means bad request (such as 404 not found). This can happen for a variety of reasons and - // is generally not developer actionable. Log a warning and swallow the exception. This way, the - // SQS poll worker will succeed the callback and delete the message, preventing spurious retries. - // - // We should still retry 401s and 403s because these indicate problems with our client and not - // problems with the session. - LOG.warn(errorMsg, ex); - } else { - // A non-4XX error generally means a server error. We'll want to retry this. Log an error and - // re-throw. - LOG.error(errorMsg, ex); - - // Foreach handlers can't throw checked exceptions. It's not worth creating an unchecked exception - // given that we're about to refactor error handling. For now, just throw a RuntimeException. - throw new RuntimeException(errorMsg, ex); - } - } - }); - } - - // package-scoped to enable mocking/testing - boolean shouldProcessRecord(S3EventNotification.S3EventNotificationRecord record) { - return S3_EVENT_SOURCE.equals(record.getEventSource()) && record.getEventName().startsWith(S3_OBJECT_CREATED_EVENT_PREFIX); - } -} diff --git a/src/main/resources/BridgeExporter.conf b/src/main/resources/BridgeExporter.conf index 9c7cf03..8cf6d1d 100644 --- a/src/main/resources/BridgeExporter.conf +++ b/src/main/resources/BridgeExporter.conf @@ -11,7 +11,6 @@ synapse.access.token=your-access-token-here synapse.principal.id=your-principal-id-here exporter.request.sqs.sleep.time.millis=125 -s3.notification.sqs.sleep.time.millis=125 heartbeat.interval.minutes=30 record.loop.delay.millis=30 record.loop.progress.report.period=1000 @@ -37,11 +36,6 @@ dev.exporter.request.sqs.queue.url=https://sqs.us-east-1.amazonaws.com/649232250 uat.exporter.request.sqs.queue.url=https://sqs.us-east-1.amazonaws.com/649232250620/Bridge-EX-Request-uat prod.exporter.request.sqs.queue.url=https://sqs.us-east-1.amazonaws.com/649232250620/Bridge-EX-Request-prod -local.s3.notification.sqs.queue.url=https://sqs.us-east-1.amazonaws.com/649232250620/Bridge-UploadComplete-Notification-local -dev.s3.notification.sqs.queue.url=https://sqs.us-east-1.amazonaws.com/649232250620/Bridge-UploadComplete-Notification-dev -uat.s3.notification.sqs.queue.url=https://sqs.us-east-1.amazonaws.com/649232250620/Bridge-UploadComplete-Notification-uat -prod.s3.notification.sqs.queue.url=https://sqs.us-east-1.amazonaws.com/649232250620/Bridge-UploadComplete-Notification-prod - team.bridge.admin = 3388390 team.bridge.staff = 3388389 prod.team.bridge.admin = 3388392 diff --git a/src/test/java/org/sagebionetworks/bridge/exporter/helper/BridgeHelperTest.java b/src/test/java/org/sagebionetworks/bridge/exporter/helper/BridgeHelperTest.java index 35d4e21..43c1552 100644 --- a/src/test/java/org/sagebionetworks/bridge/exporter/helper/BridgeHelperTest.java +++ b/src/test/java/org/sagebionetworks/bridge/exporter/helper/BridgeHelperTest.java @@ -34,7 +34,6 @@ import org.sagebionetworks.bridge.rest.model.UploadFieldType; import org.sagebionetworks.bridge.rest.model.UploadSchema; import org.sagebionetworks.bridge.rest.model.UploadSchemaType; -import org.sagebionetworks.bridge.rest.model.UploadValidationStatus; import org.sagebionetworks.bridge.schema.UploadSchemaKey; @SuppressWarnings("unchecked") @@ -81,17 +80,6 @@ public void setup() { bridgeHelper.setBridgeClientManager(mockClientManager); } - @Test - public void completeUpload() throws Exception { - // mock call - Call mockCall = mock(Call.class); - when(mockWorkersApi.completeUploadSession("test-upload", null)).thenReturn(mockCall); - - // execute and verify - bridgeHelper.completeUpload("test-upload"); - verify(mockCall).execute(); - } - @Test public void getParticipantByHealthCode() throws Exception { // mock Bridge client diff --git a/src/test/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallbackTest.java b/src/test/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallbackTest.java deleted file mode 100644 index 04dc0f0..0000000 --- a/src/test/java/org/sagebionetworks/bridge/exporter/notification/S3EventNotificationCallbackTest.java +++ /dev/null @@ -1,249 +0,0 @@ -package org.sagebionetworks.bridge.exporter.notification; - -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -import com.amazonaws.services.s3.event.S3EventNotification; - -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import com.google.common.collect.Lists; - -import org.sagebionetworks.bridge.exporter.helper.BridgeHelper; -import org.sagebionetworks.bridge.rest.exceptions.BridgeSDKException; - -public class S3EventNotificationCallbackTest { - - private static final String UPLOAD_COMPLETE_MESSAGE= "{" + - " \"Message\":" + - "{\"Records\":[{\"eventVersion\":\"2.0\",\"eventSource\":\"aws:s3\"," + - "\"awsRegion\":\"us-east-1\"," + - "\"eventTime\":\"2016-07-12T22:06:54.454Z\",\"eventName\":\"ObjectCreated:Put\"," + - "\"userIdentity\":{\"principalId\":\"AWS:AIDAJCSQZ35H7B4BFOVAW\"},\"requestParameters\":{\"sourceIPAddress\":\"54.87.180" + - ".29\"},\"responseElements\":{\"x-amz-request-id\":\"006B5645F94646A3\"," + - "\"x-amz-id-2\":\"wk7j6of4ftpRy+lbjt4olcNqp8S2s9d7XUbdOv4UEDh7B+8myhMe45xBEZPUP4+5oxwY9r2z9Yw=\"}," + - "\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"Bridge Upload Complete Notification UAT\"," + - "\"bucket\":{\"name\":\"org-sagebridge-upload-uat\",\"ownerIdentity\":{\"principalId\":\"AZ9HQM5UC903F\"}," + - "\"arn\":\"arn:aws:s3:::org-sagebridge-upload-uat\"},\"object\":{\"key\":\"89b40dab-4982-4d5c-ae21-d74b072d02cd\"," + - "\"size\":1488,\"eTag\":\"e40df5cfa5874ab353947eb48ec0cfa4\",\"sequencer\":\"00578569FE6792370C\"}}}]}" + - "}"; - private static final String UPLOAD_ID = "89b40dab-4982-4d5c-ae21-d74b072d02cd"; - - private BridgeHelper mockBridgeHelper; - private S3EventNotificationCallback callback; - - - @BeforeMethod - public void before() { - mockBridgeHelper = mock(BridgeHelper.class); - - callback = spy(new S3EventNotificationCallback()); - callback.setBridgeHelper(mockBridgeHelper); - } - - @Test - public void testCallback_StringMessage() { - callback.callback(UPLOAD_COMPLETE_MESSAGE); - - verify(mockBridgeHelper, times(1)).completeUpload(UPLOAD_ID); - } - - @Test - public void testCallback_MalformedMessage() { - callback.callback("malformed \" message"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_BlankWrapper() { - callback.callback(""); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_NullWrapper() { - callback.callback("null"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_WrapperInvalidType() { - callback.callback("\"wrong type\""); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_NoMessage() { - callback.callback("{}"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_NullMessage() { - callback.callback("{\"Message\":null}"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_MessageWrongType() { - callback.callback("{\"Message\":\"wrong type\"}"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_NoRecordList() { - callback.callback("{\"Message\":{}}"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_NullRecordList() { - callback.callback("{\"Message\":{\"Records\":null}}"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_RecordListWrongType() { - callback.callback("{\"Message\":{\"Records\":\"wrong type\"}}"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - @Test - public void testCallback_EmptyList() { - callback.callback("{\"Message\":{\"Records\":[]}}"); - verify(mockBridgeHelper, never()).completeUpload(anyString()); - } - - // Exceptions that Upload Autocomplete should propagate (and retry) - @DataProvider(name = "propagatedExceptionDataProvider") - public Object[][] propagatedExceptionDataProvider() { - return new Object[][] { - { 401 }, - { 403 }, - { 500 }, - }; - } - - @Test(dataProvider = "propagatedExceptionDataProvider") - public void testCallback_PropagatesExceptions(int status) { - doThrow(new BridgeSDKException("test exception", status)).when(mockBridgeHelper).completeUpload(UPLOAD_ID); - - try { - callback.callback(UPLOAD_COMPLETE_MESSAGE); - fail("expected exception"); - } catch (RuntimeException ex) { - BridgeSDKException innerEx = (BridgeSDKException) ex.getCause(); - assertEquals(status, innerEx.getStatusCode()); - } - } - - // Exceptions that Upload Autocomplete should suppress (deterministic errors that shouldn't be retried) - @DataProvider(name = "suppressedExceptionDataProvider") - public Object[][] suppressedExceptionDataProvider() { - return new Object[][] { - { 400 }, - { 404 }, - { 412 }, - }; - } - - @Test(dataProvider = "suppressedExceptionDataProvider") - public void testCallback_SuppressesExceptions(int status) { - doThrow(new BridgeSDKException("test exception", status)).when(mockBridgeHelper).completeUpload(UPLOAD_ID); - callback.callback(UPLOAD_COMPLETE_MESSAGE); - verify(mockBridgeHelper, times(1)).completeUpload(UPLOAD_ID); - } - - @Test - public void testCallback_CompleteUploadForShouldProcessRecords() { - String key1 = "key1"; - String key2 = "key2"; - String key3 = "key3"; - - S3EventNotification.S3EventNotificationRecord record1 = createMockRecordWithKey(key1); - S3EventNotification.S3EventNotificationRecord record2 = createMockRecordWithKey(key2); - S3EventNotification.S3EventNotificationRecord record3 = createMockRecordWithKey(key3); - record1.getS3().getObject().getKey(); - - S3EventNotification notification = mock(S3EventNotification.class); - when(notification.getRecords()).thenReturn(Lists.newArrayList(record1, record2, record3)); - - doReturn(true).when(callback).shouldProcessRecord(record1); - doReturn(false).when(callback).shouldProcessRecord(record2); - doReturn(true).when(callback).shouldProcessRecord(record3); - - callback.callback(notification); - - verify(mockBridgeHelper, times(1)).completeUpload(key1); - verify(mockBridgeHelper, times(1)).completeUpload(key3); - } - - private S3EventNotification.S3EventNotificationRecord createMockRecordWithKey(String key) { - S3EventNotification.S3ObjectEntity object = mock(S3EventNotification.S3ObjectEntity.class); - when(object.getKey()).thenReturn(key); - - S3EventNotification.S3Entity entity = mock(S3EventNotification.S3Entity.class); - when(entity.getObject()).thenReturn(object); - - S3EventNotification.S3EventNotificationRecord record = mock(S3EventNotification.S3EventNotificationRecord.class); - when(record.getS3()).thenReturn(entity); - - return record; - } - - @Test - public void testShouldProcessRecords_S3Put() { - S3EventNotification.S3EventNotificationRecord s3Put = createMockRecord("aws:s3", "ObjectCreated:Put"); - - assertTrue(callback.shouldProcessRecord(s3Put)); - } - - @Test - public void testShouldProcessRecords_S3CompleteMultipartUpload() { - S3EventNotification.S3EventNotificationRecord s3Put = createMockRecord("aws:s3", "ObjectCreated:CompleteMultipartUpload"); - - assertTrue(callback.shouldProcessRecord(s3Put)); - } - - @Test - public void testShouldProcessRecords_S3Post() { - S3EventNotification.S3EventNotificationRecord s3Put = createMockRecord("aws:s3", "ObjectCreated:Post"); - - assertTrue(callback.shouldProcessRecord(s3Put)); - } - - @Test - public void testShouldProcessRecords_S3Delete() { - S3EventNotification.S3EventNotificationRecord s3Delete = createMockRecord("aws:s3", "ObjectRemoved:Delete"); - - assertFalse(callback.shouldProcessRecord(s3Delete)); - } - - @Test - public void testShouldProcessRecords_NotS3() { - S3EventNotification.S3EventNotificationRecord notS3 = createMockRecord("aws:dynamo", "ObjectRemoved:Delete"); - - assertFalse(callback.shouldProcessRecord(notS3)); - } - - private S3EventNotification.S3EventNotificationRecord createMockRecord(String source, String name) { - S3EventNotification.S3EventNotificationRecord record = mock(S3EventNotification.S3EventNotificationRecord.class); - when(record.getEventSource()).thenReturn(source); - when(record.getEventName()).thenReturn(name); - - return record; - } -}