Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding validation for required parameters #66

Merged
merged 5 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public class FileConfig {

public final boolean enableLargeEvent;

public final String pscId;


public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey,
String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles,
boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType,
boolean enableLargeEvent) {
boolean enableLargeEvent, String pscId) {
this.stateDatabaseFileName = stateDatabaseFileName;
this.fileSpec = fileSpec;
this.fileExtension = fileExtension;
Expand All @@ -51,6 +53,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte
this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
this.fileType = fileType;
this.enableLargeEvent = enableLargeEvent;
this.pscId = pscId;
}

@Override
Expand All @@ -69,6 +72,7 @@ public String toString() {
", transactionTimeoutMinutes=" + transactionTimeoutMinutes +
", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile +
", enableLargeEvent=" + enableLargeEvent +
", pscId=" + pscId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public abstract class FileIngestService extends DeviceDriver {
private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;

private static final int DEFAULT_INTERVAL_MS_KEY = 10000;

private static final String PSC_ID = "PSC_ID";
private final FileProcessor processor;
private final MetricPublisher metricPublisher;
private final ScheduledExecutorService executor;
Expand All @@ -75,7 +77,8 @@ public FileIngestService(DeviceDriverConfig config) {
getTransactionTimeoutMinutes(),
getMinTimeInMillisToUpdateFile(),
config.getClassName(),
getLargeEventEnable());
getLargeEventEnable(),
getPscId());
LOG.info("File Ingest Config: {}", fileSequenceConfig);
final String scopeName = getScopeName();
LOG.info("Scope: {}", scopeName);
Expand All @@ -88,6 +91,10 @@ public FileIngestService(DeviceDriverConfig config) {
executor = Executors.newScheduledThreadPool(1, namedThreadFactory);
}

private String getPscId() {
return getProperty(PSC_ID);
}

String getFileSpec() {
return getProperty(FILE_SPEC_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static void main(String[] args) {
WatchDogConfig config = new WatchDogConfig(properties);
log.debug("Properties: {}", properties);
final WatchDogService service = new WatchDogService(config);
service.checkPscServiceStatus();
service.startAsync();
service.awaitTerminated();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.pravega.common.util.Property;

import java.io.File;
import java.text.MessageFormat;
import java.util.Map;

public class WatchDogConfig {
Expand All @@ -32,7 +33,7 @@ public class WatchDogConfig {
private final String serviceName;

public WatchDogConfig(Map<String, String> properties) {
this.serviceName = properties.getOrDefault(PSC_SERVICE_NAME.toString(), PSC_SERVICE_NAME.getDefaultValue());
this.serviceName = getProperty(properties, PSC_SERVICE_NAME.toString());
this.watchDogWatchIntervalSeconds = Integer.parseInt(properties.getOrDefault(PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.toString(), PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.getDefaultValue()));
this.watchdogFileMonitorPath = properties.getOrDefault(PSC_WATCHDOG_FILE_MONITOR_PATH.toString(), PSC_WATCHDOG_FILE_MONITOR_PATH.getDefaultValue());
this.restartTriggerPath = properties.getOrDefault(PSC_WATCHDOG_RESTART_TRIGGER_PATH.toString(), PSC_WATCHDOG_RESTART_TRIGGER_PATH.getDefaultValue());
Expand Down Expand Up @@ -79,4 +80,20 @@ public String getRestartTriggerPath() {
return restartTriggerPath;
}


/**
* Retrieves the value of a property from the given map of properties.
*
* @param properties the map of properties to retrieve the value from
* @param key the key of the property to retrieve
* @return the value of the property
*/
public String getProperty(Map<String, String> properties, String key) {
final String value = properties.get(key);
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(MessageFormat.format("Missing required parameter {0}", key));
}
return value;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@
package io.pravega.sensor.collector.watchdog;

import com.google.common.util.concurrent.AbstractService;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;


/**
* Watchdog service for Pravega Sensor Collector(PSC).
Expand Down Expand Up @@ -48,4 +53,28 @@ protected void doStop() {
notifyStopped();
}

public void checkPscServiceStatus() throws IOException {
Process psc = null;
log.info("Checking PSC service status {} ", this.config.getServiceName());
if (SystemUtils.IS_OS_LINUX) {
psc = Runtime.getRuntime().exec(new String[]{"sh", "-c", "systemctl status " + this.config.getServiceName()});
} else if ( SystemUtils.IS_OS_WINDOWS) {
psc = Runtime.getRuntime().exec(new String[]{"cmd.exe", "/c", this.config.getServiceName() + ".exe", "status"});
}
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(psc.getInputStream()));

String s = null;
Boolean isAlive = true;
while ((s = stdInput.readLine()) != null) {
if(s.equalsIgnoreCase("NonExistent")){
isAlive = false;
}
}
log.debug("Process psc {}, and isAlive value is {} ", psc, isAlive);
if(!isAlive) {
log.error("PSC service is not running");
throw new RuntimeException("PSC service is not running. Please start psc service before starting watchdog service.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void createRAWFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000l,"RawFileIngestService", true);
true,20.0, 5000l,"RawFileIngestService", true, "psc1");
FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor);
Expand All @@ -61,7 +61,7 @@ public void createCSVFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000L,"CsvFileIngestService", false);
true,20.0, 5000L,"CsvFileIngestService", false, "psc1");
FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor);
Expand All @@ -76,7 +76,7 @@ public void createParquetFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000L,"ParquetFileIngestService", false);
true,20.0, 5000L,"ParquetFileIngestService", false, "psc1");
FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config,state,writer,transactionCoordinator,"writerId");

Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void setup() {
String stateDatabaseFileName = ":memory:";
config = new FileConfig("./psc.db","/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, true,
true,20.0, 5000,"RawFileIngestService", true);
true,20.0, 5000,"RawFileIngestService", true, "pscId");
}

@Test
Expand Down Expand Up @@ -214,7 +214,7 @@ public void testCreateRawFileProcessorWithNullTransactionCordinator() {
public void testCreateRawFileProcessorWithNullStateDatabaseFilenameInConfig() {
FileConfig newConfig = new FileConfig(null,"/opt/pravega-sensor-collector/Files/A","parquet","key12",
"stream1","{}",10, false,
true,20.0, 5000,"RawFileIngestService", true);
true,20.0, 5000,"RawFileIngestService", true, "pscId");
Exception exception = Assert.assertThrows(NullPointerException.class, () -> new RawFileProcessor(newConfig, state, transactionalEventWriter, transactionCoordinator, "test"));
Assert.assertTrue("config.stateDatabaseFileName".equals(exception.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class WatchdogServiceTests {

@Test
public void testWatchDogServiceStart() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
WatchDogService service = new WatchDogService(config);
service.startAsync();
Assert.assertTrue(service.isRunning());
Expand All @@ -23,7 +23,7 @@ public void testWatchDogServiceStart() {

@Test
public void testWatchdogMonitorStart() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.startAsync();
Assert.assertTrue(monitor.isRunning());
Expand All @@ -33,7 +33,7 @@ public void testWatchdogMonitorStart() {

@Test
public void testWatchdogRestartPSC() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.onUpdateMissed();
monitor.onUpdateMissed();
Expand All @@ -45,7 +45,7 @@ public void testWatchdogRestartPSC() {

@Test
public void testWatchdogResetCounterOnUpdates() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.onUpdateMissed();
monitor.onUpdateMissed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=1.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test
Loading