Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add carbon metrics to get dashboard support #93

Merged
merged 25 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f2a266a
Add dependencies and Metrics class
sahandilshan Feb 18, 2020
e2b88a9
Add metrics to the Sink
sahandilshan Feb 18, 2020
2e606c2
Add metrics to the source
sahandilshan Feb 18, 2020
b67b489
Add Metrics to stream functions
sahandilshan Feb 18, 2020
677b13e
Add app_name label to the metrics
sahandilshan Feb 19, 2020
560b472
Merge from upstream/master
sahandilshan Feb 19, 2020
40ed6c1
Stable version
sahandilshan Mar 13, 2020
6161630
Stable version
sahandilshan Mar 26, 2020
1e52e60
Add carbon metrics and remove prometheus metrics
sahandilshan Apr 7, 2020
36e4650
Update sink and file operations
sahandilshan Apr 7, 2020
94f4a3e
Update Source to work with carbon metrics
sahandilshan Apr 7, 2020
6b41288
Fix bugs and checkstyle issues
sahandilshan Apr 15, 2020
b61a213
Add carbon analytics metrics
sahandilshan Apr 15, 2020
ea9ff8a
Merge branch 'master' of https://github.com/siddhi-io/siddhi-io-file …
sahandilshan Apr 15, 2020
705b872
Fix bugs in FileDeleteExtension
sahandilshan Apr 15, 2020
1af6e17
Add deleted api docs
sahandilshan Apr 28, 2020
192a9bb
Remove unwanted comments
sahandilshan Apr 28, 2020
d4680e3
Modify shortenFile path
sahandilshan May 5, 2020
9fc6d0d
Prevent updating the file status continuously
sahandilshan May 5, 2020
afae1be
Bump Carbon-analytics version
sahandilshan May 6, 2020
f1b4bd3
Fix issue while calculating the read percentage
sahandilshan May 6, 2020
5eb001a
Change the package of the metrics classes
sahandilshan May 27, 2020
b9047b8
Register metrics only when prometheus reporter is running
sahandilshan Jun 2, 2020
85ee5fb
Add PROMETHEUS_REPORTER_NAME constant
sahandilshan Jun 3, 2020
927d758
Merge with upstream/master
sahandilshan Jun 3, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<artifactId>siddhi-execution-list</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics</groupId>
<artifactId>org.wso2.carbon.si.metrics.core</artifactId>
</dependency>

</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.io.file.metrics.FileArchiveMetrics;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
Expand All @@ -39,6 +40,7 @@
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;

import java.io.BufferedInputStream;
import java.io.File;
Expand Down Expand Up @@ -145,6 +147,8 @@ public class FileArchiveExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileArchiveExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
private String siddhiAppName;
private FileArchiveMetrics fileArchiveMetrics;

@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
Expand All @@ -156,6 +160,19 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(
((ConstantExpressionExecutor) attributeExpressionExecutors[3]).getValue().toString());
}
siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
if (MetricsDataHolder.getInstance().getMetricService() != null &&
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
try {
if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
"prometheus")) {
fileArchiveMetrics = new FileArchiveMetrics(siddhiAppName);
}
} catch (IllegalArgumentException e) {
log.debug("Prometheus reporter is not running. Hence file metrics will not be initialise in "
+ inputDefinition.getId() + ".");
}
}
return null;
}

Expand Down Expand Up @@ -201,28 +218,46 @@ protected Object[] process(Object[] data) {
"Exception occurred when creating the subdirectories for the destination directory " +
destinationDirUriObject.getName().getPath(), e);
}
if (fileArchiveMetrics != null) {
fileArchiveMetrics.setSource(Utils.getShortFilePath(uri));
fileArchiveMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
fileArchiveMetrics.setType(Utils.getShortFilePath(archiveType));
fileArchiveMetrics.setTime(System.currentTimeMillis());
}
File sourceFile = new File(uri);
String destinationFile = destinationDirUri + sourceFile.getName();

if (archiveType.compareToIgnoreCase(ZIP_FILE_EXTENSION) == 0) {
List<String> fileList = new ArrayList<>();
generateFileList(uri, sourceFile, fileList, excludeSubdirectories);
try {
zip(uri, destinationFile, fileList);

} catch (IOException e) {
if (fileArchiveMetrics != null) {
fileArchiveMetrics.getArchiveMetric(0);
}
throw new SiddhiAppRuntimeException("IOException occurred when archiving " + uri, e);
}
} else {
try {
if (archiveType.compareToIgnoreCase(TAR_FILE_EXTENSION) == 0) {
addToTarArchiveCompression(
getTarArchiveOutputStream(destinationFile), sourceFile, uri);

} else {
throw new SiddhiAppRuntimeException("Unsupported archive type: " + archiveType);
}
} catch (IOException e) {
if (fileArchiveMetrics != null) {
fileArchiveMetrics.getArchiveMetric(0);
}
throw new SiddhiAppRuntimeException("Exception occurred when archiving " + uri, e);
}
}
if (fileArchiveMetrics != null) {
fileArchiveMetrics.getArchiveMetric(1);
}
return new Object[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.io.file.metrics.FileCopyMetrics;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.Selectors;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;

import java.io.File;
import java.util.ArrayList;
Expand Down Expand Up @@ -126,6 +128,7 @@ public class FileCopyExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileCopyExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
private FileCopyMetrics fileCopyMetrics;

@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
Expand All @@ -137,6 +140,19 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(((ConstantExpressionExecutor)
attributeExpressionExecutors[2]).getValue().toString());
}
if (MetricsDataHolder.getInstance().getMetricService() != null &&
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
try {
if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
"prometheus")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we make this a constant? Maybe it should be a constant in the carbon metrics package.

Copy link
Contributor Author

@sahandilshan sahandilshan Jun 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was fixed in 85ee5fb, There's no constant available in the carbon metrics, Hence introduce as a new constant.

String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
fileCopyMetrics = new FileCopyMetrics(siddhiAppName);
}
} catch (IllegalArgumentException e) {
log.debug("Prometheus reporter is not running. Hence file metrics will not be initialise in "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be rephrased as "will not be initialized"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was fixed in 85ee5fb

+ inputDefinition.getId() + ".");
}
}
return null;
}

Expand Down Expand Up @@ -222,6 +238,11 @@ public void stop() {
private void copyFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern,
FileObject rootSourceFileObject) {
FileObject destinationFileObject = null;
if (fileCopyMetrics != null) {
fileCopyMetrics.setSource(Utils.getShortFilePath(sourceFileObject.getName().getPath()));
fileCopyMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
fileCopyMetrics.setTime(System.currentTimeMillis());
}
try {
String fileName = sourceFileObject.getName().getBaseName();
String destinationPath;
Expand All @@ -239,7 +260,13 @@ private void copyFileToDestination(FileObject sourceFileObject, String destinati
destinationFileObject.copyFrom(sourceFileObject, Selectors.SELECT_ALL);
destinationFileObject.close();
}
if (fileCopyMetrics != null) {
fileCopyMetrics.getCopyMetric(1);
}
} catch (FileSystemException e) {
if (fileCopyMetrics != null) {
fileCopyMetrics.getCopyMetric(0);
}
throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when copying for " +
"file: " + sourceFileObject.getName().getPath(), e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.io.file.metrics.FileDeleteMetrics;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.Selectors;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -67,11 +69,25 @@
)
public class FileDeleteExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileDeleteExtension.class);
private FileDeleteMetrics fileDeleteMetrics;

@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
ConfigReader configReader, boolean outputExpectsExpiredEvents,
SiddhiQueryContext siddhiQueryContext) {
if (MetricsDataHolder.getInstance().getMetricService() != null &&
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
try {
if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
"prometheus")) {
String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
fileDeleteMetrics = new FileDeleteMetrics(siddhiAppName);
}
} catch (IllegalArgumentException e) {
log.debug("Prometheus reporter is not running. Hence file metrics will not be initialise in "
+ inputDefinition.getId() + ".");
}
}
return null;
}

Expand All @@ -93,10 +109,20 @@ protected Object[] process(Object[] data) {
@Override
protected Object[] process(Object data) {
String fileDeletePathUri = (String) data;
if (fileDeleteMetrics != null) {
fileDeleteMetrics.setSource(fileDeletePathUri);
fileDeleteMetrics.setTime(System.currentTimeMillis());
}
try {
FileObject rootFileObject = Utils.getFileObject(fileDeletePathUri);
rootFileObject.delete(Selectors.SELECT_ALL);
if (fileDeleteMetrics != null) {
fileDeleteMetrics.getDeleteMetric(1);
}
} catch (FileSystemException e) {
if (fileDeleteMetrics != null) {
fileDeleteMetrics.getDeleteMetric(0);
}
throw new SiddhiAppRuntimeException("Failure occurred when deleting the file " + fileDeletePathUri, e);
}
return new Object[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.io.file.metrics.FileMoveMetrics;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;

import java.io.File;
import java.util.ArrayList;
Expand Down Expand Up @@ -124,6 +126,7 @@ public class FileMoveExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileCopyExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
private FileMoveMetrics fileMoveMetrics;

@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
Expand All @@ -135,6 +138,19 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(((ConstantExpressionExecutor)
attributeExpressionExecutors[2]).getValue().toString());
}
if (MetricsDataHolder.getInstance().getMetricService() != null &&
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
try {
if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
"prometheus")) {
String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
fileMoveMetrics = new FileMoveMetrics(siddhiAppName);
}
} catch (IllegalArgumentException e) {
log.debug("Prometheus reporter is not running. Hence file metrics will not be initialise in "
+ inputDefinition.getId() + ".");
}
}
return null;
}

Expand Down Expand Up @@ -218,18 +234,31 @@ private void moveFileToDestination(FileObject sourceFileObject, String destinati
String fileName = sourceFileObject.getName().getBaseName();
String destinationPath;
FileObject destinationFileObject;

if (sourceFileObject.isFile()) {
destinationPath = destinationDirUri + File.separator + sourceFileObject.getName().getBaseName();
destinationFileObject = Utils.getFileObject(destinationPath);
FileObject destinationFolderFileObject = Utils.getFileObject(destinationDirUri);
if (!destinationFolderFileObject.exists()) {
destinationFolderFileObject.createFolder();
}
if (fileMoveMetrics != null) {
fileMoveMetrics.set_source(Utils.getShortFilePath(sourceFileObject.getName().getPath()));
fileMoveMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
fileMoveMetrics.setTime(System.currentTimeMillis());
}

if (pattern.matcher(fileName).lookingAt()) {
sourceFileObject.moveTo(destinationFileObject);
}
if (fileMoveMetrics != null) {
fileMoveMetrics.getMoveMetric(1);
}
}
} catch (FileSystemException e) {
if (fileMoveMetrics != null) {
fileMoveMetrics.getMoveMetric(0);
}
throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when moving for file: " +
sourceFileObject.getName().getPath(), e);
}
Expand Down
Loading