Skip to content

Commit

Permalink
Fix S3 sink writing to closed stream exception
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Aug 15, 2023
1 parent 60f69b3 commit c31fa8f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ void output(Collection<Record<Event>> records) {
releaseEventHandles(false);
}
currentBuffer = bufferFactory.getBuffer();
outputStream = currentBuffer.getOutputStream();
}
}
} catch (IOException | InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,34 @@ void test_output_with_uploadedToS3_success_records_byte_count() throws IOExcepti
verify(s3ObjectSizeSummary, times(50)).record(objectSize);
}

@Test
void test_output_with_uploadedToS3_midBatch_generatesNewOutputStream() throws IOException {

bufferFactory = mock(BufferFactory.class);
InMemoryBuffer buffer = mock(InMemoryBuffer.class);
when(buffer.getEventCount()).thenReturn(10);
doNothing().when(buffer).flushToS3(any(S3Client.class), anyString(), any(String.class));
when(bufferFactory.getBuffer()).thenReturn(buffer);
final OutputStream outputStream1 = mock(OutputStream.class);
final OutputStream outputStream2 = mock(OutputStream.class);
when(buffer.getOutputStream())
.thenReturn(outputStream1)
.thenReturn(outputStream2);

doNothing().when(codec).writeEvent(any(), eq(outputStream1));
doNothing().when(codec).writeEvent(any(), eq(outputStream2));

S3SinkService s3SinkService = createObjectUnderTest();
assertNotNull(s3SinkService);
assertThat(s3SinkService, instanceOf(S3SinkService.class));

s3SinkService.output(generateEventRecords(2));

verify(snapshotSuccessCounter, times(2)).increment();
verify(codec).writeEvent(any(), eq(outputStream1));
verify(codec).writeEvent(any(), eq(outputStream2));
}

@Test
void test_output_with_uploadedToS3_failed() throws IOException {
when(s3SinkConfig.getBucketName()).thenReturn(UUID.randomUUID().toString());
Expand Down

0 comments on commit c31fa8f

Please sign in to comment.