Skip to content

Commit

Permalink
Handling comments
Browse files Browse the repository at this point in the history
Signed-off-by: Abhin Balur <[email protected]>
  • Loading branch information
abhinb committed Feb 27, 2024
1 parent 6fc0945 commit 36b077c
Show file tree
Hide file tree
Showing 15 changed files with 115 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#TODO: split this into 2 functions, start-watchdog and start-psc
set -x
ROOT_DIR=$(readlink -f $(dirname $0)/..)
SERVICE_NAME=${SERVICE_NAME:-$(basename ${ROOT_DIR})}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected void watchFiles() {
try {
processor.watchFiles();
} catch (Exception e) {
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName());
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName());
LOG.error("watchFiles: watch file error", e);
// Continue on any errors. We will retry on the next iteration.
}
Expand All @@ -165,7 +165,7 @@ protected void processFiles() {
try {
processor.processFiles();
} catch (Exception e) {
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName());
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName());
LOG.error("processFiles: Process file error", e);
// Continue on any errors. We will retry on the next iteration.
}
Expand All @@ -177,7 +177,7 @@ protected void deleteCompletedFiles() {
try {
processor.deleteCompletedFiles();
} catch (Exception e) {
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName());
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName());
LOG.error("deleteCompletedFiles: Delete file error", e);
// Continue on any errors. We will retry on the next iteration.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,8 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
}
log.debug("processFile: Adding completed file: {}", fileNameWithBeginOffset.fileName);
state.addCompletedFileRecord(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId);
MetricsStore.getMetric(MetricNames.PSC_FILES_PROCESSED_GAUGE).updateWith(1L);
MetricsStore.getMetric(MetricNames.PSC_BYTES_PROCESSED_GAUGE).updateWith(numOfBytes.get());
MetricsStore.getMetric(MetricNames.PSC_FILES_PROCESSED_GAUGE).incrementBy(1L);
MetricsStore.getMetric(MetricNames.PSC_BYTES_PROCESSED_GAUGE).incrementBy(numOfBytes.get());
// Add to completed file list only if commit is successfull else it will be taken care as part of recovery
if (txnId.isPresent()) {
Transaction.Status status = writer.getTransactionStatus(txnId.get());
Expand Down Expand Up @@ -279,7 +279,7 @@ void deleteCompletedFiles() throws Exception {
*/
if (Files.deleteIfExists(filePath) || Files.notExists(Paths.get(file.fileName))) {
state.deleteCompletedFileRecord(file.fileName);
MetricsStore.getMetric(MetricNames.PSC_FILES_DELETED_GAUGE).updateWith(1L);
MetricsStore.getMetric(MetricNames.PSC_FILES_DELETED_GAUGE).incrementBy(1L);
log.debug("deleteCompletedFiles: Deleted File default name:{}, and it's completed file name:{}.", file.fileName, filePath);
} else {
/**
Expand All @@ -289,7 +289,7 @@ void deleteCompletedFiles() throws Exception {
log.warn("deleteCompletedFiles: File {} doesn't exists in completed directory but still exist in default ingestion directory.", filePath);
}
} catch (Exception e) {
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).updateWith(e.getClass().getName());
MetricsStore.getMetric(MetricNames.PSC_EXCEPTIONS).incrementBy(e.getClass().getName());
log.warn("Unable to delete ingested file default name:{}, and it's completed file name:{}, error: {}.", file.fileName, filePath, e.getMessage());
log.warn("Deletion will be retried on the next iteration.");
// We can continue on this error. Deletion will be retried on the next iteration.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics;

import com.google.common.base.Preconditions;
Expand All @@ -13,10 +22,18 @@ public interface Metric<T> {
* value by adding the passed value.
* @param T value to add.
*/
void updateWith(T t);
void incrementBy(T t);

/**
* Clears/resets the underlying data type/structure
* stored by the Metric implementations.
*/
void clear();
}

/**
* Representation of Counter Metric.
*/
class Counter implements Metric<Long> {
private Long counter = 0L;
private String metricType = "COUNTER";
Expand All @@ -29,17 +46,20 @@ public String getMetricType() {
return metricType;
}

public void updateWith(Long value) {
public synchronized void incrementBy(Long value) {
Preconditions.checkArgument(value > 0, "value needs to be positive");
this.counter += value;
}

@Override
public void clear() {
public synchronized void clear() {
this.counter = 0L;
}
}

/**
* Representation of Gauge Metric.
*/
class Gauge implements Metric<Long> {
private Long gauge = 0L;
private String metricType = "GAUGE";
Expand All @@ -51,19 +71,22 @@ public String getMetricType() {
return metricType;
}

public void updateWith(Long value) {
public synchronized void incrementBy(Long value) {
Preconditions.checkArgument(value > 0, "value needs to be positive");
this.gauge += value;
}

@Override
public void clear() {
public synchronized void clear() {
this.gauge = 0L;
}

}


/**
* Metric class to represent a meter
* storing exception strings.
*/
class ExceptionMeter implements Metric<String> {
private final StringBuilder exceptionClass = new StringBuilder();
private final String metricType = "METER";
Expand All @@ -77,13 +100,13 @@ public String getMetricType() {
return metricType;
}

public void updateWith(String value) {
public synchronized void incrementBy(String value) {
Preconditions.checkArgument(value != null, "value needs to be non-null");
this.exceptionClass.append(value + SEPARATOR);
}

@Override
public void clear() {
public synchronized void clear() {
this.exceptionClass.setLength(0);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics;

import io.pravega.common.util.Property;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics;

public class MetricNames {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics;

import com.google.common.util.concurrent.AbstractService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ public void writeEvent(String routingKey, String message) {
public void close() {
this.writer.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics.writers;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics.writers;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.metrics.writers;

import com.google.common.util.concurrent.Service;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.watchdog;

import com.google.common.util.concurrent.AbstractService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ public void streamWriterServiceTest() {
@Test
public void testMetricTypes() {
Gauge guage = new Gauge();
guage.updateWith(10L);
guage.incrementBy(10L);
Assert.assertEquals(guage.getGauge(), new Long(10L));
Counter counter = new Counter();
counter.updateWith(10L);
counter.incrementBy(10L);
Assert.assertEquals(counter.getCounter(), new Long(10L));
ExceptionMeter exceptionMeter = new ExceptionMeter();
exceptionMeter.updateWith("test");
exceptionMeter.incrementBy("test");
Assert.assertEquals(exceptionMeter.getExceptionClass(), "test;");
}

Expand Down
2 changes: 1 addition & 1 deletion windows-service/PscWatchdogApp.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
<env name="PSC_SERVICE_NAME" value="PravegaSensorCollector" />

<onfailure action="restart"/>
</service>
</service>

0 comments on commit 36b077c

Please sign in to comment.