Skip to content

Commit

Permalink
Adding validation for psc id for running multiple instances
Browse files Browse the repository at this point in the history
  • Loading branch information
kuldeepk3 committed Mar 4, 2024
1 parent 35a1487 commit ea39c04
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public class FileConfig {

public final boolean enableLargeEvent;

public final String pscId;


public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey,
String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles,
boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType,
boolean enableLargeEvent) {
boolean enableLargeEvent, String pscId) {
this.stateDatabaseFileName = stateDatabaseFileName;
this.fileSpec = fileSpec;
this.fileExtension = fileExtension;
Expand All @@ -51,6 +53,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte
this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
this.fileType = fileType;
this.enableLargeEvent = enableLargeEvent;
this.pscId = pscId;
}

@Override
Expand All @@ -69,6 +72,7 @@ public String toString() {
", transactionTimeoutMinutes=" + transactionTimeoutMinutes +
", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile +
", enableLargeEvent=" + enableLargeEvent +
", pscId=" + pscId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public abstract class FileIngestService extends DeviceDriver {
private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;

private static final int DEFAULT_INTERVAL_MS_KEY = 10000;

private static final String PSC_ID = "PSC_ID";
private final FileProcessor processor;
private final MetricPublisher metricPublisher;
private final ScheduledExecutorService executor;
Expand All @@ -75,7 +77,8 @@ public FileIngestService(DeviceDriverConfig config) {
getTransactionTimeoutMinutes(),
getMinTimeInMillisToUpdateFile(),
config.getClassName(),
getLargeEventEnable());
getLargeEventEnable(),
getPscId());
LOG.info("File Ingest Config: {}", fileSequenceConfig);
final String scopeName = getScopeName();
LOG.info("Scope: {}", scopeName);
Expand All @@ -88,6 +91,10 @@ public FileIngestService(DeviceDriverConfig config) {
executor = Executors.newScheduledThreadPool(1, namedThreadFactory);
}

private String getPscId() {
return getProperty(PSC_ID);
}

String getFileSpec() {
return getProperty(FILE_SPEC_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void createRAWFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000l,"RawFileIngestService", true);
true,20.0, 5000l,"RawFileIngestService", true, "psc1");
FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor);
Expand All @@ -61,7 +61,7 @@ public void createCSVFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000L,"CsvFileIngestService", false);
true,20.0, 5000L,"CsvFileIngestService", false, "psc1");
FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor);
Expand All @@ -76,7 +76,7 @@ public void createParquetFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000L,"ParquetFileIngestService", false);
true,20.0, 5000L,"ParquetFileIngestService", false, "psc1");
FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void setup() {
String stateDatabaseFileName = ":memory:";
config = new FileConfig("./psc.db","/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, true,
true,20.0, 5000,"RawFileIngestService", true);
true,20.0, 5000,"RawFileIngestService", true, "pscId");
}

@Test
Expand Down Expand Up @@ -214,7 +214,7 @@ public void testCreateRawFileProcessorWithNullTransactionCordinator() {
public void testCreateRawFileProcessorWithNullStateDatabaseFilenameInConfig() {
FileConfig newConfig = new FileConfig(null,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000,"RawFileIngestService", true);
true,20.0, 5000,"RawFileIngestService", true, "pscId");
Exception exception = Assert.assertThrows(NullPointerException.class, () -> new RawFileProcessor(newConfig, state, transactionalEventWriter, transactionCoordinator, "test"));
Assert.assertTrue("config.stateDatabaseFileName".equals(exception.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=1.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test

0 comments on commit ea39c04

Please sign in to comment.