Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 46: Watchdog service for Pravega Sensor Collector #47

Merged
merged 19 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ commonsCodecVersion=1.14
commonsMath3Version=3.6.1
grizzlyVersion=2.35
gsonVersion=2.10.1
autoServiceVersion=1.1.1
includePravegaCredentials=true
jacksonVersion=2.15.2
junitVersion=5.10.1
Expand Down
31 changes: 27 additions & 4 deletions pravega-sensor-collector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ dependencies {

implementation "io.pravega:pravega-client:${pravegaVersion}",
"io.pravega:pravega-common:${pravegaVersion}",
"commons-cli:commons-cli:${commonsCLIVersion}",
"io.pravega:pravega-standalone:${pravegaVersion}",
"io.pravega:pravega-test-integration:${pravegaVersion}"

"commons-cli:commons-cli:${commonsCLIVersion}"

testImplementation "io.pravega:pravega-standalone:${pravegaVersion}",
"io.pravega:pravega-test-integration:${pravegaVersion}"

if (includePravegaCredentials.toBoolean()) {
implementation "io.pravega:pravega-keycloak-client:${pravegaCredentialsVersion}"
}
Expand All @@ -63,6 +63,8 @@ dependencies {
implementation "com.github.vladimir-bukhtoyarov:bucket4j-core:${bucket4jVersion}"
implementation "org.eclipse.milo:sdk-client:${miloVersion}"
implementation "com.google.code.gson:gson:${gsonVersion}"
implementation "com.google.auto.service:auto-service:${autoServiceVersion}"
annotationProcessor "com.google.auto.service:auto-service:${autoServiceVersion}"

implementation "org.apache.parquet:parquet-avro:${parquetVersion}"
implementation "org.apache.parquet:parquet-hadoop:${parquetVersion}"
Expand Down Expand Up @@ -120,6 +122,27 @@ tasks.register('runLeapAPIMockServer', JavaExec) {
main = "io.pravega.sensor.collector.leap.LeapAPIMock"
}

task createWatchdogApp(type: CreateStartScripts) {
mainClass = "io.pravega.sensor.collector.watchdog.PscWatchdogApp"
classpath = startScripts.classpath
outputDir = startScripts.outputDir
applicationName = 'psc-watchdog'
doLast {
// Modify startup script to source the configuration file (env.sh).
unixScript.text = unixScript.text
.replace(
'DEFAULT_JVM_OPTS=""',
'DEFAULT_JVM_OPTS=""\n\n# Source configuration file\n[ -f "${CONF_FILE}" ] && . "${CONF_FILE}"')
}
// defaultJvmOpts = ["-Xms1024m", "-Xmx2048m"]
}

applicationDistribution.into("bin") {
duplicatesStrategy= DuplicatesStrategy.EXCLUDE
from(createWatchdogApp)
fileMode = 0755
}

tasks.withType(com.github.spotbugs.snom.SpotBugsTask) {
reports {
xml {
Expand Down
6 changes: 6 additions & 0 deletions pravega-sensor-collector/src/main/dist/bin/install-service.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ SERVICE_NAME=${SERVICE_NAME:-$(basename ${ROOT_DIR})}
echo Installing service ${SERVICE_NAME} located in ${ROOT_DIR}.
sed -e "s:\${ROOT_DIR}:${ROOT_DIR}:g" ${ROOT_DIR}/etc/pravega-sensor-collector-TEMPLATE.service \
> ${ROOT_DIR}/etc/${SERVICE_NAME}.service
sed -e "s:\${ROOT_DIR}:${ROOT_DIR}:g" ${ROOT_DIR}/etc/psc-watchdog-TEMPLATE.service \
> ${ROOT_DIR}/etc/psc-watchdog.service
systemctl stop ${SERVICE_NAME}.service
systemctl stop psc-watchdog.service
ln -svf ${ROOT_DIR}/etc/${SERVICE_NAME}.service /etc/systemd/system/${SERVICE_NAME}.service
ln -svf ${ROOT_DIR}/etc/psc-watchdog.service /etc/systemd/system/psc-watchdog.service
systemctl daemon-reload
systemctl enable ${SERVICE_NAME}.service
systemctl enable psc-watchdog.service
systemctl start ${SERVICE_NAME}.service
systemctl start psc-watchdog.service
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ set -x
ROOT_DIR=$(readlink -f $(dirname $0)/..)
SERVICE_NAME=${SERVICE_NAME:-$(basename ${ROOT_DIR})}
systemctl stop ${SERVICE_NAME}.service
systemctl disable ${SERVICE_NAME}.service
systemctl stop psc-watchdog.service
systemctl disable psc-watchdog.service
systemctl daemon-reload
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
[Unit]
Description=Watchdog for Pravega Sensor Collector
After=network.target

[Service]
ExecStart=${ROOT_DIR}/bin/psc-watchdog
WorkingDirectory=${ROOT_DIR}
RestartSec=15s
Restart=on-failure
Type=simple
Environment="CONF_FILE=${ROOT_DIR}/conf/env.sh"
Environment="PSC_WATCHDOG_OPTS=-XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"

[Install]
WantedBy=multi-user.target
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.sensor.collector.metrics.MetricPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -76,4 +77,9 @@ protected void createStream(String scopeName, String streamName) {
streamManager.createStream(scopeName, streamName, streamConfig);
}
}

protected MetricPublisher getMetricPublisher(DeviceDriverConfig config) {
final MetricPublisher metricPublisher = new MetricPublisher(config);
return metricPublisher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import io.pravega.client.EventStreamClientFactory;
import io.pravega.sensor.collector.DeviceDriver;
import io.pravega.sensor.collector.DeviceDriverConfig;
import io.pravega.sensor.collector.metrics.MetricNames;
import io.pravega.sensor.collector.metrics.MetricPublisher;
import io.pravega.sensor.collector.metrics.MetricsStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -21,6 +24,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Ingestion service with common implementation logic for all files.
Expand Down Expand Up @@ -48,8 +52,8 @@ public abstract class FileIngestService extends DeviceDriver {
private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;

private static final int DEFAULT_INTERVAL_MS_KEY = 10000;

private final FileProcessor processor;
private final MetricPublisher metricPublisher;
private final ScheduledExecutorService executor;

private ScheduledFuture<?> watchFileTask;
Expand Down Expand Up @@ -78,6 +82,7 @@ public FileIngestService(DeviceDriverConfig config) {
createStream(scopeName, getStreamName());
final EventStreamClientFactory clientFactory = getEventStreamClientFactory(scopeName);
processor = FileProcessor.create(fileSequenceConfig, clientFactory);
metricPublisher = getMetricPublisher(config);
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(
FileIngestService.class.getSimpleName() + "-" + config.getInstanceName() + "-%d").build();
executor = Executors.newScheduledThreadPool(1, namedThreadFactory);
Expand Down Expand Up @@ -150,6 +155,7 @@ protected void watchFiles() {
try {
processor.watchFiles();
} catch (Exception e) {
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName());
LOG.error("watchFiles: watch file error", e);
// Continue on any errors. We will retry on the next iteration.
}
Expand All @@ -160,6 +166,7 @@ protected void processFiles() {
try {
processor.processFiles();
} catch (Exception e) {
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName());
LOG.error("processFiles: Process file error", e);
// Continue on any errors. We will retry on the next iteration.
}
Expand All @@ -171,6 +178,7 @@ protected void deleteCompletedFiles() {
try {
processor.deleteCompletedFiles();
} catch (Exception e) {
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName());
LOG.error("deleteCompletedFiles: Delete file error", e);
// Continue on any errors. We will retry on the next iteration.
}
Expand All @@ -179,6 +187,8 @@ protected void deleteCompletedFiles() {

@Override
protected void doStart() {
metricPublisher.startAsync();
abhinb marked this conversation as resolved.
Show resolved Hide resolved
metricPublisher.awaitRunning();
watchFileTask = executor.scheduleAtFixedRate(
this::watchFiles,
0,
Expand Down Expand Up @@ -211,6 +221,11 @@ protected void doStop() {
watchFileTask.cancel(false);
processFileTask.cancel(false);
deleteFileTask.cancel(false);
try {
metricPublisher.stopAsync().awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
LOG.warn("Timed out stopping MetricPublisher {}", e);
}
LOG.info("doStop: Cancelled ingestion, process and delete file task");
notifyStopped();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.keycloak.com.google.common.base.Preconditions;
import io.pravega.sensor.collector.metrics.MetricNames;
import io.pravega.sensor.collector.metrics.MetricsStore;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileUtils;
Expand All @@ -33,16 +35,16 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.HashSet;
import java.util.UUID;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -58,6 +60,8 @@ public abstract class FileProcessor {
private final TransactionCoordinator transactionCoordinator;
private final EventGenerator eventGenerator;
private final Path movedFilesDirectory;
private final AtomicLong completedFiles;
private final AtomicLong bytesProcessed;

public FileProcessor(FileConfig config, TransactionStateDB state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator) {
this.config = Preconditions.checkNotNull(config, "config");
Expand All @@ -67,6 +71,8 @@ public FileProcessor(FileConfig config, TransactionStateDB state, EventWriter<by
this.transactionCoordinator = Preconditions.checkNotNull(transactionCoordinator, "transactionCoordinator");
this.eventGenerator = Preconditions.checkNotNull(getEventGenerator(config), "eventGenerator");
this.movedFilesDirectory = Paths.get(config.stateDatabaseFileName).getParent();
this.completedFiles = new AtomicLong();
this.bytesProcessed = new AtomicLong();
}

public static FileProcessor create(
Expand Down Expand Up @@ -234,6 +240,8 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
}
log.debug("processFile: Adding completed file: {}", fileNameWithBeginOffset.fileName);
state.addCompletedFileRecord(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId);
MetricsStore.getMetric(MetricNames.PSC_FILES_PROCESSED_GAUGE).incrementBy(1L);
MetricsStore.getMetric(MetricNames.PSC_BYTES_PROCESSED_GAUGE).incrementBy(numOfBytes.get());
// Add to completed file list only if commit is successfull else it will be taken care as part of recovery
if (txnId.isPresent()) {
Transaction.Status status = writer.getTransactionStatus(txnId.get());
Expand Down Expand Up @@ -271,6 +279,7 @@ void deleteCompletedFiles() throws Exception {
*/
if (Files.deleteIfExists(filePath) || Files.notExists(Paths.get(file.fileName))) {
state.deleteCompletedFileRecord(file.fileName);
MetricsStore.getMetric(MetricNames.PSC_FILES_DELETED_GAUGE).incrementBy(1L);
log.debug("deleteCompletedFiles: Deleted File default name:{}, and it's completed file name:{}.", file.fileName, filePath);
} else {
/**
Expand All @@ -280,6 +289,7 @@ void deleteCompletedFiles() throws Exception {
log.warn("deleteCompletedFiles: File {} doesn't exists in completed directory but still exist in default ingestion directory.", filePath);
}
} catch (Exception e) {
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName());
log.warn("Unable to delete ingested file default name:{}, and it's completed file name:{}, error: {}.", file.fileName, filePath, e.getMessage());
log.warn("Deletion will be retried on the next iteration.");
// We can continue on this error. Deletion will be retried on the next iteration.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics;

import com.google.common.base.Preconditions;

/**
* Interface representing a Metric
* that can be published.
* @param <T> underlying metric data-type.
*/
public interface Metric<T> {
/**
* Implementors to update the internally maintained
* value by adding the passed value.
* @param T value to add.
*/
void incrementBy(T t);

/**
* Clears/resets the underlying data type/structure
* stored by the Metric implementations.
*/
void clear();
sachin-j-joshi marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Representation of Counter Metric.
*/
class Counter implements Metric<Long> {
sachin-j-joshi marked this conversation as resolved.
Show resolved Hide resolved
private Long counter = 0L;
private String metricType = "COUNTER";

public Long getCounter() {
return counter;
}

public String getMetricType() {
return metricType;
}

public synchronized void incrementBy(Long value) {
Preconditions.checkArgument(value > 0, "value needs to be positive");
this.counter += value;
}

@Override
public synchronized void clear() {
this.counter = 0L;
}
}

/**
* Representation of Gauge Metric.
*/
class Gauge implements Metric<Long> {
sachin-j-joshi marked this conversation as resolved.
Show resolved Hide resolved
private Long gauge = 0L;
private String metricType = "GAUGE";
public Long getGauge() {
return gauge;
}

public String getMetricType() {
return metricType;
}

public synchronized void incrementBy(Long value) {
Preconditions.checkArgument(value > 0, "value needs to be positive");
this.gauge += value;
}

@Override
public synchronized void clear() {
this.gauge = 0L;
}

}

/**
* Metric class to represent a meter
* storing exception strings.
*/
class ExceptionMeter implements Metric<String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

private final StringBuilder exceptionClass = new StringBuilder();
private final String metricType = "METER";
private final String SEPARATOR = ";";

public String getExceptionClass() {
return exceptionClass.toString();
}

public String getMetricType() {
return metricType;
}

public synchronized void incrementBy(String value) {
Preconditions.checkArgument(value != null, "value needs to be non-null");
this.exceptionClass.append(value + SEPARATOR);
}

@Override
public synchronized void clear() {
this.exceptionClass.setLength(0);
}
}
Loading
Loading