Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watchdogv1 #72

Draft
wants to merge 24 commits into
base: stability-improvements-v2
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
549009f
Watchdog draft v1
abhinb Jan 22, 2024
6bee5cf
service notf
abhinb Jan 22, 2024
bb6ea6a
Changing gradle task name
abhinb Jan 22, 2024
64a9e44
Adding tests
abhinb Jan 29, 2024
65b7747
Merge branch 'stability-improvements-v2' of https://github.com/praveg…
abhinb Feb 6, 2024
10efe36
Merge branch 'stability-improvements-v2' of https://github.com/praveg…
abhinb Feb 8, 2024
14661c7
Support to push metric data to Pravega Stream
abhinb Feb 9, 2024
5442a3b
Some more changes
abhinb Feb 16, 2024
587eae6
Correcting tests
abhinb Feb 23, 2024
6fc0945
Merge branch 'stability-improvements-v2' of https://github.com/praveg…
abhinb Feb 27, 2024
36b077c
Handling comments
abhinb Feb 27, 2024
d57b809
Stopping MetricPublisher
abhinb Feb 27, 2024
6845d7d
Not starting MetricStreamWriter
abhinb Feb 28, 2024
0c7a6fc
Fixing unit test case (#64)
kuldeepk3 Feb 29, 2024
410358e
Fixing keycloak credentials issue (#65)
kuldeepk3 Feb 29, 2024
a591072
Merge branch 'watchdogv1' of https://github.com/pravega/pravega-senso…
abhinb Feb 29, 2024
5cb429d
Fixing Authorization failure
abhinb Feb 29, 2024
dea286f
Handling comment
abhinb Feb 29, 2024
35a1487
Fixing stopAsync issues
abhinb Feb 29, 2024
af79218
Adding validation for required parameters (#66)
kuldeepk3 Mar 6, 2024
cf0cc0a
Updating READMe file (#68)
kuldeepk3 Mar 6, 2024
c802447
Making file extension mandatory (#69)
kuldeepk3 Mar 6, 2024
4dd930d
Merge branch 'stability-improvements-v2' into watchdogv1
kuldeepk3 Mar 7, 2024
479b8b1
Fixing stylecheck errors
kuldeepk3 Mar 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 27 additions & 24 deletions README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public DeviceDriver(DeviceDriverConfig config) {
public String getProperty(String key) {
final String value = config.getProperties().get(key);
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(MessageFormat.format("Missing required parameter {0}", key));
throw new IllegalArgumentException(MessageFormat.format("PSC Service Error: Missing required parameter {0} in config", key));
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ public class FileConfig {

public final boolean enableLargeEvent;

public final String pscId;


public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey,
String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles,
boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType,
boolean enableLargeEvent) {
boolean enableLargeEvent, String pscId) {
this.stateDatabaseFileName = stateDatabaseFileName;
this.fileSpec = fileSpec;
this.fileExtension = fileExtension;
Expand All @@ -51,6 +53,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte
this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
this.fileType = fileType;
this.enableLargeEvent = enableLargeEvent;
this.pscId = pscId;
}

@Override
Expand All @@ -69,6 +72,7 @@ public String toString() {
", transactionTimeoutMinutes=" + transactionTimeoutMinutes +
", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile +
", enableLargeEvent=" + enableLargeEvent +
", pscId=" + pscId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public abstract class FileIngestService extends DeviceDriver {
private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;

private static final int DEFAULT_INTERVAL_MS_KEY = 10000;
private static final String PSC_ID = "PSC_ID";
private final FileProcessor processor;
private final MetricPublisher metricPublisher;
private final ScheduledExecutorService executor;
Expand All @@ -75,7 +76,8 @@ public FileIngestService(DeviceDriverConfig config) {
getTransactionTimeoutMinutes(),
getMinTimeInMillisToUpdateFile(),
config.getClassName(),
getLargeEventEnable());
getLargeEventEnable(),
getPscId());
LOG.info("File Ingest Config: {}", fileSequenceConfig);
final String scopeName = getScopeName();
LOG.info("Scope: {}", scopeName);
Expand All @@ -88,12 +90,16 @@ public FileIngestService(DeviceDriverConfig config) {
executor = Executors.newScheduledThreadPool(1, namedThreadFactory);
}

private String getPscId() {
return getProperty(PSC_ID);
}

String getFileSpec() {
return getProperty(FILE_SPEC_KEY);
}

String getFileExtension() {
return getProperty(FILE_EXT, "");
return getProperty(FILE_EXT);
}

boolean getDeleteCompletedFiles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public static Metric getMetric(String metricName) {
* json string.
*/
public static String getMetricsAsJson() throws JsonProcessingException {

return MAPPER.writerWithDefaultPrettyPrinter()
.writeValueAsString(metricStore);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private EventStreamWriter<String> initializeWriter() {
log.info("Initializing writer with {} {} {}", this.scope, this.streamName, this.controllerURI.toString());
ClientConfig clientConfig = ClientConfig.builder().controllerURI(this.controllerURI).build();
try (StreamManager streamManager = StreamManager.create(clientConfig)) {

StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public static void main(String[] args) {
WatchDogConfig config = new WatchDogConfig(properties);
log.debug("Properties: {}", properties);
final WatchDogService service = new WatchDogService(config);
service.checkPscServiceStatus();
service.startAsync();
service.awaitTerminated();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.pravega.common.util.Property;

import java.io.File;
import java.text.MessageFormat;
import java.util.Map;

public class WatchDogConfig {
Expand All @@ -32,7 +33,7 @@ public class WatchDogConfig {
private final String serviceName;

public WatchDogConfig(Map<String, String> properties) {
this.serviceName = properties.getOrDefault(PSC_SERVICE_NAME.toString(), PSC_SERVICE_NAME.getDefaultValue());
this.serviceName = getProperty(properties, PSC_SERVICE_NAME.toString());
this.watchDogWatchIntervalSeconds = Integer.parseInt(properties.getOrDefault(PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.toString(), PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.getDefaultValue()));
this.watchdogFileMonitorPath = properties.getOrDefault(PSC_WATCHDOG_FILE_MONITOR_PATH.toString(), PSC_WATCHDOG_FILE_MONITOR_PATH.getDefaultValue());
this.restartTriggerPath = properties.getOrDefault(PSC_WATCHDOG_RESTART_TRIGGER_PATH.toString(), PSC_WATCHDOG_RESTART_TRIGGER_PATH.getDefaultValue());
Expand Down Expand Up @@ -79,4 +80,19 @@ public String getRestartTriggerPath() {
return restartTriggerPath;
}

/**
* Retrieves the value of a property from the given map of properties.
*
* @param properties the map of properties to retrieve the value from
* @param key the key of the property to retrieve
* @return the value of the property
* @throws IllegalArgumentException throw IllegalArgumentException if there is missing required parameter.
*/
public String getProperty(Map<String, String> properties, String key) {
final String value = properties.get(key);
if (value == null || value.isEmpty()) {
throw new IllegalArgumentException(MessageFormat.format("PSC Watchdog Service: Missing required parameter {0} in config", key));
}
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@
package io.pravega.sensor.collector.watchdog;

import com.google.common.util.concurrent.AbstractService;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
* Watchdog service for Pravega Sensor Collector(PSC).
Expand Down Expand Up @@ -48,4 +52,28 @@ protected void doStop() {
notifyStopped();
}

public void checkPscServiceStatus() throws IOException {
Process psc = null;
log.info("Checking PSC service status {} ", this.config.getServiceName());
if (SystemUtils.IS_OS_LINUX) {
psc = Runtime.getRuntime().exec(new String[]{"sh", "-c", "systemctl status " + this.config.getServiceName()});
} else if ( SystemUtils.IS_OS_WINDOWS) {
psc = Runtime.getRuntime().exec(new String[]{"cmd.exe", "/c", this.config.getServiceName() + ".exe", "status"});
}
BufferedReader stdInput = new BufferedReader(new
InputStreamReader(psc.getInputStream()));

String s;
Boolean isAlive = true;
while ((s = stdInput.readLine()) != null) {
if (s.equalsIgnoreCase("NonExistent")) {
isAlive = false;
}
}
log.debug("Process psc {}, and isAlive value is {} ", psc, isAlive);
if (!isAlive) {
log.error("PSC service is not running");
throw new RuntimeException("PSC service is not running. Please start psc service before starting watchdog service.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
import io.pravega.sensor.collector.DeviceDriverManager;
import io.pravega.sensor.collector.Parameters;
import io.pravega.sensor.collector.util.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;

Expand All @@ -36,13 +40,24 @@ public class FileIngestServiceTest {
private DeviceDriverManager driverManager;

@BeforeEach
void setUp() {
void setUp() throws IOException {
properties = Parameters.getProperties(FILE_NAME);
Files.deleteIfExists(Paths.get(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE")));
driverManager = new DeviceDriverManager(properties);
deviceDriverConfig = new DeviceDriverConfig("RAW1", "RawFileIngestService",
TestUtils.configFromProperties(PREFIX, SEPARATOR, properties), driverManager);
}

@AfterEach
public void tearDown() {
try {
Files.deleteIfExists(Paths.get(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE")));
} catch (IOException e) {
throw new RuntimeException(e);
}
properties = null;
}

@Test
public void testFileIngestService() {
FileIngestService fileIngestService = new MockFileIngestService(deviceDriverConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ public void createRAWFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName, "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, false,
true, 20.0, 5000L, "RawFileIngestService", true);
true, 20.0, 5000L, "RawFileIngestService", true, "psc1");
FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config, state, writer, transactionCoordinator, "writerId");

Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor);

}

/*
Expand All @@ -61,9 +59,8 @@ public void createCSVFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName, "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, false,
true, 20.0, 5000L, "CsvFileIngestService", false);
true, 20.0, 5000L, "CsvFileIngestService", false, "psc1");
FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config, state, writer, transactionCoordinator, "writerId");

Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor);

}
Expand All @@ -76,7 +73,7 @@ public void createParquetFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName, "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, false,
true, 20.0, 5000L, "ParquetFileIngestService", false);
true, 20.0, 5000L, "ParquetFileIngestService", false, "psc1");
FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config, state, writer, transactionCoordinator, "writerId");

Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void setup() {
String stateDatabaseFileName = ":memory:";
config = new FileConfig("./psc.db", "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, true,
true, 20.0, 5000, "RawFileIngestService", true);
true, 20.0, 5000, "RawFileIngestService", true, "pscId");
}

@Test
Expand Down Expand Up @@ -225,7 +225,7 @@ public void testCreateRawFileProcessorWithNullTransactionCordinator() {
public void testCreateRawFileProcessorWithNullStateDatabaseFilenameInConfig() {
FileConfig newConfig = new FileConfig(null, "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, false,
true, 20.0, 5000, "RawFileIngestService", true);
true, 20.0, 5000, "RawFileIngestService", true, "pscId");
Exception exception = Assert.assertThrows(NullPointerException.class, () -> new RawFileProcessor(newConfig, state, transactionalEventWriter, transactionCoordinator, "test"));
Assert.assertTrue("config.stateDatabaseFileName".equals(exception.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class WatchdogServiceTests {

@Test
public void testWatchDogServiceStart() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
WatchDogService service = new WatchDogService(config);
service.startAsync();
Assert.assertTrue(service.isRunning());
Expand All @@ -32,7 +32,7 @@ public void testWatchDogServiceStart() {

@Test
public void testWatchdogMonitorStart() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.startAsync();
Assert.assertTrue(monitor.isRunning());
Expand All @@ -42,7 +42,7 @@ public void testWatchdogMonitorStart() {

@Test
public void testWatchdogRestartPSC() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.onUpdateMissed();
monitor.onUpdateMissed();
Expand All @@ -54,7 +54,7 @@ public void testWatchdogRestartPSC() {

@Test
public void testWatchdogResetCounterOnUpdates() {
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().build());
WatchDogConfig config = new WatchDogConfig(ImmutableMap.<String, String>builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.onUpdateMissed();
monitor.onUpdateMissed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=1.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test
Loading