From 549009f79d5d21133e6d8f347a7a40a11d74b305 Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Mon, 22 Jan 2024 09:01:50 -0800 Subject: [PATCH 01/15] Watchdog draft v1 Signed-off-by: Abhin Balur --- pravega-sensor-collector/build.gradle | 14 ++ .../src/main/dist/bin/install-service.sh | 7 + .../src/main/dist/bin/uninstall-service.sh | 3 +- .../dist/etc/psc-watchdog-TEMPLATE.service | 24 +++ .../collector/watchdog/PscWatchdogApp.java | 37 ++++ .../collector/watchdog/WatchDogConfig.java | 82 +++++++++ .../collector/watchdog/WatchDogService.java | 173 ++++++++++++++++++ windows-service/PscWatchdogApp.xml | 21 +++ 8 files changed, 360 insertions(+), 1 deletion(-) create mode 100644 pravega-sensor-collector/src/main/dist/etc/psc-watchdog-TEMPLATE.service create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java create mode 100644 windows-service/PscWatchdogApp.xml diff --git a/pravega-sensor-collector/build.gradle b/pravega-sensor-collector/build.gradle index 3e3e354d..994d8bd7 100644 --- a/pravega-sensor-collector/build.gradle +++ b/pravega-sensor-collector/build.gradle @@ -116,6 +116,20 @@ tasks.register('runLeapAPIMockServer', JavaExec) { main = "io.pravega.sensor.collector.leap.LeapAPIMock" } +task createExtraRunApp(type: CreateStartScripts) { + mainClass = "io.pravega.sensor.collector.watchdog.PscWatchdogApp" + classpath = startScripts.classpath + outputDir = startScripts.outputDir + applicationName = 'psc-watchdog' +// defaultJvmOpts = ["-Xms1024m", "-Xmx2048m"] +} + +applicationDistribution.into("bin") { + duplicatesStrategy= DuplicatesStrategy.EXCLUDE + from(createExtraRunApp) + fileMode = 0755 +} + tasks.withType(com.github.spotbugs.snom.SpotBugsTask) { reports { xml { diff --git a/pravega-sensor-collector/src/main/dist/bin/install-service.sh b/pravega-sensor-collector/src/main/dist/bin/install-service.sh index 353d0dcd..dfa66e64 100755 --- a/pravega-sensor-collector/src/main/dist/bin/install-service.sh +++ b/pravega-sensor-collector/src/main/dist/bin/install-service.sh @@ -8,14 +8,21 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # +#TODO: split this into 2 functions, start-watchdog and start-psc set -x ROOT_DIR=$(readlink -f $(dirname $0)/..) SERVICE_NAME=${SERVICE_NAME:-$(basename ${ROOT_DIR})} echo Installing service ${SERVICE_NAME} located in ${ROOT_DIR}. sed -e "s:\${ROOT_DIR}:${ROOT_DIR}:g" ${ROOT_DIR}/etc/pravega-sensor-collector-TEMPLATE.service \ > ${ROOT_DIR}/etc/${SERVICE_NAME}.service +sed -e "s:\${ROOT_DIR}:${ROOT_DIR}:g" ${ROOT_DIR}/etc/psc-watchdog-TEMPLATE.service \ + > ${ROOT_DIR}/etc/psc-watchdog.service systemctl stop ${SERVICE_NAME}.service +systemctl stop psc-watchdog.service ln -svf ${ROOT_DIR}/etc/${SERVICE_NAME}.service /etc/systemd/system/${SERVICE_NAME}.service +ln -svf ${ROOT_DIR}/etc/psc-watchdog.service /etc/systemd/system/psc-watchdog.service systemctl daemon-reload systemctl enable ${SERVICE_NAME}.service +systemctl enable psc-watchdog.service systemctl start ${SERVICE_NAME}.service +systemctl start psc-watchdog.service diff --git a/pravega-sensor-collector/src/main/dist/bin/uninstall-service.sh b/pravega-sensor-collector/src/main/dist/bin/uninstall-service.sh index 72aac04b..b15e2dee 100755 --- a/pravega-sensor-collector/src/main/dist/bin/uninstall-service.sh +++ b/pravega-sensor-collector/src/main/dist/bin/uninstall-service.sh @@ -12,5 +12,6 @@ set -x ROOT_DIR=$(readlink -f $(dirname $0)/..) SERVICE_NAME=${SERVICE_NAME:-$(basename ${ROOT_DIR})} systemctl stop ${SERVICE_NAME}.service -systemctl disable ${SERVICE_NAME}.service +systemctl stop psc-watchdog.service +systemctl disable psc-watchdog.service systemctl daemon-reload diff --git a/pravega-sensor-collector/src/main/dist/etc/psc-watchdog-TEMPLATE.service b/pravega-sensor-collector/src/main/dist/etc/psc-watchdog-TEMPLATE.service new file mode 100644 index 00000000..20d4fe86 --- /dev/null +++ b/pravega-sensor-collector/src/main/dist/etc/psc-watchdog-TEMPLATE.service @@ -0,0 +1,24 @@ +# +# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +[Unit] +Description=Watchdog for Pravega Sensor Collector +After=network.target + +[Service] +ExecStart=${ROOT_DIR}/bin/psc-watchdog +WorkingDirectory=${ROOT_DIR} +RestartSec=15s +Restart=on-failure +Type=simple +Environment="CONF_FILE=${ROOT_DIR}/conf/env.sh" +Environment="PSC_WATCHDOG_OPTS=-XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp" + +[Install] +WantedBy=multi-user.target diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java new file mode 100644 index 00000000..cee2422d --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.sensor.collector.watchdog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + Main App to start the Watchdog service + for Pravega Sensor Collector(PSC). + */ +public class PscWatchdogApp { + private static final Logger log = LoggerFactory.getLogger(PscWatchdogApp.class); + + public static void main(String[] args) { + try { + log.info("Watchdog starting"); + final Map properties = System.getenv(); + log.debug("Properties: {}", properties); + final WatchDogService service = new WatchDogService(properties); + service.startAsync(); + service.awaitTerminated(); + } catch (Exception e) { + log.error("Error starting Watchdog ", e); + System.exit(1); + } + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java new file mode 100644 index 00000000..213b480b --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogConfig.java @@ -0,0 +1,82 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.sensor.collector.watchdog; + +import io.pravega.common.util.Property; + +import java.io.File; +import java.util.Map; + +public class WatchDogConfig { + + public static final Property PSC_WATCHDOG_WATCH_INTERVAL_SECONDS = Property.named("WATCHDOG_WATCH_INTERVAL_SECONDS", "15", ""); + public static final Property PSC_WATCHDOG_FILE_MONITOR_PATH = Property.named("WATCHDOG_FILE_MONITOR_PATH", System.getProperty("java.io.tmpdir") + File.separator + "psc_metric.json", ""); + + public static final Property PSC_WATCHDOG_FILE_MONITOR_UPDATE_MISSED_THRESHOLD = Property.named("WATCHDOG_FILE_MONITOR_UPDATE_MISSED_THRESHOLD", "3", ""); + + public static final Property PSC_WATCHDOG_RESTART_TRIGGER_PATH = Property.named("WATCHDOG_RESTART_TRIGGER_PATH", ".", ""); + + public static final Property PSC_SERVICE_NAME = Property.named("PSC_SERVICE_NAME", "pravega-sensor-collector.service", ""); + + private final int watchDogWatchIntervalSeconds; + private final String watchdogFileMonitorPath; + private final int watchDogFileUpdateMissedThreshold; + private final String restartTriggerPath; + private final String serviceName; + + public WatchDogConfig(Map properties) { + this.serviceName = properties.getOrDefault(PSC_SERVICE_NAME.toString(), PSC_SERVICE_NAME.getDefaultValue()); + this.watchDogWatchIntervalSeconds = Integer.parseInt(properties.getOrDefault(PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.toString(), PSC_WATCHDOG_WATCH_INTERVAL_SECONDS.getDefaultValue())); + this.watchdogFileMonitorPath = properties.getOrDefault(PSC_WATCHDOG_FILE_MONITOR_PATH.toString(), PSC_WATCHDOG_FILE_MONITOR_PATH.getDefaultValue()); + this.restartTriggerPath = properties.getOrDefault(PSC_WATCHDOG_RESTART_TRIGGER_PATH.toString(), PSC_WATCHDOG_RESTART_TRIGGER_PATH.getDefaultValue()); + this.watchDogFileUpdateMissedThreshold = Integer.parseInt(properties.getOrDefault(PSC_WATCHDOG_FILE_MONITOR_UPDATE_MISSED_THRESHOLD.toString(), PSC_WATCHDOG_FILE_MONITOR_UPDATE_MISSED_THRESHOLD.getDefaultValue())); + } + + /** + * PSC service name, Watchdog can + * use to trigger restart. + */ + public String getServiceName() { + return serviceName; + } + + /** + * The file path Watchdog will monitor to + * determine PSC liveness. + */ + public String getWatchdogFileMonitorPath() { + return watchdogFileMonitorPath; + } + + /** + * Interval at which Watchdog has to monitor + * the configured monitor file path. + */ + public int getWatchDogWatchInterval() { + return watchDogWatchIntervalSeconds; + } + + /** + * Threshold to determine the number of updates + * missed on the configured file path, beyond + * which Watchdog will trigger restart of PSC. + */ + public int getWatchDogFileUpdateMissedThreshold() { + return watchDogFileUpdateMissedThreshold; + } + + /** + * Not used. Will be removed. + */ + public String getRestartTriggerPath() { + return restartTriggerPath; + } + +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java new file mode 100644 index 00000000..bfaab832 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java @@ -0,0 +1,173 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.sensor.collector.watchdog; + +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.lang3.SystemUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + + +/** + * Watchdog service for Pravega Sensor Collector(PSC). + * Watchdog service looks for liveness of Pravega Sensor Collector and attempts + * to keep it up and running if not live. + * The definition(can be extended with time) of liveness is watchdog looking + * for regular/live updates on a configured file made by PSC. + * If not live Watchdog restarts PSC service as a corrective measure. + * There could be other corrective measures added in future. + */ +public class WatchDogService extends AbstractService { + + private static final Logger log = LoggerFactory.getLogger(WatchDogService.class); + private final WatchDogConfig config; + private final Monitor monitor; // one monitor for now + + WatchDogService(Map properties) { + config = new WatchDogConfig(properties); + monitor = new PSCWatchdogMonitor(); + } + + @Override + protected void doStart() { + log.info("Starting WatchDog Service"); + monitor.startAsync(); + monitor.awaitTerminated(); + } + + @Override + protected void doStop() { + log.info("Stopping Watchdog Service"); + } + + /** + * Monitoring service to be started by watchdog. + * Implementations need to define actions for updates received + * or missed while monitoring. + */ + private interface Monitor extends Service { + + /** + * Define action on update to resource + * being monitored. + * @param modifiedTime + */ + void onUpdate(Instant modifiedTime); + + /** + * Define action on updates missed on + * resource being monitored. + * @throws Exception throw Exception if any when missing updates. + */ + void onUpdateMissed() throws Exception; + } + + private class PSCWatchdogMonitor extends AbstractService implements Monitor { + + private final Logger log = LoggerFactory.getLogger(PSCWatchdogMonitor.class); + private Instant lastModified = Instant.EPOCH; + private int updateMissedCount = 0; + private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( + PSCWatchdogMonitor.class.getSimpleName() + "-%d").build(); + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, namedThreadFactory); + + @Override + public void onUpdate(Instant modifiedTime) { + updateMissedCount = 0; + this.lastModified = modifiedTime; + log.info("PSC Watchdog Monitor file has been updated at {}", config.getWatchdogFileMonitorPath()); + } + + @Override + public void onUpdateMissed() { + log.info("PSC Watchdog Monitor File has not been updated at {}", config.getWatchdogFileMonitorPath()); + updateMissedCount++; + log.debug("Update missed count {} and threshold is {}", updateMissedCount, config.getWatchDogFileUpdateMissedThreshold()); + // if no. of updates missed is greater that set threshold then take action. + if (updateMissedCount > config.getWatchDogFileUpdateMissedThreshold()) { + log.debug("Triggering restart of PSC."); + try { + restartPSC(); + } catch (IOException ioe) { + log.error("Error restarting PSC. Exception: {}", ioe); + } + updateMissedCount = 0; + } + } + + @Override + protected void doStart() { + log.info("Starting WatchdogService"); + executor.scheduleAtFixedRate(this::process, 0, config.getWatchDogWatchInterval(), TimeUnit.SECONDS); + } + + /** + * Restart PSC. Handle Platforms. + * TODO: use a Manager that determines platform and have separate platform specific implementations. + * @throws IOException in case of any exception. + */ + private void restartPSC() throws IOException { + String serviceName = config.getServiceName(); + if (SystemUtils.IS_OS_LINUX) { + Runtime.getRuntime().exec(new String[]{"sh", "-c", "systemctl restart " + serviceName}); + } else if ( SystemUtils.IS_OS_WINDOWS) { + Runtime.getRuntime().exec(new String[]{"cmd.exe", "/c", serviceName + ".exe", "restart"}); + } else { + throw new IOException("Unsupported operating-system"); + } + } + + /** + * TODO: Make this part of the Monitor interface. + */ + private void process() { + log.debug("process called"); + Path path = Paths.get(config.getWatchdogFileMonitorPath()); + try { + Instant modifiedTime = Files.getLastModifiedTime(path).toInstant(); + handleUpdates(modifiedTime); + } catch (NoSuchFileException fne) { + log.error("No monitor file exists at {}.", config.getWatchdogFileMonitorPath()); + handleUpdates(this.lastModified); + } catch (IOException e) { + log.error("Could not get the last modified time for monitor file at {}. exception {}", config.getWatchdogFileMonitorPath(), e); + } + } + + private void handleUpdates(Instant modifiedTime) { + log.debug("handleUpdates called"); + if (modifiedTime.isAfter(this.lastModified)) { + onUpdate(modifiedTime); + } else { + onUpdateMissed(); + } + } + + @Override + protected void doStop() { + log.info("Stopping WatchdogSrevice"); + } + } + +} diff --git a/windows-service/PscWatchdogApp.xml b/windows-service/PscWatchdogApp.xml new file mode 100644 index 00000000..2b56b6b8 --- /dev/null +++ b/windows-service/PscWatchdogApp.xml @@ -0,0 +1,21 @@ + + watchdog1 + PSCWatchdog + Watchdog Service for Pravega Senson COllector + "C:/Program Files/Java/jdk-11/bin/java" + -Xmx2048m -cp "PSCFiles\pravega-sensor-collector-0.2.18.jar io.pravega.sensor.collector.watchdog.PscWatchdogApp" + + + + + + + 10240 + 10 + + + + + + + \ No newline at end of file From 6bee5cfdafefdb1a5f1ffc7fd98b479e02f37847 Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Mon, 22 Jan 2024 09:14:17 -0800 Subject: [PATCH 02/15] service notf Signed-off-by: Abhin Balur --- .../io/pravega/sensor/collector/watchdog/WatchDogService.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java index bfaab832..c4f8a2d0 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java @@ -54,11 +54,13 @@ protected void doStart() { log.info("Starting WatchDog Service"); monitor.startAsync(); monitor.awaitTerminated(); + notifyStarted(); } @Override protected void doStop() { log.info("Stopping Watchdog Service"); + notifyStopped(); } /** @@ -120,6 +122,7 @@ public void onUpdateMissed() { protected void doStart() { log.info("Starting WatchdogService"); executor.scheduleAtFixedRate(this::process, 0, config.getWatchDogWatchInterval(), TimeUnit.SECONDS); + notifyStarted(); } /** @@ -167,6 +170,7 @@ private void handleUpdates(Instant modifiedTime) { @Override protected void doStop() { log.info("Stopping WatchdogSrevice"); + notifyStopped(); } } From bb6ea6aff18e9dcee6eabb426c02f345d63a51cb Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Mon, 22 Jan 2024 09:22:34 -0800 Subject: [PATCH 03/15] Changing gradle task name Signed-off-by: Abhin Balur --- pravega-sensor-collector/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pravega-sensor-collector/build.gradle b/pravega-sensor-collector/build.gradle index 994d8bd7..4cd75908 100644 --- a/pravega-sensor-collector/build.gradle +++ b/pravega-sensor-collector/build.gradle @@ -116,7 +116,7 @@ tasks.register('runLeapAPIMockServer', JavaExec) { main = "io.pravega.sensor.collector.leap.LeapAPIMock" } -task createExtraRunApp(type: CreateStartScripts) { +task createWatchdogApp(type: CreateStartScripts) { mainClass = "io.pravega.sensor.collector.watchdog.PscWatchdogApp" classpath = startScripts.classpath outputDir = startScripts.outputDir @@ -126,7 +126,7 @@ task createExtraRunApp(type: CreateStartScripts) { applicationDistribution.into("bin") { duplicatesStrategy= DuplicatesStrategy.EXCLUDE - from(createExtraRunApp) + from(createWatchdogApp) fileMode = 0755 } From 64a9e449b84f1748deae64acfd72019d99d96d0e Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Mon, 29 Jan 2024 01:17:05 -0800 Subject: [PATCH 04/15] Adding tests Signed-off-by: Abhin Balur --- .../collector/watchdog/PscWatchdogApp.java | 3 +- .../watchdog/PscWatchdogMonitor.java | 145 ++++++++++++++++++ .../collector/watchdog/WatchDogService.java | 132 +--------------- .../watchdog/WatchdogServiceTests.java | 85 ++++++++++ 4 files changed, 236 insertions(+), 129 deletions(-) create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java create mode 100644 pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java index cee2422d..ce229cad 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogApp.java @@ -25,8 +25,9 @@ public static void main(String[] args) { try { log.info("Watchdog starting"); final Map properties = System.getenv(); + WatchDogConfig config = new WatchDogConfig(properties); log.debug("Properties: {}", properties); - final WatchDogService service = new WatchDogService(properties); + final WatchDogService service = new WatchDogService(config); service.startAsync(); service.awaitTerminated(); } catch (Exception e) { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java new file mode 100644 index 00000000..d286172a --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java @@ -0,0 +1,145 @@ +package io.pravega.sensor.collector.watchdog; + +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.pravega.keycloak.com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.SystemUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + + + +/** + * Monitoring service to be started by watchdog. + * Implementations need to define actions for updates received + * or missed while monitoring. + */ +interface Monitor extends Service { + + /** + * Define action on update to resource + * being monitored. + * @param modifiedTime + */ + void onUpdate(Instant modifiedTime); + + /** + * Define action on updates missed on + * resource being monitored. + * @throws Exception throw Exception if any when missing updates. + */ + void onUpdateMissed() throws Exception; +} + + +class PSCWatchdogMonitor extends AbstractService implements Monitor { + + private final Logger log = LoggerFactory.getLogger(PSCWatchdogMonitor.class); + private Instant lastModified = Instant.EPOCH; + private int updateMissedCount = 0; + private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( + PSCWatchdogMonitor.class.getSimpleName() + "-%d").build(); + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, namedThreadFactory); + private final WatchDogConfig config; + + public PSCWatchdogMonitor(WatchDogConfig config) { + this.config = config; + } + + @Override + public void onUpdate(Instant modifiedTime) { + updateMissedCount = 0; + this.lastModified = modifiedTime; + log.info("PSC Watchdog Monitor file has been updated at {}", config.getWatchdogFileMonitorPath()); + } + + @Override + public void onUpdateMissed() { + log.info("PSC Watchdog Monitor File has not been updated at {}", config.getWatchdogFileMonitorPath()); + updateMissedCount++; + log.debug("Update missed count {} and threshold is {}", updateMissedCount, config.getWatchDogFileUpdateMissedThreshold()); + // if no. of updates missed is greater that set threshold then take action. + if (areUpdatesMissedGreaterThanThreshold()) { + log.debug("Triggering restart of PSC."); + try { + restartPSC(); + } catch (IOException ioe) { + log.error("Error restarting PSC. Exception: {}", ioe); + } + updateMissedCount = 0; + } + } + + @VisibleForTesting + protected boolean areUpdatesMissedGreaterThanThreshold() { + return updateMissedCount > config.getWatchDogFileUpdateMissedThreshold(); + } + + @Override + protected void doStart() { + log.info("Starting WatchdogMonitor {}",config.getWatchDogWatchInterval()); + executor.scheduleAtFixedRate(this::process, 0, config.getWatchDogWatchInterval(), TimeUnit.SECONDS); + notifyStarted(); + } + + /** + * Restart PSC. Handle Platforms. + * TODO: use a Manager that determines platform and have separate platform specific implementations. + * @throws IOException in case of any exception. + */ + private void restartPSC() throws IOException { + String serviceName = config.getServiceName(); + if (SystemUtils.IS_OS_LINUX) { + Runtime.getRuntime().exec(new String[]{"sh", "-c", "systemctl restart " + serviceName}); + } else if ( SystemUtils.IS_OS_WINDOWS) { + Runtime.getRuntime().exec(new String[]{"cmd.exe", "/c", serviceName + ".exe", "restart"}); + } else { + throw new IOException("Unsupported operating-system"); + } + } + + /** + * TODO: Make this part of the Monitor interface. + */ + private void process() { + log.debug("process called"); + Path path = Paths.get(config.getWatchdogFileMonitorPath()); + try { + Instant modifiedTime = Files.getLastModifiedTime(path).toInstant(); + handleUpdates(modifiedTime); + } catch (NoSuchFileException fne) { + log.error("No monitor file exists at {}.", config.getWatchdogFileMonitorPath()); + handleUpdates(this.lastModified); + } catch (IOException e) { + log.error("Could not get the last modified time for monitor file at {}. exception {}", config.getWatchdogFileMonitorPath(), e); + } + } + + private void handleUpdates(Instant modifiedTime) { + log.debug("handleUpdates called"); + if (modifiedTime.isAfter(this.lastModified)) { + onUpdate(modifiedTime); + } else { + onUpdateMissed(); + } + } + + @Override + protected void doStop() { + log.info("Stopping Watchdog monitor."); + executor.shutdown(); + notifyStopped(); + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java index c4f8a2d0..3232a9e0 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java @@ -10,23 +10,10 @@ package io.pravega.sensor.collector.watchdog; import com.google.common.util.concurrent.AbstractService; -import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.lang3.SystemUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Instant; import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; /** @@ -44,134 +31,23 @@ public class WatchDogService extends AbstractService { private final WatchDogConfig config; private final Monitor monitor; // one monitor for now - WatchDogService(Map properties) { - config = new WatchDogConfig(properties); - monitor = new PSCWatchdogMonitor(); + WatchDogService(WatchDogConfig config) { + this.config = config; + this.monitor = new PSCWatchdogMonitor(config); } @Override protected void doStart() { log.info("Starting WatchDog Service"); monitor.startAsync(); - monitor.awaitTerminated(); notifyStarted(); } @Override protected void doStop() { log.info("Stopping Watchdog Service"); + monitor.stopAsync(); notifyStopped(); } - /** - * Monitoring service to be started by watchdog. - * Implementations need to define actions for updates received - * or missed while monitoring. - */ - private interface Monitor extends Service { - - /** - * Define action on update to resource - * being monitored. - * @param modifiedTime - */ - void onUpdate(Instant modifiedTime); - - /** - * Define action on updates missed on - * resource being monitored. - * @throws Exception throw Exception if any when missing updates. - */ - void onUpdateMissed() throws Exception; - } - - private class PSCWatchdogMonitor extends AbstractService implements Monitor { - - private final Logger log = LoggerFactory.getLogger(PSCWatchdogMonitor.class); - private Instant lastModified = Instant.EPOCH; - private int updateMissedCount = 0; - private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( - PSCWatchdogMonitor.class.getSimpleName() + "-%d").build(); - private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, namedThreadFactory); - - @Override - public void onUpdate(Instant modifiedTime) { - updateMissedCount = 0; - this.lastModified = modifiedTime; - log.info("PSC Watchdog Monitor file has been updated at {}", config.getWatchdogFileMonitorPath()); - } - - @Override - public void onUpdateMissed() { - log.info("PSC Watchdog Monitor File has not been updated at {}", config.getWatchdogFileMonitorPath()); - updateMissedCount++; - log.debug("Update missed count {} and threshold is {}", updateMissedCount, config.getWatchDogFileUpdateMissedThreshold()); - // if no. of updates missed is greater that set threshold then take action. - if (updateMissedCount > config.getWatchDogFileUpdateMissedThreshold()) { - log.debug("Triggering restart of PSC."); - try { - restartPSC(); - } catch (IOException ioe) { - log.error("Error restarting PSC. Exception: {}", ioe); - } - updateMissedCount = 0; - } - } - - @Override - protected void doStart() { - log.info("Starting WatchdogService"); - executor.scheduleAtFixedRate(this::process, 0, config.getWatchDogWatchInterval(), TimeUnit.SECONDS); - notifyStarted(); - } - - /** - * Restart PSC. Handle Platforms. - * TODO: use a Manager that determines platform and have separate platform specific implementations. - * @throws IOException in case of any exception. - */ - private void restartPSC() throws IOException { - String serviceName = config.getServiceName(); - if (SystemUtils.IS_OS_LINUX) { - Runtime.getRuntime().exec(new String[]{"sh", "-c", "systemctl restart " + serviceName}); - } else if ( SystemUtils.IS_OS_WINDOWS) { - Runtime.getRuntime().exec(new String[]{"cmd.exe", "/c", serviceName + ".exe", "restart"}); - } else { - throw new IOException("Unsupported operating-system"); - } - } - - /** - * TODO: Make this part of the Monitor interface. - */ - private void process() { - log.debug("process called"); - Path path = Paths.get(config.getWatchdogFileMonitorPath()); - try { - Instant modifiedTime = Files.getLastModifiedTime(path).toInstant(); - handleUpdates(modifiedTime); - } catch (NoSuchFileException fne) { - log.error("No monitor file exists at {}.", config.getWatchdogFileMonitorPath()); - handleUpdates(this.lastModified); - } catch (IOException e) { - log.error("Could not get the last modified time for monitor file at {}. exception {}", config.getWatchdogFileMonitorPath(), e); - } - } - - private void handleUpdates(Instant modifiedTime) { - log.debug("handleUpdates called"); - if (modifiedTime.isAfter(this.lastModified)) { - onUpdate(modifiedTime); - } else { - onUpdateMissed(); - } - } - - @Override - protected void doStop() { - log.info("Stopping WatchdogSrevice"); - notifyStopped(); - } - } - } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java new file mode 100644 index 00000000..39e032be --- /dev/null +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/watchdog/WatchdogServiceTests.java @@ -0,0 +1,85 @@ +package io.pravega.sensor.collector.watchdog; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Instant; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; + +public class WatchdogServiceTests { + + @Test + public void testWatchDogServiceStart() { + WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build()); + WatchDogService service = new WatchDogService(config); + service.startAsync(); + Assert.assertTrue(service.isRunning()); + service.stopAsync(); + Assert.assertFalse(service.isRunning()); + } + + @Test + public void testWatchdogMonitorStart() { + WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build()); + PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config); + monitor.startAsync(); + Assert.assertTrue(monitor.isRunning()); + monitor.stopAsync(); + Assert.assertFalse(monitor.isRunning()); + } + + @Test + public void testWatchdogRestartPSC() { + WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build()); + PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config); + monitor.onUpdateMissed(); + monitor.onUpdateMissed(); + monitor.onUpdateMissed(); + monitor.onUpdateMissed(); + // threshold crossed, updates missed resets to zero and restart PSC. + Assert.assertFalse(monitor.areUpdatesMissedGreaterThanThreshold()); + } + + @Test + public void testWatchdogResetCounterOnUpdates() { + WatchDogConfig config = new WatchDogConfig(ImmutableMap.builder().build()); + PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config); + monitor.onUpdateMissed(); + monitor.onUpdateMissed(); + monitor.onUpdateMissed(); + monitor.onUpdate(Instant.now()); + monitor.onUpdateMissed(); + Assert.assertFalse(monitor.areUpdatesMissedGreaterThanThreshold()); + } + + @Test (timeout = 15000) + public void testWatchdogRestartPSCWhenNoFileExists() throws Exception { + ImmutableMap props = ImmutableMap.builder() + .put("WATCHDOG_WATCH_INTERVAL_SECONDS", "3") + .put("WATCHDOG_FILE_MONITOR_UPDATE_MISSED_THRESHOLD", "2") + .put("PSC_SERVICE_NAME", "test") + .build(); + WatchDogConfig config = new WatchDogConfig(props); + PSCWatchdogMonitor monitor = new PSCWatchdogMonitor(config); + monitor.startAsync(); + System.out.println(System.getProperty("user.name")); + // Threshold crossed at 6th second...stop executor at 7th second. + CompletableFuture cf = new CompletableFuture<>(); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + monitor.doStop(); + cf.complete(null); + } + }, 7000); + cf.get(); // wait for task to run + Assert.assertFalse(monitor.areUpdatesMissedGreaterThanThreshold()); + } +} + + + + From 14661c726425402207bb8f809604f694f0a3da1c Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Thu, 8 Feb 2024 23:58:50 -0800 Subject: [PATCH 05/15] Support to push metric data to Pravega Stream Signed-off-by: Abhin Balur --- gradle.properties | 1 + pravega-sensor-collector/build.gradle | 9 ++ .../collector/file/FileIngestService.java | 9 +- .../sensor/collector/file/FileProcessor.java | 16 ++- .../sensor/collector/metrics/Metric.java | 65 ++++++++++ .../collector/metrics/MetricConfig.java | 119 ++++++++++++++++++ .../sensor/collector/metrics/MetricNames.java | 9 ++ .../collector/metrics/MetricPublisher.java | 67 ++++++++++ .../collector/metrics/MetricsStore.java | 33 +++++ .../collector/metrics/PravegaClient.java | 62 +++++++++ .../metrics/writers/MetricFileWriter.java | 77 ++++++++++++ .../metrics/writers/MetricStreamWriter.java | 78 ++++++++++++ .../metrics/writers/MetricWriter.java | 19 +++ .../watchdog/PscWatchdogMonitor.java | 2 +- .../collector/watchdog/WatchDogService.java | 2 - .../sensor/collector/metrics/MetricTests.java | 109 ++++++++++++++++ 16 files changed, 669 insertions(+), 8 deletions(-) create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricNames.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricFileWriter.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java create mode 100644 pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricWriter.java create mode 100644 pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java diff --git a/gradle.properties b/gradle.properties index fa99c486..9ece154f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,6 +15,7 @@ commonsCodecVersion=1.14 commonsMath3Version=3.6.1 grizzlyVersion=3.1.3 gsonVersion=2.10.1 +autoServiceVersion=1.1.1 includePravegaCredentials=true jacksonVersion=2.15.2 junitVersion=5.6.2 diff --git a/pravega-sensor-collector/build.gradle b/pravega-sensor-collector/build.gradle index 4cd75908..b387ca36 100644 --- a/pravega-sensor-collector/build.gradle +++ b/pravega-sensor-collector/build.gradle @@ -60,6 +60,8 @@ dependencies { implementation "com.github.vladimir-bukhtoyarov:bucket4j-core:${bucket4jVersion}" implementation "org.eclipse.milo:sdk-client:${miloVersion}" implementation "com.google.code.gson:gson:${gsonVersion}" + implementation "com.google.auto.service:auto-service:${autoServiceVersion}" + annotationProcessor "com.google.auto.service:auto-service:${autoServiceVersion}" implementation "org.apache.parquet:parquet-avro:${parquetVersion}" implementation "org.apache.parquet:parquet-hadoop:${parquetVersion}" @@ -121,6 +123,13 @@ task createWatchdogApp(type: CreateStartScripts) { classpath = startScripts.classpath outputDir = startScripts.outputDir applicationName = 'psc-watchdog' + doLast { + // Modify startup script to source the configuration file (env.sh). + unixScript.text = unixScript.text + .replace( + 'DEFAULT_JVM_OPTS=""', + 'DEFAULT_JVM_OPTS=""\n\n# Source configuration file\n[ -f "${CONF_FILE}" ] && . "${CONF_FILE}"') + } // defaultJvmOpts = ["-Xms1024m", "-Xmx2048m"] } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index 609e7e1a..185b109b 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -13,6 +13,9 @@ import io.pravega.client.EventStreamClientFactory; import io.pravega.sensor.collector.DeviceDriver; import io.pravega.sensor.collector.DeviceDriverConfig; +import io.pravega.sensor.collector.metrics.MetricNames; +import io.pravega.sensor.collector.metrics.MetricPublisher; +import io.pravega.sensor.collector.metrics.MetricsStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +51,8 @@ public abstract class FileIngestService extends DeviceDriver { private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100; private static final int DEFAULT_INTERVAL_MS_KEY = 10000; - private final FileProcessor processor; + private final MetricPublisher metricPublisher; private final ScheduledExecutorService executor; private ScheduledFuture watchFileTask; @@ -78,6 +81,7 @@ public FileIngestService(DeviceDriverConfig config) { createStream(scopeName, getStreamName()); final EventStreamClientFactory clientFactory = getEventStreamClientFactory(scopeName); processor = FileProcessor.create(fileSequenceConfig, clientFactory); + metricPublisher = new MetricPublisher(config); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( FileIngestService.class.getSimpleName() + "-" + config.getInstanceName() + "-%d").build(); executor = Executors.newScheduledThreadPool(1, namedThreadFactory); @@ -150,6 +154,7 @@ protected void watchFiles() { try { processor.watchFiles(); } catch (Exception e) { + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).update(e.getClass().getName()); LOG.error("watchFiles: watch file error", e); // Continue on any errors. We will retry on the next iteration. } @@ -160,6 +165,7 @@ protected void processFiles() { try { processor.processFiles(); } catch (Exception e) { + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).update(e.getClass().getName()); LOG.error("processFiles: Process file error", e); // Continue on any errors. We will retry on the next iteration. } @@ -179,6 +185,7 @@ protected void deleteCompletedFiles() { @Override protected void doStart() { + metricPublisher.startAsync(); watchFileTask = executor.scheduleAtFixedRate( this::watchFiles, 0, diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java index e20c58cf..62eec9d2 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java @@ -16,6 +16,8 @@ import io.pravega.client.stream.TxnFailedException; import io.pravega.client.stream.impl.ByteArraySerializer; import io.pravega.keycloak.com.google.common.base.Preconditions; +import io.pravega.sensor.collector.metrics.MetricNames; +import io.pravega.sensor.collector.metrics.MetricsStore; import io.pravega.sensor.collector.util.EventWriter; import io.pravega.sensor.collector.util.FileNameWithOffset; import io.pravega.sensor.collector.util.FileUtils; @@ -33,16 +35,16 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; -import java.nio.file.Paths; import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.Connection; -import java.util.List; import java.util.ArrayList; import java.util.Collections; -import java.util.Set; import java.util.HashSet; -import java.util.UUID; +import java.util.List; import java.util.Optional; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; /** @@ -58,6 +60,8 @@ public abstract class FileProcessor { private final TransactionCoordinator transactionCoordinator; private final EventGenerator eventGenerator; private final Path movedFilesDirectory; + private final AtomicLong completedFiles; + private final AtomicLong bytesProcessed; public FileProcessor(FileConfig config, TransactionStateDB state, EventWriter writer, TransactionCoordinator transactionCoordinator) { this.config = Preconditions.checkNotNull(config, "config"); @@ -67,6 +71,8 @@ public FileProcessor(FileConfig config, TransactionStateDB state, EventWriter underlying metric data-type. + */ +public interface Metric { + void update(T t); +} + +class Counter implements Metric { + private Long counter = 0L; + private String metricType = "COUNTER"; + + public Long getCounter() { + return counter; + } + + public String getMetricType() { + return metricType; + } + + public void update(Long value) { + Preconditions.checkArgument(value > 0, "value needs to be positive"); + this.counter = value; + } +} + +class Gauge implements Metric { + private Long gauge = 0L; + private String metricType = "GAUGE"; + public Long getGauge() { + return gauge; + } + + public String getMetricType() { + return metricType; + } + + public void update(Long value) { + Preconditions.checkArgument(value > 0, "value needs to be positive"); + this.gauge = value; + } +} + + +class ExceptionMeter implements Metric { + private String exceptionClass; + private String metricType = "METER"; + public String getExceptionClass() { + return exceptionClass; + } + + public String getMetricType() { + return metricType; + } + + public void update(String value) { + Preconditions.checkArgument(value != null, "value needs to be non-null"); + this.exceptionClass = value; + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java new file mode 100644 index 00000000..932d2e0f --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java @@ -0,0 +1,119 @@ +package io.pravega.sensor.collector.metrics; + +import io.pravega.common.util.Property; +import io.pravega.sensor.collector.DeviceDriverConfig; + +import java.io.File; +import java.net.URI; + +/** + * MetricConfig holding config parameters and + * defaults for publishing metrics. + */ +public class MetricConfig { + private static final Property METRIC_FILE_WRITER_INTERVAL_SECONDS = Property.named("METRIC_FILE_WRITER_INTERVAL_SECONDS", "15", ""); + private static final Property METRIC_STREAM_WRITER_INTERVAL_SECONDS = Property.named("METRIC_STREAM_WRITER_INTERVAL_SECONDS", "30", ""); + private static final Property METRIC_STREAM_NAME = Property.named("METRIC_STREAM_NAME", "pscmetricsstream", ""); + private static final Property METRIC_SCOPE_NAME = Property.named("METRIC_SCOPE_NAME", "pscmetricsscope", ""); + private static final Property METRIC_CONTROLLER_URI = Property.named("PRAVEGA_CONTROLLER_URI", "tcp://localhost:9090", ""); + private static final Property METRIC_FILE_PATH = Property.named("METRIC_FILE_PATH", System.getProperty("java.io.tmpdir") + File.separator + "psc_metric.json", ""); + + + private String instanceName; + /** + * Pravega scope to publish metrics to. + */ + private String metricsScope; + /** + * Pravega Stream to publish metrics to. + */ + private String metricStream; + /** + * Interval in seconds to publish metrics to + * a known file by PSC. + */ + private int fileWriterIntervalSeconds; + /** + * Known file where PSC will dump + * telemetry data. + */ + private String metricFilePath; + /** + * Controller URI of Pravega. + */ + private URI controllerURI; + /** + * Interval in seconds to publish metrics to + * a configured stream by PSC. + */ + private int streamWriterIntervalSeconds; + + public String getInstanceName() { + return instanceName; + } + + private void setInstanceName(String instanceName) { + this.instanceName = instanceName; + } + + public String getMetricStream() { + return metricStream; + } + + private void setMetricStream(String metricStream) { + this.metricStream = metricStream; + } + + public String getMetricsScope() { + return metricsScope; + } + + public void setMetricsScope(String metricsScope) { + this.metricsScope = metricsScope; + } + + public String getMetricFilePath() { + return metricFilePath; + } + + private void setMetricFilePath(String metricFilePath) { + this.metricFilePath = metricFilePath; + } + + public URI getControllerURI() { + return controllerURI; + } + + private void setControllerURI(URI controllerURI) { + this.controllerURI = controllerURI; + } + + public int getFileWriterIntervalSeconds() { + return fileWriterIntervalSeconds; + } + + private void setFileWriterIntervalSeconds(int fileWriterIntervalSeconds) { + this.fileWriterIntervalSeconds = fileWriterIntervalSeconds; + } + + public int getStreamWriterIntervalSeconds() { + return streamWriterIntervalSeconds; + } + + private void setStreamWriterIntervalSeconds(int streamWriterIntervalSeconds) { + this.streamWriterIntervalSeconds = streamWriterIntervalSeconds; + } + + // Static factory + public static MetricConfig getMetricConfigFrom(DeviceDriverConfig ddrConfig) { + MetricConfig metricConfig = new MetricConfig(); + String instanceName = ddrConfig.getProperties().getOrDefault("PSC_INSTANCE_NAME", "");; + metricConfig.setFileWriterIntervalSeconds(Integer.parseInt(ddrConfig.getProperties().getOrDefault(METRIC_FILE_WRITER_INTERVAL_SECONDS.getName(), METRIC_FILE_WRITER_INTERVAL_SECONDS.getDefaultValue()))); + metricConfig.setStreamWriterIntervalSeconds(Integer.parseInt(ddrConfig.getProperties().getOrDefault(METRIC_STREAM_WRITER_INTERVAL_SECONDS.getName(), METRIC_STREAM_WRITER_INTERVAL_SECONDS.getDefaultValue()))); + metricConfig.setMetricStream(ddrConfig.getProperties().getOrDefault(METRIC_STREAM_NAME.getName() + "_" + instanceName, METRIC_STREAM_NAME.getDefaultValue() + "_" + instanceName)); + metricConfig.setControllerURI(URI.create(ddrConfig.getProperties().getOrDefault(METRIC_CONTROLLER_URI.getName(), METRIC_CONTROLLER_URI.getDefaultValue()))); + metricConfig.setMetricsScope(METRIC_SCOPE_NAME.getDefaultValue()); + metricConfig.setMetricFilePath(METRIC_FILE_PATH.getDefaultValue()); + return metricConfig; + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricNames.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricNames.java new file mode 100644 index 00000000..9738753d --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricNames.java @@ -0,0 +1,9 @@ +package io.pravega.sensor.collector.metrics; + +public class MetricNames { + public static final String PREFIX = "psc" + "."; + public static final String PSC_FILES_PROCESSED_GAUGE = PREFIX + "files.processed"; + public static final String PSC_FILES_DELETED_GAUGE = PREFIX + "files.deleted"; + public static final String PSC_BYTES_PROCESSED_GAUGE = PREFIX + "bytes.processed"; + public static final String PSC_EXCEPTIONS = PREFIX + "exceptions"; +} \ No newline at end of file diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java new file mode 100644 index 00000000..773dc84c --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java @@ -0,0 +1,67 @@ +package io.pravega.sensor.collector.metrics; + +import com.google.common.util.concurrent.AbstractService; +import io.pravega.sensor.collector.DeviceDriverConfig; +import io.pravega.sensor.collector.metrics.writers.MetricWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; + +/** + * Encompassing service that starts different implementations + * of the MetricWriter to publish metrics. + */ +public class MetricPublisher extends AbstractService { + private static final Logger LOGGER = LoggerFactory.getLogger(MetricPublisher.class); + List writers; + DeviceDriverConfig config; + + public MetricPublisher(DeviceDriverConfig config) { + this.config = config; + this.writers = new ArrayList<>(); + initializeWriters(); + } + + /** + * Uses the ServiceLoader to get Impl's of MetricWriter and initializes them + * via their constructor. + */ + private void initializeWriters() { + ServiceLoader writers = ServiceLoader.load(MetricWriter.class); + MetricConfig metricConfig = MetricConfig.getMetricConfigFrom(this.config); + try { + for (MetricWriter writer: writers) { + final Class writerClass = Class.forName(writer.getClass().getName()); + final MetricWriter w = (MetricWriter) writerClass.getConstructor(MetricConfig.class).newInstance(metricConfig); + this.writers.add(w); + LOGGER.info("Initialized MetricWriters."); + } + } catch (Exception e) { + // Throw here to halt PSC if writers are not initialized. + throw new RuntimeException(e); + } + } + + @Override + protected void doStart() { + doStartWriters(); + notifyStarted(); + } + + /** + * Start the MetricWriter publishers. + */ + private void doStartWriters() { + this.writers.forEach( writer -> writer.startAsync()); + } + + @Override + protected void doStop() { + LOGGER.info("Stopping MetricPublisher"); + this.writers.forEach(writer -> writer.stopAsync()); + notifyStopped(); + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java new file mode 100644 index 00000000..38242f94 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java @@ -0,0 +1,33 @@ +package io.pravega.sensor.collector.metrics; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; + +/** + * Simple Store holding metric data centrally. + */ +public class MetricsStore { + private static final ImmutableMap metricStore = ImmutableMap.builder() + .put(MetricNames.PSC_FILES_PROCESSED_GAUGE, new Gauge()) + .put(MetricNames.PSC_FILES_DELETED_GAUGE, new Gauge()) + .put(MetricNames.PSC_BYTES_PROCESSED_GAUGE, new Gauge()) + .build(); + + public static Metric getMetric(String metricName) { + return metricStore.get(metricName); + } + + /** + * Return a json representation of all metrics + * to be published. Uses Jackson to do so. + * @return JSON String of all metrics. + * @throws JsonProcessingException In case of any exception retrieving + * json string. + */ + public static String getMetricsAsJson() throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.writerWithDefaultPrettyPrinter() + .writeValueAsString(metricStore); + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java new file mode 100644 index 00000000..ad2c5140 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java @@ -0,0 +1,62 @@ +package io.pravega.sensor.collector.metrics; + +import io.pravega.client.ClientConfig; +import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.EventStreamWriter; +import io.pravega.client.stream.EventWriterConfig; +import io.pravega.client.stream.ScalingPolicy; +import io.pravega.client.stream.StreamConfiguration; +import io.pravega.client.stream.impl.UTF8StringSerializer; + +import java.net.URI; + +/** + * Pravega client wrapper to talk + * to Pravega. + */ +public class PravegaClient { + private final String scope; + private final String streamName; + private final URI controllerURI; + private final EventStreamWriter writer; + + public PravegaClient(String scope, String streamName, URI controllerURI) { + this.scope = scope; + this.streamName = streamName; + this.controllerURI = controllerURI; + this.writer = initializeWriter(); + } + + public PravegaClient(String scope, String streamName) { + this.scope = scope; + this.streamName = streamName; + this.controllerURI = null; + this.writer = null; + } + + private EventStreamWriter initializeWriter() { + StreamManager streamManager = StreamManager.create(controllerURI); + final boolean scopeIsNew = streamManager.createScope(scope); + + StreamConfiguration streamConfig = StreamConfiguration.builder() + .scalingPolicy(ScalingPolicy.fixed(1)) + .build(); + final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig); + EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, + ClientConfig.builder().controllerURI(controllerURI).build()); + EventStreamWriter writer = clientFactory.createEventWriter(streamName, + new UTF8StringSerializer(), + EventWriterConfig.builder().build()); + return writer; + } + + public void writeEvent(String routingKey, String message) { + writer.writeEvent(routingKey, message); + writer.flush(); + } + + public void close() { + this.writer.close(); + } +} \ No newline at end of file diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricFileWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricFileWriter.java new file mode 100644 index 00000000..8ffeb484 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricFileWriter.java @@ -0,0 +1,77 @@ +package io.pravega.sensor.collector.metrics.writers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.auto.service.AutoService; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.pravega.sensor.collector.metrics.MetricConfig; +import io.pravega.sensor.collector.metrics.MetricsStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Metric File Writer implementation of MetricWriter to dump + * metric to a known file. Regular updates to this file will + * be monitored by a watchdog process. + */ +@AutoService(MetricWriter.class) +public class MetricFileWriter extends AbstractService implements MetricWriter { + + private final Logger log = LoggerFactory.getLogger(MetricFileWriter.class); + private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( + MetricFileWriter.class.getSimpleName() + "-%d").build(); + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, namedThreadFactory); + + private final MetricConfig config; + + public MetricFileWriter(MetricConfig config) { + this.config = config; + } + + /** + * Used by ServiceLoader. + */ + public MetricFileWriter() { + config = null; + } + + @Override + public void writeMetric() { + try { + String jsonMetrics = MetricsStore.getMetricsAsJson(); + try (BufferedWriter bw = Files.newBufferedWriter(Paths.get(config.getMetricFilePath()), StandardCharsets.US_ASCII)) { + log.info("Finished writing metric data at {}", config.getMetricFilePath()); + bw.write(jsonMetrics); + } + } catch (JsonProcessingException jpe) { + // Log. Dont throw/panic. Watchdog watching. + log.error("Error fetching metrics as json string {}", jpe); + } catch (Exception ioe) { + // Log. Dont throw/panic. Watchdog watching. + log.error("Error writing metrics file at {}", ioe); + } + } + + @Override + public void doStart() { + log.info("Starting MetricFileWriter"); + executor.scheduleAtFixedRate(this::writeMetric, 0, config.getFileWriterIntervalSeconds(), TimeUnit.SECONDS); + notifyStarted(); + } + + @Override + public void doStop() { + log.info("Stopping Watchdog monitor."); + executor.shutdown(); + notifyStopped(); + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java new file mode 100644 index 00000000..9d4190a0 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java @@ -0,0 +1,78 @@ +package io.pravega.sensor.collector.metrics.writers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.auto.service.AutoService; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.pravega.sensor.collector.metrics.MetricConfig; +import io.pravega.sensor.collector.metrics.MetricNames; +import io.pravega.sensor.collector.metrics.MetricsStore; +import io.pravega.sensor.collector.metrics.PravegaClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +/** + * Metric publisher that writes metrics retrieved from + * a metrics store to Pravega Stream. + */ +@AutoService(MetricWriter.class) +public class MetricStreamWriter extends AbstractService implements MetricWriter { + private final Logger log = LoggerFactory.getLogger(MetricStreamWriter.class); + private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( + MetricStreamWriter.class.getSimpleName() + "-%d").build(); + private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, namedThreadFactory); + private MetricConfig config; + private PravegaClient client; + + public MetricStreamWriter(MetricConfig config) { + this.config = config; + } + + /** + * Used by ServiceLoader. + */ + public MetricStreamWriter() { + } + + @VisibleForTesting + protected PravegaClient initalizePravegaClient() { + return new PravegaClient(config.getMetricsScope(), config.getMetricStream(), config.getControllerURI()); + } + + @Override + public void writeMetric() { + try { + String jsonMetrics = MetricsStore.getMetricsAsJson(); + this.client.writeEvent("", jsonMetrics); + log.info("Published metric blob to Pravega Stream {}", config.getMetricStream()); + } catch (JsonProcessingException jpe) { + // Just log . do not stop the scheduler + log.error("Error fetching metrics as json string {}", jpe); + } catch (Exception ioe) { + // Just log . do not stop the scheduler + log.error("Error writing metrics file at {}", ioe); + } + } + + @Override + public void doStart() { + log.info("Starting MetricStreamWriter {}"); + this.client = initalizePravegaClient(); + executor.scheduleAtFixedRate(this::writeMetric, 0, config.getStreamWriterIntervalSeconds(), TimeUnit.SECONDS); + notifyStarted(); + } + + @Override + public void doStop() { + log.info("Stopping Watchdog monitor."); + executor.shutdown(); + this.client.close(); + notifyStopped(); + } +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricWriter.java new file mode 100644 index 00000000..1b99e244 --- /dev/null +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricWriter.java @@ -0,0 +1,19 @@ +package io.pravega.sensor.collector.metrics.writers; + +import com.google.common.util.concurrent.Service; + +/** + *MetricWriter service to publish metrics. + * Implementations of this interface will have + * the specifics to write to different sinks + * e.g: file, stream + */ +public interface MetricWriter extends Service { + + /** + * Writes metric. Implementations will vary + * based on the sink to publish metric to. + */ + public abstract void writeMetric(); +} + diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java index d286172a..190faab0 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java @@ -89,7 +89,7 @@ protected boolean areUpdatesMissedGreaterThanThreshold() { @Override protected void doStart() { - log.info("Starting WatchdogMonitor {}",config.getWatchDogWatchInterval()); + log.info("Starting WatchdogMonitor {}", config.getWatchDogWatchInterval()); executor.scheduleAtFixedRate(this::process, 0, config.getWatchDogWatchInterval(), TimeUnit.SECONDS); notifyStarted(); } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java index 3232a9e0..066b61fe 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/WatchDogService.java @@ -13,8 +13,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; - /** * Watchdog service for Pravega Sensor Collector(PSC). diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java new file mode 100644 index 00000000..8aaa0948 --- /dev/null +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java @@ -0,0 +1,109 @@ +package io.pravega.sensor.collector.metrics; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.collect.ImmutableMap; +import io.pravega.sensor.collector.DeviceDriverConfig; +import io.pravega.sensor.collector.DeviceDriverManager; +import io.pravega.sensor.collector.Parameters; +import io.pravega.sensor.collector.metrics.writers.MetricFileWriter; +import io.pravega.sensor.collector.metrics.writers.MetricStreamWriter; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.CompletableFuture; + +public class MetricTests { + + @Test + public void metricFileWriterServiceTest() { + ImmutableMap props = ImmutableMap.builder() + .put("METRIC_FILE_WRITER_INTERVAL_SECONDS", "3") + .build(); + DeviceDriverManager manager = new DeviceDriverManager(props); + DeviceDriverConfig deviceDriverConfig = new DeviceDriverConfig("test", "testClass", Parameters.getProperties(), manager); + MetricConfig metricConfig = MetricConfig.getMetricConfigFrom(deviceDriverConfig); + MetricFileWriter fileWriter = new MetricFileWriter(metricConfig); + fileWriter.startAsync(); + fileWriter.awaitRunning(); + Assert.assertTrue(Files.exists(Paths.get(metricConfig.getMetricFilePath()))); + } + + @Test + public void streamWriterServiceTest() { + ImmutableMap props = ImmutableMap.builder() + .put("METRIC_STREAM_WRITER_INTERVAL_SECONDS", "3") + .build(); + DeviceDriverManager manager = new DeviceDriverManager(props); + DeviceDriverConfig deviceDriverConfig = new DeviceDriverConfig("test", "testClass", Parameters.getProperties(), manager); + MetricConfig metricConfig = MetricConfig.getMetricConfigFrom(deviceDriverConfig); + TestMetricStreamWriter tms = new TestMetricStreamWriter(metricConfig); + CompletableFuture result = new CompletableFuture<>(); + tms.setNotifier(result); + tms.startAsync(); + tms.awaitRunning(); + Assert.assertTrue(tms.isRunning()); + result.join(); + Assert.assertTrue(result.isDone()); + } + + @Test + public void testMetricTypes() { + Gauge guage = new Gauge(); + guage.update(10L); + Assert.assertEquals(guage.getGauge(), new Long(10L)); + Counter counter = new Counter(); + counter.update(10L); + Assert.assertEquals(counter.getCounter(), new Long(10L)); + ExceptionMeter exceptionMeter = new ExceptionMeter(); + exceptionMeter.update("test"); + Assert.assertEquals(exceptionMeter.getExceptionClass(), "test"); + } + + @Test + public void metricStoreTests() { + try { + String json = MetricsStore.getMetricsAsJson(); + Assert.assertNotNull(json); + Assert.assertEquals(((Gauge)MetricsStore.getMetric(MetricNames.PSC_BYTES_PROCESSED_GAUGE)).getGauge(), new Long(0L)); + Assert.assertEquals(((Gauge)MetricsStore.getMetric(MetricNames.PSC_FILES_DELETED_GAUGE)).getGauge(), new Long(0L)); + } catch( JsonProcessingException jpe) { + Assert.fail("Error retrieving json from metricstore"); + } + } + + class TestMetricStreamWriter extends MetricStreamWriter { + MetricConfig config; + CompletableFuture notifier; + TestMetricStreamWriter(MetricConfig config) { + super(config); + this.config = config; + } + + void setNotifier(CompletableFuture cf) { + this.notifier = cf; + } + @Override + public PravegaClient initalizePravegaClient() { + TestPravegaClient client = new TestPravegaClient(config.getMetricsScope(), config.getMetricStream()); + client.setNotifier(notifier); + return client; + } + } + + class TestPravegaClient extends PravegaClient { + CompletableFuture notifier; + TestPravegaClient(String scope, String stream) { + super(scope, stream); + } + void setNotifier(CompletableFuture cf) { + this.notifier = cf; + } + + @Override + public void writeEvent(String rk, String message) { + this.notifier.complete(null); + } + } +} From 5442a3bb280f107cf7e190f69c1cc5c6a5f06b38 Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Thu, 15 Feb 2024 19:44:08 -0800 Subject: [PATCH 06/15] Some more changes Signed-off-by: Abhin Balur --- .../collector/file/FileIngestService.java | 6 ++- .../sensor/collector/file/FileProcessor.java | 6 ++- .../sensor/collector/metrics/Metric.java | 44 ++++++++++++++----- .../collector/metrics/MetricConfig.java | 4 +- .../collector/metrics/MetricPublisher.java | 1 + .../collector/metrics/MetricsStore.java | 8 ++++ .../collector/metrics/PravegaClient.java | 5 +++ .../metrics/writers/MetricStreamWriter.java | 8 ++-- .../watchdog/PscWatchdogMonitor.java | 4 -- .../sensor/collector/metrics/MetricTests.java | 6 +-- 10 files changed, 65 insertions(+), 27 deletions(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index 185b109b..9908e313 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -154,7 +154,7 @@ protected void watchFiles() { try { processor.watchFiles(); } catch (Exception e) { - MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).update(e.getClass().getName()); + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName()); LOG.error("watchFiles: watch file error", e); // Continue on any errors. We will retry on the next iteration. } @@ -165,7 +165,7 @@ protected void processFiles() { try { processor.processFiles(); } catch (Exception e) { - MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).update(e.getClass().getName()); + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName()); LOG.error("processFiles: Process file error", e); // Continue on any errors. We will retry on the next iteration. } @@ -177,6 +177,7 @@ protected void deleteCompletedFiles() { try { processor.deleteCompletedFiles(); } catch (Exception e) { + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName()); LOG.error("deleteCompletedFiles: Delete file error", e); // Continue on any errors. We will retry on the next iteration. } @@ -186,6 +187,7 @@ protected void deleteCompletedFiles() { @Override protected void doStart() { metricPublisher.startAsync(); + metricPublisher.awaitRunning(); watchFileTask = executor.scheduleAtFixedRate( this::watchFiles, 0, diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java index 62eec9d2..5460cb96 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java @@ -240,8 +240,8 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN } log.debug("processFile: Adding completed file: {}", fileNameWithBeginOffset.fileName); state.addCompletedFileRecord(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId); - MetricsStore.getMetric(MetricNames.PSC_FILES_PROCESSED_GAUGE).update(this.completedFiles.incrementAndGet()); - MetricsStore.getMetric(MetricNames.PSC_BYTES_PROCESSED_GAUGE).update(this.bytesProcessed.addAndGet(numOfBytes.get())); + MetricsStore.getMetric(MetricNames.PSC_FILES_PROCESSED_GAUGE).updateWith(1L); + MetricsStore.getMetric(MetricNames.PSC_BYTES_PROCESSED_GAUGE).updateWith(numOfBytes.get()); // Add to completed file list only if commit is successfull else it will be taken care as part of recovery if (txnId.isPresent()) { Transaction.Status status = writer.getTransactionStatus(txnId.get()); @@ -279,6 +279,7 @@ void deleteCompletedFiles() throws Exception { */ if (Files.deleteIfExists(filePath) || Files.notExists(Paths.get(file.fileName))) { state.deleteCompletedFileRecord(file.fileName); + MetricsStore.getMetric(MetricNames.PSC_FILES_DELETED_GAUGE).updateWith(1L); log.debug("deleteCompletedFiles: Deleted File default name:{}, and it's completed file name:{}.", file.fileName, filePath); } else { /** @@ -288,6 +289,7 @@ void deleteCompletedFiles() throws Exception { log.warn("deleteCompletedFiles: File {} doesn't exists in completed directory but still exist in default ingestion directory.", filePath); } } catch (Exception e) { + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName()); log.warn("Unable to delete ingested file default name:{}, and it's completed file name:{}, error: {}.", file.fileName, filePath, e.getMessage()); log.warn("Deletion will be retried on the next iteration."); // We can continue on this error. Deletion will be retried on the next iteration. diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java index 399d97a1..390037fa 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java @@ -8,7 +8,13 @@ * @param underlying metric data-type. */ public interface Metric { - void update(T t); + /** + * Implementors to update the internally maintained + * value by adding the passed value. + * @param T value to add. + */ + void updateWith(T t); + void clear(); } class Counter implements Metric { @@ -23,9 +29,14 @@ public String getMetricType() { return metricType; } - public void update(Long value) { + public void updateWith(Long value) { Preconditions.checkArgument(value > 0, "value needs to be positive"); - this.counter = value; + this.counter += value; + } + + @Override + public void clear() { + this.counter = 0L; } } @@ -40,26 +51,39 @@ public String getMetricType() { return metricType; } - public void update(Long value) { + public void updateWith(Long value) { Preconditions.checkArgument(value > 0, "value needs to be positive"); - this.gauge = value; + this.gauge += value; + } + + @Override + public void clear() { + this.gauge = 0L; } + } class ExceptionMeter implements Metric { - private String exceptionClass; - private String metricType = "METER"; + private final StringBuilder exceptionClass = new StringBuilder(); + private final String metricType = "METER"; + private final String SEPARATOR = ";"; + public String getExceptionClass() { - return exceptionClass; + return exceptionClass.toString(); } public String getMetricType() { return metricType; } - public void update(String value) { + public void updateWith(String value) { Preconditions.checkArgument(value != null, "value needs to be non-null"); - this.exceptionClass = value; + this.exceptionClass.append(value + SEPARATOR); + } + + @Override + public void clear() { + this.exceptionClass.setLength(0); } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java index 932d2e0f..acdf3961 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java @@ -107,10 +107,10 @@ private void setStreamWriterIntervalSeconds(int streamWriterIntervalSeconds) { // Static factory public static MetricConfig getMetricConfigFrom(DeviceDriverConfig ddrConfig) { MetricConfig metricConfig = new MetricConfig(); - String instanceName = ddrConfig.getProperties().getOrDefault("PSC_INSTANCE_NAME", "");; + String pscId = ddrConfig.getProperties().getOrDefault("PSC_ID", ""); metricConfig.setFileWriterIntervalSeconds(Integer.parseInt(ddrConfig.getProperties().getOrDefault(METRIC_FILE_WRITER_INTERVAL_SECONDS.getName(), METRIC_FILE_WRITER_INTERVAL_SECONDS.getDefaultValue()))); metricConfig.setStreamWriterIntervalSeconds(Integer.parseInt(ddrConfig.getProperties().getOrDefault(METRIC_STREAM_WRITER_INTERVAL_SECONDS.getName(), METRIC_STREAM_WRITER_INTERVAL_SECONDS.getDefaultValue()))); - metricConfig.setMetricStream(ddrConfig.getProperties().getOrDefault(METRIC_STREAM_NAME.getName() + "_" + instanceName, METRIC_STREAM_NAME.getDefaultValue() + "_" + instanceName)); + metricConfig.setMetricStream(ddrConfig.getProperties().getOrDefault(METRIC_STREAM_NAME.getName() + pscId, METRIC_STREAM_NAME.getDefaultValue() + pscId)); metricConfig.setControllerURI(URI.create(ddrConfig.getProperties().getOrDefault(METRIC_CONTROLLER_URI.getName(), METRIC_CONTROLLER_URI.getDefaultValue()))); metricConfig.setMetricsScope(METRIC_SCOPE_NAME.getDefaultValue()); metricConfig.setMetricFilePath(METRIC_FILE_PATH.getDefaultValue()); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java index 773dc84c..bf60f8f1 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java @@ -56,6 +56,7 @@ protected void doStart() { */ private void doStartWriters() { this.writers.forEach( writer -> writer.startAsync()); + this.writers.forEach( writer -> writer.awaitRunning()); } @Override diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java index 38242f94..4fe4aef3 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java @@ -12,6 +12,7 @@ public class MetricsStore { .put(MetricNames.PSC_FILES_PROCESSED_GAUGE, new Gauge()) .put(MetricNames.PSC_FILES_DELETED_GAUGE, new Gauge()) .put(MetricNames.PSC_BYTES_PROCESSED_GAUGE, new Gauge()) + .put(MetricNames.PSC_EXCEPTIONS, new ExceptionMeter()) .build(); public static Metric getMetric(String metricName) { @@ -30,4 +31,11 @@ public static String getMetricsAsJson() throws JsonProcessingException { return mapper.writerWithDefaultPrettyPrinter() .writeValueAsString(metricStore); } + + public static void clearMetrics() { + metricStore.get(MetricNames.PSC_FILES_PROCESSED_GAUGE).clear(); + metricStore.get(MetricNames.PSC_FILES_DELETED_GAUGE).clear(); + metricStore.get(MetricNames.PSC_BYTES_PROCESSED_GAUGE).clear(); + metricStore.get(MetricNames.PSC_EXCEPTIONS).clear(); + } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java index ad2c5140..f54ec5e3 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java @@ -8,6 +8,8 @@ import io.pravega.client.stream.ScalingPolicy; import io.pravega.client.stream.StreamConfiguration; import io.pravega.client.stream.impl.UTF8StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.URI; @@ -16,6 +18,8 @@ * to Pravega. */ public class PravegaClient { + + private final Logger log = LoggerFactory.getLogger(PravegaClient.class); private final String scope; private final String streamName; private final URI controllerURI; @@ -36,6 +40,7 @@ public PravegaClient(String scope, String streamName) { } private EventStreamWriter initializeWriter() { + log.info("Initializing writer with {} {} {}", this.scope, this.streamName, this.controllerURI.toString()); StreamManager streamManager = StreamManager.create(controllerURI); final boolean scopeIsNew = streamManager.createScope(scope); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java index 9d4190a0..0f7e5468 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java @@ -6,7 +6,6 @@ import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.pravega.sensor.collector.metrics.MetricConfig; -import io.pravega.sensor.collector.metrics.MetricNames; import io.pravega.sensor.collector.metrics.MetricsStore; import io.pravega.sensor.collector.metrics.PravegaClient; import org.slf4j.Logger; @@ -51,18 +50,19 @@ public void writeMetric() { String jsonMetrics = MetricsStore.getMetricsAsJson(); this.client.writeEvent("", jsonMetrics); log.info("Published metric blob to Pravega Stream {}", config.getMetricStream()); + MetricsStore.clearMetrics(); } catch (JsonProcessingException jpe) { // Just log . do not stop the scheduler log.error("Error fetching metrics as json string {}", jpe); } catch (Exception ioe) { // Just log . do not stop the scheduler - log.error("Error writing metrics file at {}", ioe); + log.error("Error sending metrics to Pravega. Exception {}", ioe); } } @Override public void doStart() { - log.info("Starting MetricStreamWriter {}"); + log.info("Starting MetricStreamWriter."); this.client = initalizePravegaClient(); executor.scheduleAtFixedRate(this::writeMetric, 0, config.getStreamWriterIntervalSeconds(), TimeUnit.SECONDS); notifyStarted(); @@ -70,7 +70,7 @@ public void doStart() { @Override public void doStop() { - log.info("Stopping Watchdog monitor."); + log.info("Stopping MetricStreamWriter."); executor.shutdown(); this.client.close(); notifyStopped(); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java index 190faab0..d387f8cf 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java @@ -96,7 +96,6 @@ protected void doStart() { /** * Restart PSC. Handle Platforms. - * TODO: use a Manager that determines platform and have separate platform specific implementations. * @throws IOException in case of any exception. */ private void restartPSC() throws IOException { @@ -110,9 +109,6 @@ private void restartPSC() throws IOException { } } - /** - * TODO: Make this part of the Monitor interface. - */ private void process() { log.debug("process called"); Path path = Paths.get(config.getWatchdogFileMonitorPath()); diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java index 8aaa0948..82db0896 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java @@ -51,13 +51,13 @@ public void streamWriterServiceTest() { @Test public void testMetricTypes() { Gauge guage = new Gauge(); - guage.update(10L); + guage.updateWith(10L); Assert.assertEquals(guage.getGauge(), new Long(10L)); Counter counter = new Counter(); - counter.update(10L); + counter.updateWith(10L); Assert.assertEquals(counter.getCounter(), new Long(10L)); ExceptionMeter exceptionMeter = new ExceptionMeter(); - exceptionMeter.update("test"); + exceptionMeter.updateWith("test"); Assert.assertEquals(exceptionMeter.getExceptionClass(), "test"); } From 587eae6316fb21f8d872591c16a0a6c42dbad499 Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Thu, 22 Feb 2024 19:06:47 -0800 Subject: [PATCH 07/15] Correcting tests Signed-off-by: Abhin Balur --- .../io/pravega/sensor/collector/metrics/MetricTests.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java index 82db0896..7fc19623 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java @@ -19,7 +19,7 @@ public class MetricTests { @Test public void metricFileWriterServiceTest() { ImmutableMap props = ImmutableMap.builder() - .put("METRIC_FILE_WRITER_INTERVAL_SECONDS", "3") + .put("METRIC_FILE_WRITER_INTERVAL_SECONDS", "1") .build(); DeviceDriverManager manager = new DeviceDriverManager(props); DeviceDriverConfig deviceDriverConfig = new DeviceDriverConfig("test", "testClass", Parameters.getProperties(), manager); @@ -27,6 +27,11 @@ public void metricFileWriterServiceTest() { MetricFileWriter fileWriter = new MetricFileWriter(metricConfig); fileWriter.startAsync(); fileWriter.awaitRunning(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } Assert.assertTrue(Files.exists(Paths.get(metricConfig.getMetricFilePath()))); } @@ -58,7 +63,7 @@ public void testMetricTypes() { Assert.assertEquals(counter.getCounter(), new Long(10L)); ExceptionMeter exceptionMeter = new ExceptionMeter(); exceptionMeter.updateWith("test"); - Assert.assertEquals(exceptionMeter.getExceptionClass(), "test"); + Assert.assertEquals(exceptionMeter.getExceptionClass(), "test;"); } @Test From 36b077c8379b0cae75c4717ec01cb2b169be82ba Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Tue, 27 Feb 2024 10:40:50 -0800 Subject: [PATCH 08/15] Handling comments Signed-off-by: Abhin Balur --- .../src/main/dist/bin/install-service.sh | 1 - .../collector/file/FileIngestService.java | 6 +-- .../sensor/collector/file/FileProcessor.java | 8 ++-- .../sensor/collector/metrics/Metric.java | 39 +++++++++++++++---- .../collector/metrics/MetricConfig.java | 9 +++++ .../sensor/collector/metrics/MetricNames.java | 9 +++++ .../collector/metrics/MetricPublisher.java | 9 +++++ .../collector/metrics/MetricsStore.java | 9 +++++ .../collector/metrics/PravegaClient.java | 2 +- .../metrics/writers/MetricFileWriter.java | 9 +++++ .../metrics/writers/MetricStreamWriter.java | 9 +++++ .../metrics/writers/MetricWriter.java | 9 +++++ .../watchdog/PscWatchdogMonitor.java | 9 +++++ .../sensor/collector/metrics/MetricTests.java | 6 +-- windows-service/PscWatchdogApp.xml | 2 +- 15 files changed, 115 insertions(+), 21 deletions(-) diff --git a/pravega-sensor-collector/src/main/dist/bin/install-service.sh b/pravega-sensor-collector/src/main/dist/bin/install-service.sh index dfa66e64..c7e20e36 100755 --- a/pravega-sensor-collector/src/main/dist/bin/install-service.sh +++ b/pravega-sensor-collector/src/main/dist/bin/install-service.sh @@ -8,7 +8,6 @@ # # http://www.apache.org/licenses/LICENSE-2.0 # -#TODO: split this into 2 functions, start-watchdog and start-psc set -x ROOT_DIR=$(readlink -f $(dirname $0)/..) SERVICE_NAME=${SERVICE_NAME:-$(basename ${ROOT_DIR})} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index 44247d5b..a17c43d3 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -154,7 +154,7 @@ protected void watchFiles() { try { processor.watchFiles(); } catch (Exception e) { - MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName()); + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName()); LOG.error("watchFiles: watch file error", e); // Continue on any errors. We will retry on the next iteration. } @@ -165,7 +165,7 @@ protected void processFiles() { try { processor.processFiles(); } catch (Exception e) { - MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName()); + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName()); LOG.error("processFiles: Process file error", e); // Continue on any errors. We will retry on the next iteration. } @@ -177,7 +177,7 @@ protected void deleteCompletedFiles() { try { processor.deleteCompletedFiles(); } catch (Exception e) { - MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName()); + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName()); LOG.error("deleteCompletedFiles: Delete file error", e); // Continue on any errors. We will retry on the next iteration. } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java index d64719fd..ffe89c63 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java @@ -240,8 +240,8 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN } log.debug("processFile: Adding completed file: {}", fileNameWithBeginOffset.fileName); state.addCompletedFileRecord(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId); - MetricsStore.getMetric(MetricNames.PSC_FILES_PROCESSED_GAUGE).updateWith(1L); - MetricsStore.getMetric(MetricNames.PSC_BYTES_PROCESSED_GAUGE).updateWith(numOfBytes.get()); + MetricsStore.getMetric(MetricNames.PSC_FILES_PROCESSED_GAUGE).incrementBy(1L); + MetricsStore.getMetric(MetricNames.PSC_BYTES_PROCESSED_GAUGE).incrementBy(numOfBytes.get()); // Add to completed file list only if commit is successfull else it will be taken care as part of recovery if (txnId.isPresent()) { Transaction.Status status = writer.getTransactionStatus(txnId.get()); @@ -279,7 +279,7 @@ void deleteCompletedFiles() throws Exception { */ if (Files.deleteIfExists(filePath) || Files.notExists(Paths.get(file.fileName))) { state.deleteCompletedFileRecord(file.fileName); - MetricsStore.getMetric(MetricNames.PSC_FILES_DELETED_GAUGE).updateWith(1L); + MetricsStore.getMetric(MetricNames.PSC_FILES_DELETED_GAUGE).incrementBy(1L); log.debug("deleteCompletedFiles: Deleted File default name:{}, and it's completed file name:{}.", file.fileName, filePath); } else { /** @@ -289,7 +289,7 @@ void deleteCompletedFiles() throws Exception { log.warn("deleteCompletedFiles: File {} doesn't exists in completed directory but still exist in default ingestion directory.", filePath); } } catch (Exception e) { - MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName()); + MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName()); log.warn("Unable to delete ingested file default name:{}, and it's completed file name:{}, error: {}.", file.fileName, filePath, e.getMessage()); log.warn("Deletion will be retried on the next iteration."); // We can continue on this error. Deletion will be retried on the next iteration. diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java index 390037fa..c7863f3d 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/Metric.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.metrics; import com.google.common.base.Preconditions; @@ -13,10 +22,18 @@ public interface Metric { * value by adding the passed value. * @param T value to add. */ - void updateWith(T t); + void incrementBy(T t); + + /** + * Clears/resets the underlying data type/structure + * stored by the Metric implementations. + */ void clear(); } +/** + * Representation of Counter Metric. + */ class Counter implements Metric { private Long counter = 0L; private String metricType = "COUNTER"; @@ -29,17 +46,20 @@ public String getMetricType() { return metricType; } - public void updateWith(Long value) { + public synchronized void incrementBy(Long value) { Preconditions.checkArgument(value > 0, "value needs to be positive"); this.counter += value; } @Override - public void clear() { + public synchronized void clear() { this.counter = 0L; } } +/** + * Representation of Gauge Metric. + */ class Gauge implements Metric { private Long gauge = 0L; private String metricType = "GAUGE"; @@ -51,19 +71,22 @@ public String getMetricType() { return metricType; } - public void updateWith(Long value) { + public synchronized void incrementBy(Long value) { Preconditions.checkArgument(value > 0, "value needs to be positive"); this.gauge += value; } @Override - public void clear() { + public synchronized void clear() { this.gauge = 0L; } } - +/** + * Metric class to represent a meter + * storing exception strings. + */ class ExceptionMeter implements Metric { private final StringBuilder exceptionClass = new StringBuilder(); private final String metricType = "METER"; @@ -77,13 +100,13 @@ public String getMetricType() { return metricType; } - public void updateWith(String value) { + public synchronized void incrementBy(String value) { Preconditions.checkArgument(value != null, "value needs to be non-null"); this.exceptionClass.append(value + SEPARATOR); } @Override - public void clear() { + public synchronized void clear() { this.exceptionClass.setLength(0); } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java index acdf3961..0b873184 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricConfig.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.metrics; import io.pravega.common.util.Property; diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricNames.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricNames.java index 9738753d..785b0b84 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricNames.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricNames.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.metrics; public class MetricNames { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java index bf60f8f1..045ebf94 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricPublisher.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.metrics; import com.google.common.util.concurrent.AbstractService; diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java index 4fe4aef3..487d2b49 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.metrics; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java index f54ec5e3..43c77fc2 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java @@ -64,4 +64,4 @@ public void writeEvent(String routingKey, String message) { public void close() { this.writer.close(); } -} \ No newline at end of file +} diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricFileWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricFileWriter.java index 8ffeb484..15282ccf 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricFileWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricFileWriter.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.metrics.writers; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java index 0f7e5468..8fc25d3b 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.metrics.writers; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricWriter.java index 1b99e244..4341fa23 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricWriter.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.metrics.writers; import com.google.common.util.concurrent.Service; diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java index d387f8cf..0013769b 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/watchdog/PscWatchdogMonitor.java @@ -1,3 +1,12 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ package io.pravega.sensor.collector.watchdog; import com.google.common.util.concurrent.AbstractService; diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java index 7fc19623..04ac3085 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/metrics/MetricTests.java @@ -56,13 +56,13 @@ public void streamWriterServiceTest() { @Test public void testMetricTypes() { Gauge guage = new Gauge(); - guage.updateWith(10L); + guage.incrementBy(10L); Assert.assertEquals(guage.getGauge(), new Long(10L)); Counter counter = new Counter(); - counter.updateWith(10L); + counter.incrementBy(10L); Assert.assertEquals(counter.getCounter(), new Long(10L)); ExceptionMeter exceptionMeter = new ExceptionMeter(); - exceptionMeter.updateWith("test"); + exceptionMeter.incrementBy("test"); Assert.assertEquals(exceptionMeter.getExceptionClass(), "test;"); } diff --git a/windows-service/PscWatchdogApp.xml b/windows-service/PscWatchdogApp.xml index 2b56b6b8..c68d97a6 100644 --- a/windows-service/PscWatchdogApp.xml +++ b/windows-service/PscWatchdogApp.xml @@ -18,4 +18,4 @@ - \ No newline at end of file + From d57b809cd40442af210dd24a10671a30e9a608f8 Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Tue, 27 Feb 2024 11:00:07 -0800 Subject: [PATCH 09/15] Stopping MetricPublisher Signed-off-by: Abhin Balur --- .../io/pravega/sensor/collector/file/FileIngestService.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index a17c43d3..71625325 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Ingestion service with common implementation logic for all files. @@ -220,6 +221,11 @@ protected void doStop() { watchFileTask.cancel(false); processFileTask.cancel(false); deleteFileTask.cancel(false); + try { + metricPublisher.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); + } catch (TimeoutException e) { + LOG.warn("Timed out stopping MetricPublisher {}", e); + } LOG.info("doStop: Cancelled ingestion, process and delete file task"); notifyStopped(); } From 6845d7dc62c501009c0ea7a95d911b2275c73f43 Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Tue, 27 Feb 2024 23:25:57 -0800 Subject: [PATCH 10/15] Not starting MetricStreamWriter Signed-off-by: Abhin Balur --- .../sensor/collector/metrics/writers/MetricStreamWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java index 8fc25d3b..ecebfc1b 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java @@ -29,7 +29,7 @@ * Metric publisher that writes metrics retrieved from * a metrics store to Pravega Stream. */ -@AutoService(MetricWriter.class) +//@AutoService(MetricWriter.class) public class MetricStreamWriter extends AbstractService implements MetricWriter { private final Logger log = LoggerFactory.getLogger(MetricStreamWriter.class); private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( From 0c7a6fc0ff667021a48ab123a3786ea954b0d4e0 Mon Sep 17 00:00:00 2001 From: Kuldeep Kumar <100260049+kuldeepk3@users.noreply.github.com> Date: Thu, 29 Feb 2024 11:25:44 +0530 Subject: [PATCH 11/15] Fixing unit test case (#64) Fixing FileIngestServiceUnitTest --- .../sensor/collector/DeviceDriver.java | 6 ++++++ .../collector/file/FileIngestService.java | 2 +- .../collector/file/FileIngestServiceTest.java | 1 + .../collector/file/MockFileIngestService.java | 19 +++++++++++++++++++ 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriver.java index f5bdeea6..0ff34835 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriver.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriver.java @@ -15,6 +15,7 @@ import io.pravega.client.EventStreamClientFactory; import io.pravega.client.admin.StreamManager; import io.pravega.client.stream.StreamConfiguration; +import io.pravega.sensor.collector.metrics.MetricPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,4 +77,9 @@ protected void createStream(String scopeName, String streamName) { streamManager.createStream(scopeName, streamName, streamConfig); } } + + protected MetricPublisher getMetricPublisher(DeviceDriverConfig config) { + final MetricPublisher metricPublisher = new MetricPublisher(config); + return metricPublisher; + } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index a17c43d3..20557b9e 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -81,7 +81,7 @@ public FileIngestService(DeviceDriverConfig config) { createStream(scopeName, getStreamName()); final EventStreamClientFactory clientFactory = getEventStreamClientFactory(scopeName); processor = FileProcessor.create(fileSequenceConfig, clientFactory); - metricPublisher = new MetricPublisher(config); + metricPublisher = getMetricPublisher(config); ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( FileIngestService.class.getSimpleName() + "-" + config.getInstanceName() + "-%d").build(); executor = Executors.newScheduledThreadPool(1, namedThreadFactory); diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java index 582f01b2..e5fe0741 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.util.Map; + /* * Test class for FileIngestService */ diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/MockFileIngestService.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/MockFileIngestService.java index 4c16a210..cefcb05e 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/MockFileIngestService.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/MockFileIngestService.java @@ -1,6 +1,7 @@ package io.pravega.sensor.collector.file; import io.pravega.sensor.collector.DeviceDriverConfig; +import io.pravega.sensor.collector.metrics.MetricPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,4 +19,22 @@ public MockFileIngestService(DeviceDriverConfig config) { protected void createStream(String scopeName, String streamName) { LOGGER.info("Do nothing for create stream"); } + + @Override + protected MetricPublisher getMetricPublisher(DeviceDriverConfig config){ + MetricPublisher metricPublisher = new TestMetricPublisher(config); + return metricPublisher; + } + + class TestMetricPublisher extends MetricPublisher { + public TestMetricPublisher(DeviceDriverConfig config) { + super(config); + } + + + @Override + protected void doStart() { + notifyStarted(); + } + } } From 410358ed2e65c77b30014df328b808c14c6b3db4 Mon Sep 17 00:00:00 2001 From: Kuldeep Kumar <100260049+kuldeepk3@users.noreply.github.com> Date: Thu, 29 Feb 2024 16:42:29 +0530 Subject: [PATCH 12/15] Fixing keycloak credentials issue (#65) Fixing unable to load KeyCLoak creds --- pravega-sensor-collector/build.gradle | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pravega-sensor-collector/build.gradle b/pravega-sensor-collector/build.gradle index f1acf35b..99dac836 100644 --- a/pravega-sensor-collector/build.gradle +++ b/pravega-sensor-collector/build.gradle @@ -46,11 +46,11 @@ dependencies { implementation "io.pravega:pravega-client:${pravegaVersion}", "io.pravega:pravega-common:${pravegaVersion}", - "commons-cli:commons-cli:${commonsCLIVersion}", - "io.pravega:pravega-standalone:${pravegaVersion}", - "io.pravega:pravega-test-integration:${pravegaVersion}" - + "commons-cli:commons-cli:${commonsCLIVersion}" + testImplementation "io.pravega:pravega-standalone:${pravegaVersion}", + "io.pravega:pravega-test-integration:${pravegaVersion}" + if (includePravegaCredentials.toBoolean()) { implementation "io.pravega:pravega-keycloak-client:${pravegaCredentialsVersion}" } From 5cb429db08da83a6f4dbbc45d3c8a220dc7bef5d Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Thu, 29 Feb 2024 03:21:28 -0800 Subject: [PATCH 13/15] Fixing Authorization failure Signed-off-by: Abhin Balur --- .../io/pravega/sensor/collector/metrics/PravegaClient.java | 7 +++++-- .../collector/metrics/writers/MetricStreamWriter.java | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java index 43c77fc2..0eef13ab 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java @@ -8,6 +8,8 @@ import io.pravega.client.stream.ScalingPolicy; import io.pravega.client.stream.StreamConfiguration; import io.pravega.client.stream.impl.UTF8StringSerializer; +import io.pravega.sensor.collector.PravegaClientConfig; +import io.pravega.sensor.collector.PravegaClientPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +43,8 @@ public PravegaClient(String scope, String streamName) { private EventStreamWriter initializeWriter() { log.info("Initializing writer with {} {} {}", this.scope, this.streamName, this.controllerURI.toString()); - StreamManager streamManager = StreamManager.create(controllerURI); + ClientConfig clientConfig = ClientConfig.builder().controllerURI(this.controllerURI).build(); + StreamManager streamManager = StreamManager.create(clientConfig); final boolean scopeIsNew = streamManager.createScope(scope); StreamConfiguration streamConfig = StreamConfiguration.builder() @@ -49,7 +52,7 @@ private EventStreamWriter initializeWriter() { .build(); final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig); EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, - ClientConfig.builder().controllerURI(controllerURI).build()); + clientConfig); EventStreamWriter writer = clientFactory.createEventWriter(streamName, new UTF8StringSerializer(), EventWriterConfig.builder().build()); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java index ecebfc1b..8fc25d3b 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java @@ -29,7 +29,7 @@ * Metric publisher that writes metrics retrieved from * a metrics store to Pravega Stream. */ -//@AutoService(MetricWriter.class) +@AutoService(MetricWriter.class) public class MetricStreamWriter extends AbstractService implements MetricWriter { private final Logger log = LoggerFactory.getLogger(MetricStreamWriter.class); private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( From dea286f4d75952624582e6f02e2741c1348d3bbf Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Thu, 29 Feb 2024 04:06:47 -0800 Subject: [PATCH 14/15] Handling comment Signed-off-by: Abhin Balur --- .../java/io/pravega/sensor/collector/metrics/MetricsStore.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java index 487d2b49..d6a28c0a 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/MetricsStore.java @@ -17,6 +17,8 @@ * Simple Store holding metric data centrally. */ public class MetricsStore { + private static ObjectMapper mapper = new ObjectMapper(); + private static final ImmutableMap metricStore = ImmutableMap.builder() .put(MetricNames.PSC_FILES_PROCESSED_GAUGE, new Gauge()) .put(MetricNames.PSC_FILES_DELETED_GAUGE, new Gauge()) @@ -36,7 +38,6 @@ public static Metric getMetric(String metricName) { * json string. */ public static String getMetricsAsJson() throws JsonProcessingException { - ObjectMapper mapper = new ObjectMapper(); return mapper.writerWithDefaultPrettyPrinter() .writeValueAsString(metricStore); } From 35a1487184238044facaafc6b0e0fb98102ec8c4 Mon Sep 17 00:00:00 2001 From: Abhin Balur Date: Thu, 29 Feb 2024 07:19:08 -0800 Subject: [PATCH 15/15] Fixing stopAsync issues Signed-off-by: Abhin Balur --- .../collector/metrics/PravegaClient.java | 27 ++++----- .../metrics/writers/MetricStreamWriter.java | 8 ++- ...ravegaSensorCollectorIntegrationTests.java | 55 +++++++++---------- 3 files changed, 47 insertions(+), 43 deletions(-) diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java index 0eef13ab..2306beea 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/PravegaClient.java @@ -43,19 +43,20 @@ public PravegaClient(String scope, String streamName) { private EventStreamWriter initializeWriter() { log.info("Initializing writer with {} {} {}", this.scope, this.streamName, this.controllerURI.toString()); - ClientConfig clientConfig = ClientConfig.builder().controllerURI(this.controllerURI).build(); - StreamManager streamManager = StreamManager.create(clientConfig); - final boolean scopeIsNew = streamManager.createScope(scope); - - StreamConfiguration streamConfig = StreamConfiguration.builder() - .scalingPolicy(ScalingPolicy.fixed(1)) - .build(); - final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig); - EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, - clientConfig); - EventStreamWriter writer = clientFactory.createEventWriter(streamName, - new UTF8StringSerializer(), - EventWriterConfig.builder().build()); + try (StreamManager streamManager = StreamManager.create(controllerURI)) { + final boolean scopeIsNew = streamManager.createScope(scope); + StreamConfiguration streamConfig = StreamConfiguration.builder() + .scalingPolicy(ScalingPolicy.fixed(1)) + .build(); + final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig); + } + EventStreamWriter writer; + try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, + ClientConfig.builder().controllerURI(controllerURI).build())) { + writer = clientFactory.createEventWriter(streamName, + new UTF8StringSerializer(), + EventWriterConfig.builder().build()); + } return writer; } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java index 8fc25d3b..d6905fc3 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/metrics/writers/MetricStreamWriter.java @@ -80,8 +80,14 @@ public void doStart() { @Override public void doStop() { log.info("Stopping MetricStreamWriter."); - executor.shutdown(); + executor.shutdownNow(); + try { + executor.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.error("Error stopping MetricStreamWriter {}", e); + } this.client.close(); + log.info("Stopped MetricStreamWriter."); notifyStopped(); } } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java index 8add6ae6..ab560297 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java @@ -88,7 +88,7 @@ public void testPSCDataIntegration() { Service startService = deviceDriverManager.startAsync(); try { startService.awaitRunning(Duration.ofSeconds(30)); - Thread.sleep(12000); + Thread.sleep(15000); } catch (InterruptedException | TimeoutException e) { throw new RuntimeException(e); } @@ -116,37 +116,34 @@ public void testPSCDataIntegration() { } private static void validateStreamData(URI controllerURI, String scope, String streamName, String content) { - StreamManager streamManager = StreamManager.create(controllerURI); - - final String readerGroup = UUID.randomUUID().toString().replace("-", ""); - final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() - .stream(Stream.of(scope, streamName)) - .build(); - try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) { - readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); - } + try (StreamManager streamManager = StreamManager.create(controllerURI)) { + + final String readerGroup = UUID.randomUUID().toString().replace("-", ""); + final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() + .stream(Stream.of(scope, streamName)) + .build(); + try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) { + readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); + } - try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, - ClientConfig.builder().controllerURI(controllerURI).build()); - EventStreamReader reader = clientFactory.createReader("reader", - readerGroup, - new UTF8StringSerializer(), - ReaderConfig.builder().build())) { - System.out.format("Reading all the events from %s/%s%n", scope, streamName); - EventRead eventRead = null; - try { - while ((eventRead = reader.readNextEvent(2000)).getEvent() != null) { - String event = eventRead.getEvent(); - System.out.format("Read event: %s", event); - Assertions.assertNotNull(event); - Assertions.assertFalse(event.isEmpty()); - Assertions.assertEquals(content, event); + try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, + ClientConfig.builder().controllerURI(controllerURI).build()); + EventStreamReader reader = clientFactory.createReader("reader", + readerGroup, + new UTF8StringSerializer(), + ReaderConfig.builder().build())) { + EventRead eventRead; + try { + while ((eventRead = reader.readNextEvent(2000)).getEvent() != null) { + String event = eventRead.getEvent(); + Assertions.assertNotNull(event); + Assertions.assertFalse(event.isEmpty()); + Assertions.assertEquals(content, event); + } + } catch (ReinitializationRequiredException e) { + //There are certain circumstances where the reader needs to be reinitialized } - } catch (ReinitializationRequiredException e) { - //There are certain circumstances where the reader needs to be reinitialized - e.printStackTrace(); } - System.out.format("No more events from %s/%s%n", scope, streamName); } }