Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak committed Nov 21, 2023
1 parent ae62cc5 commit 5da65ce
Show file tree
Hide file tree
Showing 7 changed files with 574 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void processNewFiles() throws Exception {
// If nextFile is null then check for new files to process is handled as part of scheduleWithDelay
final Pair<FileNameWithOffset, Long> nextFile = state.getNextPendingFileRecord();
if (nextFile == null) {
log.info("processNewFiles: No more files to watch");
log.debug("processNewFiles: No more files to watch");
break;
} else {
processFile(nextFile.getLeft(), nextFile.getRight());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ public void performRecovery() {
if (transactionsToCommit.isEmpty()) {
log.info("performRecovery: No transactions to be recovered");
} else {
log.warn("Transaction recovery needed on {} transactions", transactionsToCommit.size());
log.warn("performRecovery: Transaction recovery needed on {} transactions", transactionsToCommit.size());
transactionsToCommit.forEach((txnId) -> {
log.info("Committing transaction {} from a previous process", txnId);
log.info("performRecovery: Committing transaction {} from a previous process", txnId);
try {
writer.commit(txnId);
} catch (TxnFailedException e) {
Expand All @@ -167,6 +167,7 @@ public void performRecovery() {
throw e;
}
}
log.info("performRecovery: deleting committed transaction {} from TransactionsToCommit table",txnId);
deleteTransactionToCommit(Optional.of(txnId));
});
log.info("Transaction recovery completed");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.pravega.sensor.collector.file;

import io.pravega.sensor.collector.util.TransactionCoordinator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.sql.SQLException;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

public class FileIngestServiceTest {

@Mock
private FileProcessor processor;

@BeforeEach
public void setUp() throws SQLException {
MockitoAnnotations.initMocks(this);

}

@Test
public void watchFileTests() throws Exception {
doNothing().when(processor).watchFiles();

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.pravega.sensor.collector.file;

import io.pravega.sensor.collector.file.csvfile.CsvFileSequenceProcessor;
import io.pravega.sensor.collector.file.parquet.ParquetFileProcessor;
import io.pravega.sensor.collector.file.rawfile.RawFileProcessor;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.TransactionCoordinator;
import io.pravega.sensor.collector.util.TransactionStateInMemoryImpl;
import io.pravega.sensor.collector.util.TransactionStateSQLiteImpl;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.sql.SQLException;

import static org.mockito.Mockito.doNothing;

public class FileProcessorFactoryTest {


private FileConfig config;
@Mock
private EventWriter writer;
@Mock
private TransactionCoordinator transactionCoordinator;
@Mock
private TransactionStateInMemoryImpl state;

@BeforeEach
public void setUp() {
MockitoAnnotations.initMocks(this);

}

/*
* Test for creating Raw file processor
*/
@Test
public void createRAWFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0,"RawFileIngestService");
FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor);

}

/*
* Test for creating CSV file processor
*/
@Test
public void createCSVFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0,"CsvFileIngestService");
FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor);

}

/*
* Test for creating PARQUET file processor
*/
@Test
public void createParquetFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0,"ParquetFileIngestService");
FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,47 @@
import com.google.common.collect.ImmutableList;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.client.stream.impl.TransactionalEventStreamWriterImpl;
import io.pravega.sensor.collector.file.rawfile.RawEventGenerator;
import io.pravega.sensor.collector.file.rawfile.RawFileProcessor;
import io.pravega.sensor.collector.util.*;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
public class FileProcessorTests {
private static final Logger log = LoggerFactory.getLogger(FileProcessorTests.class);

private FileConfig config;
@Mock
private TransactionStateSQLiteImpl state;

@Mock
private EventWriter writer;

@Mock
TransactionalEventWriter transactionalEventWriter;
@Mock
private TransactionCoordinator transactionCoordinator;
@Mock
Expand All @@ -41,21 +55,12 @@ public class FileProcessorTests {
@BeforeEach
public void setup(){
MockitoAnnotations.initMocks(this);
config = new FileConfig("tset.db","/opt/pravega-sensor-collector/Files/A","parquet","key12",
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0,"RawFileIngestService");
/*writer = EventWriter.create(
clientFactory,
"writerId",
config.streamName,
new ByteArraySerializer(),
EventWriterConfig.builder()
.enableConnectionPooling(true)
.transactionTimeoutTime((long) (20.0 * 60.0 * 1000.0))
.build(),
config.exactlyOnce);*/
String stateDatabaseFileName = ":memory:";
state = TransactionStateInMemoryImpl.create(stateDatabaseFileName);

// state = TransactionStateInMemoryImpl.create(stateDatabaseFileName);
//rawFileProcessor = new RawFileProcessor(config,state, writer, transactionCoordinator, "writerId");

}
Expand Down Expand Up @@ -90,4 +95,67 @@ public void getEmptyNextFileSet() throws Exception {
FileProcessor fileProcessor = FileProcessor.create(config, clientFactory);
fileProcessor.processFiles();
}


/*
* Process the single file for Raw file processor.
*/
@Test
public void processNextFile() throws Exception {
// Mockito.when(state.getNextPendingFileRecord()).thenReturn(new ImmutablePair<>(new FileNameWithOffset("file1.parquet", 0), 1L));
FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test");
doNothing().when(transactionalEventWriter).writeEvent(anyString(), any());
//fileProcessor.processNewFiles();
fileProcessor.processFile(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub1.parquet", 0), 1L);
verify(transactionalEventWriter).writeEvent(anyString(), any());
}

/*
* Process 3 files in loop
*/
@Test
public void processNextFewFiles() throws Exception {
// Define different return values for the first three invocations and from 4th invocation onwards null
Mockito.when(state.getNextPendingFileRecord())
.thenReturn(new ImmutablePair<>(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub1.parquet", 0), 1L))
.thenReturn(new ImmutablePair<>(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub2.parquet", 0), 2L))
.thenReturn(new ImmutablePair<>(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub3.parquet", 0), 3L))
.thenAnswer(invocation -> null);

FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test");
doNothing().when(transactionalEventWriter).writeEvent(anyString(), any());
fileProcessor.processNewFiles();

// Verify that myMethod was called exactly three times
Mockito.verify(transactionalEventWriter, Mockito.times(3)).writeEvent(anyString(), any());

}

/*
* Process the single file .
* Throw transaction failed exception while writing events
*/
@Test
public void processNextFile_WriteEventException() throws Exception {
FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test");
Mockito.doThrow(TxnFailedException.class).when(transactionalEventWriter).writeEvent(anyString(), any());
assertThrows(RuntimeException.class, () -> fileProcessor.processFile(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub1.parquet", 0), 1L));
// Verify that myMethod was called exactly three times
Mockito.verify(transactionalEventWriter, Mockito.times(1)).writeEvent(anyString(), any());

}
/*
* Process the single file .
* Throw transaction failed exception while commiting transaction
*/
@Test
public void processNextFile_CommitException() throws Exception {
FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test");
Mockito.doThrow(TxnFailedException.class).when(transactionalEventWriter).commit();
assertThrows(RuntimeException.class, () -> fileProcessor.processFile(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub1.parquet", 0), 1L));
// Verify that myMethod was called exactly three times
Mockito.verify(transactionalEventWriter, Mockito.times(1)).commit();

}

}
Loading

0 comments on commit 5da65ce

Please sign in to comment.