diff --git a/README.md b/README.md
index e1e6aba7..1370445a 100644
--- a/README.md
+++ b/README.md
@@ -145,30 +145,33 @@ For a list of commonly-used configuration values, see the
#### Sample configuration properties
-| Configuration Parameter | Value | Description |
-|-------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|
-| `CREATE_SCOPE` | `false` | Boolean value. |
-| `ROUTING_KEY` | `routingkey1` | Pravega routing key |
-| `ENABLE_PRAVEGA` | `true` | Boolean parameter. Default value = true |
-| `pravega_client_auth_method` | `Bearer` | Authentication type to connect to Pravega client |
-| `pravega_client_auth_loadDynamic` | `true` | Boolean parameter. Default value = true |
-| `KEYCLOAK_SERVICE_ACCOUNT_FILE` | `/opt/Pravega-sensor-collector/PSC_Files/keycloak-project1.json` | Path for keycloak service account file |
-| `PRAVEGA_SENSOR_COLLECTOR_ACCEL2_CLASS` | Raw File: `io.pravega.sensor.collector.file.rawfile.RawFileIngestService`
CSV file: `io.pravega.sensor.collector.file.csvfile.CsvFileIngestService`
Parquet file: `io.pravega.sensor.collector.file.parquet.ParquetFileIngestService` | Pravega sensor collector class package |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_FILE_SPEC` | `/opt/Pravega-sensor-collector/files1` | The application reads files for processing from a specified directory path |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_FILE_EXTENSION` | `parquet` | Types of file Example:-
Raw File: parquet
CSV file: csv
Parquet file: parquet | | |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE` | `/opt/Pravega-sensor-collector/PSC_Files/datafile.db` | Directory path where database file gets created Example: /opt/database/databasefile.db |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_PRAVEGA_CONTROLLER_URI` | `tls://pravega-controller.foggy-nelson.ns.sdp.hop.lab.emc.com:443` | Pravega controller URI EX: Pravega Controller URI |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_SCOPE` | `scope1` | Scope name for Pravega sensor collector |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_STREAM` | `stream1` | Stream name for Pravega sensor collector |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_ROUTING_KEY` | `routingkey1` | Routing key for Pravega Sensor collector |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES` | `false` | If true, PSC immediately delete the file soon after processing |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES` | `2.0` | Timeout for each transaction. Default value is 2 minutes |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE` | `false` | If Pravega is on SDP, set this to `false`. Accept Boolean value. |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_EXACTLY_ONCE` | true | If true, it will use transactional write. For raw file ingestion it is recommended to set it as false as in transactional write, client can process maximum file size of 8mb. |
-| `PRAVEGA_SENSOR_COLLECTOR_RAW1_ENABLE_LARGE_EVENT` | false | if false, will not allow to write large event. It is recommended to set it as true for non transactional write. |
-| `HADOOP_HOME` | `${HOME}/dev` | For windows, Hadoop requires native libraries on Windows to work properly. You can download `Winutils.exe` to fix this.
See [here](https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems). Add the location of bin/winutils.exe in the parameter HADOOP_HOME.
**This is required only for Parquet file type not for CSV and Raw file ingestion type** |
-
-
+| Configuration Parameter | Value | Description | Optional / Mandatory |
+|-----------------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|:--------------------:|
+| `CREATE_SCOPE` | `false` | Boolean value. | Mandatory |
+| `ROUTING_KEY` | `routingkey1` | Pravega routing key | Optional |
+| `ENABLE_PRAVEGA` | `true` | Boolean parameter. Default value = true | Optional |
+| `pravega_client_auth_method` | `Bearer` | Authentication type to connect to Pravega client | Mandatory |
+| `pravega_client_auth_loadDynamic` | `true` | Boolean parameter. Default value = true | Optional |
+| `KEYCLOAK_SERVICE_ACCOUNT_FILE` | `/opt/Pravega-sensor-collector/PSC_Files/keycloak-project1.json` | Path for keycloak service account file | Mandatory |
+| `PRAVEGA_SENSOR_COLLECTOR_ACCEL2_CLASS` | Raw File: `io.pravega.sensor.collector.file.rawfile.RawFileIngestService`
CSV file: `io.pravega.sensor.collector.file.csvfile.CsvFileIngestService`
Parquet file: `io.pravega.sensor.collector.file.parquet.ParquetFileIngestService` | Pravega sensor collector class package | Mandatory |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_FILE_SPEC` | `/opt/Pravega-sensor-collector/files1` | The application reads files for processing from a specified directory path | Mandatory |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_FILE_EXTENSION` | `parquet` | Types of file Example:-
Raw File: parquet
CSV file: csv
Parquet file: parquet | Mandatory | |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE` | `/opt/Pravega-sensor-collector/PSC_Files/datafile.db` | Directory path where database file gets created Example: /opt/database/databasefile.db | Mandatory |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_PRAVEGA_CONTROLLER_URI` | `tls://pravega-controller.foggy-nelson.ns.sdp.hop.lab.emc.com:443` | Pravega controller URI EX: Pravega Controller URI | Mandatory |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_SCOPE` | `scope1` | Scope name for Pravega sensor collector | Mandatory |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_STREAM` | `stream1` | Stream name for Pravega sensor collector | Mandatory |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_ROUTING_KEY` | `routingkey1` | Routing key for Pravega Sensor collector | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES` | `false` | If true, PSC immediately delete the file soon after processing | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES` | `2.0` | Timeout for each transaction. Default value is 2 minutes | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE` | `false` | If Pravega is on SDP, set this to `false`. Accept Boolean value. | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_EXACTLY_ONCE` | true | If true, it will use transactional write. For raw file ingestion it is recommended to set it as false as in transactional write, client can process maximum file size of 8mb. | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_ENABLE_LARGE_EVENT` | false | if false, will not allow to write large event. It is recommended to set it as true for non transactional write. | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_HADOOP_HOME` | `${HOME}/dev` | For windows, Hadoop requires native libraries on Windows to work properly. You can download `Winutils.exe` to fix this.
See [here](https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems). Add the location of bin/winutils.exe in the parameter HADOOP_HOME.
**This is required only for Parquet file type not for CSV and Raw file ingestion type** | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID` | `pscId` | String value used to differentiate between different psc instances | Mandatory |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_EVENT_TEMPLATE_KEY` | `{}` | Template of file conten | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_SAMPLES_PER_EVENT_KEY` | `100` | Samples per event | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES_KEY` | `5.0` | Transaction timeout limit | Optional |
+| `PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY` | `5000` | Min time to update file in millis | Optional |
### Install the Service
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriver.java
index b255142e..ac388615 100644
--- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriver.java
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriver.java
@@ -39,7 +39,7 @@ public DeviceDriver(DeviceDriverConfig config) {
public String getProperty(String key) {
final String value = config.getProperties().get(key);
if (value == null || value.isEmpty()) {
- throw new IllegalArgumentException(MessageFormat.format("Missing required parameter {0}", key));
+ throw new IllegalArgumentException(MessageFormat.format("PSC Service Error: Missing required parameter {0} in config", key));
}
return value;
}
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java
index 8ca5fa3f..0bf92e0b 100644
--- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileConfig.java
@@ -33,11 +33,13 @@ public class FileConfig {
public final boolean enableLargeEvent;
+ public final String pscId;
+
public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String routingKey,
String streamName, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles,
boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType,
- boolean enableLargeEvent) {
+ boolean enableLargeEvent, String pscId) {
this.stateDatabaseFileName = stateDatabaseFileName;
this.fileSpec = fileSpec;
this.fileExtension = fileExtension;
@@ -51,6 +53,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte
this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
this.fileType = fileType;
this.enableLargeEvent = enableLargeEvent;
+ this.pscId = pscId;
}
@Override
@@ -69,6 +72,7 @@ public String toString() {
", transactionTimeoutMinutes=" + transactionTimeoutMinutes +
", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile +
", enableLargeEvent=" + enableLargeEvent +
+ ", pscId=" + pscId +
'}';
}
}
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java
index 4ea95fdb..3cc95456 100644
--- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java
@@ -52,6 +52,7 @@ public abstract class FileIngestService extends DeviceDriver {
private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;
private static final int DEFAULT_INTERVAL_MS_KEY = 10000;
+ private static final String PSC_ID = "PSC_ID";
private final FileProcessor processor;
private final MetricPublisher metricPublisher;
private final ScheduledExecutorService executor;
@@ -75,7 +76,8 @@ public FileIngestService(DeviceDriverConfig config) {
getTransactionTimeoutMinutes(),
getMinTimeInMillisToUpdateFile(),
config.getClassName(),
- getLargeEventEnable());
+ getLargeEventEnable(),
+ getPscId());
LOG.info("File Ingest Config: {}", fileSequenceConfig);
final String scopeName = getScopeName();
LOG.info("Scope: {}", scopeName);
@@ -88,12 +90,16 @@ public FileIngestService(DeviceDriverConfig config) {
executor = Executors.newScheduledThreadPool(1, namedThreadFactory);
}
+ private String getPscId() {
+ return getProperty(PSC_ID);
+ }
+
String getFileSpec() {
return getProperty(FILE_SPEC_KEY);
}
String getFileExtension() {
- return getProperty(FILE_EXT, "");
+ return getProperty(FILE_EXT);
}
boolean getDeleteCompletedFiles() {
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java
index 470dd391..1ef3ac50 100644
--- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java
@@ -38,6 +38,7 @@ public static Metric getMetric(String metricName) {
* json string.
*/
public static String getMetricsAsJson() throws JsonProcessingException {
+
return MAPPER.writerWithDefaultPrettyPrinter()
.writeValueAsString(metricStore);
}
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java
index 83484981..a21dd550 100644
--- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java
@@ -52,6 +52,7 @@ private EventStreamWriter initializeWriter() {
log.info("Initializing writer with {} {} {}", this.scope, this.streamName, this.controllerURI.toString());
ClientConfig clientConfig = ClientConfig.builder().controllerURI(this.controllerURI).build();
try (StreamManager streamManager = StreamManager.create(clientConfig)) {
+
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java
index ce229cad..dbe98db2 100644
--- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java
@@ -28,6 +28,7 @@ public static void main(String[] args) {
WatchDogConfig config = new WatchDogConfig(properties);
log.debug("Properties: {}", properties);
final WatchDogService service = new WatchDogService(config);
+ service.checkPscServiceStatus();
service.startAsync();
service.awaitTerminated();
} catch (Exception e) {
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java
index 213b480b..32acece7 100644
--- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java
@@ -12,6 +12,7 @@
import io.pravega.common.util.Property;
import java.io.File;
+import java.text.MessageFormat;
import java.util.Map;
public class WatchDogConfig {
@@ -32,7 +33,7 @@ public class WatchDogConfig {
private final String serviceName;
public WatchDogConfig(Map properties) {
- this.serviceName = properties.getOrDefault(PSC_SERVICE_NAME.toString(), PSC_SERVICE_NAME.getDefaultValue());
+ this.serviceName = getProperty(properties, PSC_SERVICE_NAME.toString());
this.watchDogWatchIntervalSeconds = Integer.parseInt(properties.getOrDefault(PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.toString(), PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.getDefaultValue()));
this.watchdogFileMonitorPath = properties.getOrDefault(PSC_WATCHDOG_FILE_MONITOR_PATH.toString(), PSC_WATCHDOG_FILE_MONITOR_PATH.getDefaultValue());
this.restartTriggerPath = properties.getOrDefault(PSC_WATCHDOG_RESTART_TRIGGER_PATH.toString(), PSC_WATCHDOG_RESTART_TRIGGER_PATH.getDefaultValue());
@@ -79,4 +80,19 @@ public String getRestartTriggerPath() {
return restartTriggerPath;
}
+ /**
+ * Retrieves the value of a property from the given map of properties.
+ *
+ * @param properties the map of properties to retrieve the value from
+ * @param key the key of the property to retrieve
+ * @return the value of the property
+ * @throws IllegalArgumentException throw IllegalArgumentException if there is missing required parameter.
+ */
+ public String getProperty(Map properties, String key) {
+ final String value = properties.get(key);
+ if (value == null || value.isEmpty()) {
+ throw new IllegalArgumentException(MessageFormat.format("PSC Watchdog Service: Missing required parameter {0} in config", key));
+ }
+ return value;
+ }
}
diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java
index 066b61fe..c0a081ea 100644
--- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java
+++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java
@@ -10,9 +10,13 @@
package io.pravega.sensor.collector.watchdog;
import com.google.common.util.concurrent.AbstractService;
+import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
/**
* Watchdog service for Pravega Sensor Collector(PSC).
@@ -48,4 +52,28 @@ protected void doStop() {
notifyStopped();
}
+ public void checkPscServiceStatus() throws IOException {
+ Process psc = null;
+ log.info("Checking PSC service status {} ", this.config.getServiceName());
+ if (SystemUtils.IS_OS_LINUX) {
+ psc = Runtime.getRuntime().exec(new String[]{"sh", "-c", "systemctl status " + this.config.getServiceName()});
+ } else if ( SystemUtils.IS_OS_WINDOWS) {
+ psc = Runtime.getRuntime().exec(new String[]{"cmd.exe", "/c", this.config.getServiceName() + ".exe", "status"});
+ }
+ BufferedReader stdInput = new BufferedReader(new
+ InputStreamReader(psc.getInputStream()));
+
+ String s;
+ Boolean isAlive = true;
+ while ((s = stdInput.readLine()) != null) {
+ if (s.equalsIgnoreCase("NonExistent")) {
+ isAlive = false;
+ }
+ }
+ log.debug("Process psc {}, and isAlive value is {} ", psc, isAlive);
+ if (!isAlive) {
+ log.error("PSC service is not running");
+ throw new RuntimeException("PSC service is not running. Please start psc service before starting watchdog service.");
+ }
+ }
}
diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java
index 3151e5d8..ec079b3d 100644
--- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java
+++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java
@@ -13,10 +13,14 @@
import io.pravega.sensor.collector.DeviceDriverManager;
import io.pravega.sensor.collector.Parameters;
import io.pravega.sensor.collector.util.TestUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Map;
@@ -36,13 +40,24 @@ public class FileIngestServiceTest {
private DeviceDriverManager driverManager;
@BeforeEach
- void setUp() {
+ void setUp() throws IOException {
properties = Parameters.getProperties(FILE_NAME);
+ Files.deleteIfExists(Paths.get(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE")));
driverManager = new DeviceDriverManager(properties);
deviceDriverConfig = new DeviceDriverConfig("RAW1", "RawFileIngestService",
TestUtils.configFromProperties(PREFIX, SEPARATOR, properties), driverManager);
}
+ @AfterEach
+ public void tearDown() {
+ try {
+ Files.deleteIfExists(Paths.get(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE")));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ properties = null;
+ }
+
@Test
public void testFileIngestService() {
FileIngestService fileIngestService = new MockFileIngestService(deviceDriverConfig);
diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java
index 05b0f838..31591573 100644
--- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java
+++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorFactoryTest.java
@@ -46,11 +46,9 @@ public void createRAWFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName, "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, false,
- true, 20.0, 5000L, "RawFileIngestService", true);
+ true, 20.0, 5000L, "RawFileIngestService", true, "psc1");
FileProcessor rawFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config, state, writer, transactionCoordinator, "writerId");
-
Assertions.assertTrue(rawFileProcessor instanceof RawFileProcessor);
-
}
/*
@@ -61,9 +59,8 @@ public void createCSVFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName, "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, false,
- true, 20.0, 5000L, "CsvFileIngestService", false);
+ true, 20.0, 5000L, "CsvFileIngestService", false, "psc1");
FileProcessor csvFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config, state, writer, transactionCoordinator, "writerId");
-
Assertions.assertTrue(csvFileProcessor instanceof CsvFileSequenceProcessor);
}
@@ -76,7 +73,7 @@ public void createParquetFileProcessorTest() throws Exception {
String stateDatabaseFileName = ":memory:";
config = new FileConfig(stateDatabaseFileName, "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, false,
- true, 20.0, 5000L, "ParquetFileIngestService", false);
+ true, 20.0, 5000L, "ParquetFileIngestService", false, "psc1");
FileProcessor parquetFileProcessor = FileProcessorFactory.createFileSequenceProcessor(config, state, writer, transactionCoordinator, "writerId");
Assertions.assertTrue(parquetFileProcessor instanceof ParquetFileProcessor);
diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java
index 66aa9716..042fc120 100644
--- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java
+++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java
@@ -81,7 +81,7 @@ protected void setup() {
String stateDatabaseFileName = ":memory:";
config = new FileConfig("./psc.db", "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, true,
- true, 20.0, 5000, "RawFileIngestService", true);
+ true, 20.0, 5000, "RawFileIngestService", true, "pscId");
}
@Test
@@ -225,7 +225,7 @@ public void testCreateRawFileProcessorWithNullTransactionCordinator() {
public void testCreateRawFileProcessorWithNullStateDatabaseFilenameInConfig() {
FileConfig newConfig = new FileConfig(null, "/opt/pravega-sensor-collector/Files/A", "parquet", "key12",
"stream1", "{}", 10, false,
- true, 20.0, 5000, "RawFileIngestService", true);
+ true, 20.0, 5000, "RawFileIngestService", true, "pscId");
Exception exception = Assert.assertThrows(NullPointerException.class, () -> new RawFileProcessor(newConfig, state, transactionalEventWriter, transactionCoordinator, "test"));
Assert.assertTrue("config.stateDatabaseFileName".equals(exception.getMessage()));
}
diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java
index 5d66480c..31ab9fe3 100644
--- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java
+++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java
@@ -22,7 +22,7 @@ public class WatchdogServiceTests {
@Test
public void testWatchDogServiceStart() {
- WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build());
+ WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().put("PSC_SERVICE_NAME", "test").build());
WatchDogService service = new WatchDogService(config);
service.startAsync();
Assert.assertTrue(service.isRunning());
@@ -32,7 +32,7 @@ public void testWatchDogServiceStart() {
@Test
public void testWatchdogMonitorStart() {
- WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build());
+ WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.startAsync();
Assert.assertTrue(monitor.isRunning());
@@ -42,7 +42,7 @@ public void testWatchdogMonitorStart() {
@Test
public void testWatchdogRestartPSC() {
- WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build());
+ WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.onUpdateMissed();
monitor.onUpdateMissed();
@@ -54,7 +54,7 @@ public void testWatchdogRestartPSC() {
@Test
public void testWatchdogResetCounterOnUpdates() {
- WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build());
+ WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().put("PSC_SERVICE_NAME", "test").build());
PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config);
monitor.onUpdateMissed();
monitor.onUpdateMissed();
diff --git a/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties b/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties
index 7782b040..6578a9a0 100644
--- a/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties
+++ b/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties
@@ -22,3 +22,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
+PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test
diff --git a/pravega-sensor-collector/src/test/resources/RawFileIngestService.properties b/pravega-sensor-collector/src/test/resources/RawFileIngestService.properties
index 0237ea93..c77417d8 100644
--- a/pravega-sensor-collector/src/test/resources/RawFileIngestService.properties
+++ b/pravega-sensor-collector/src/test/resources/RawFileIngestService.properties
@@ -24,3 +24,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=1.0
PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true
PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000
PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15
+PRAVEGA_SENSOR_COLLECTOR_RAW1_PSC_ID=test