Skip to content

Commit

Permalink
Merge pull request #93 from sahandilshan/develop
Browse files Browse the repository at this point in the history
Add carbon metrics to get dashboard support
  • Loading branch information
ramindu90 authored Jun 4, 2020
2 parents f9f8b79 + 927d758 commit 472476e
Show file tree
Hide file tree
Showing 22 changed files with 1,187 additions and 17 deletions.
5 changes: 5 additions & 0 deletions component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@
<artifactId>siddhi-execution-list</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.analytics</groupId>
<artifactId>org.wso2.carbon.si.metrics.core</artifactId>
</dependency>

</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.io.file.metrics.FileArchiveMetrics;
import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
Expand All @@ -39,6 +41,7 @@
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;

import java.io.BufferedInputStream;
import java.io.File;
Expand Down Expand Up @@ -145,6 +148,8 @@ public class FileArchiveExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileArchiveExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
private String siddhiAppName;
private FileArchiveMetrics fileArchiveMetrics;

@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
Expand All @@ -156,6 +161,18 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(
((ConstantExpressionExecutor) attributeExpressionExecutors[3]).getValue().toString());
}
siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
if (MetricsDataHolder.getInstance().getMetricService() != null &&
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
try {
if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
Constants.PROMETHEUS_REPORTER_NAME)) {
fileArchiveMetrics = new FileArchiveMetrics(siddhiAppName);
}
} catch (IllegalArgumentException e) {
log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
}
}
return null;
}

Expand Down Expand Up @@ -201,28 +218,46 @@ protected Object[] process(Object[] data) {
"Exception occurred when creating the subdirectories for the destination directory " +
destinationDirUriObject.getName().getPath(), e);
}
if (fileArchiveMetrics != null) {
fileArchiveMetrics.setSource(Utils.getShortFilePath(uri));
fileArchiveMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
fileArchiveMetrics.setType(Utils.getShortFilePath(archiveType));
fileArchiveMetrics.setTime(System.currentTimeMillis());
}
File sourceFile = new File(uri);
String destinationFile = destinationDirUri + sourceFile.getName();

if (archiveType.compareToIgnoreCase(ZIP_FILE_EXTENSION) == 0) {
List<String> fileList = new ArrayList<>();
generateFileList(uri, sourceFile, fileList, excludeSubdirectories);
try {
zip(uri, destinationFile, fileList);

} catch (IOException e) {
if (fileArchiveMetrics != null) {
fileArchiveMetrics.getArchiveMetric(0);
}
throw new SiddhiAppRuntimeException("IOException occurred when archiving " + uri, e);
}
} else {
try {
if (archiveType.compareToIgnoreCase(TAR_FILE_EXTENSION) == 0) {
addToTarArchiveCompression(
getTarArchiveOutputStream(destinationFile), sourceFile, uri);

} else {
throw new SiddhiAppRuntimeException("Unsupported archive type: " + archiveType);
}
} catch (IOException e) {
if (fileArchiveMetrics != null) {
fileArchiveMetrics.getArchiveMetric(0);
}
throw new SiddhiAppRuntimeException("Exception occurred when archiving " + uri, e);
}
}
if (fileArchiveMetrics != null) {
fileArchiveMetrics.getArchiveMetric(1);
}
return new Object[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.io.file.metrics.FileCopyMetrics;
import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.Selectors;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;

import java.io.File;
import java.util.ArrayList;
Expand Down Expand Up @@ -126,6 +129,7 @@ public class FileCopyExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileCopyExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
private FileCopyMetrics fileCopyMetrics;

@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
Expand All @@ -137,6 +141,18 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(((ConstantExpressionExecutor)
attributeExpressionExecutors[2]).getValue().toString());
}
if (MetricsDataHolder.getInstance().getMetricService() != null &&
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
try {
if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
Constants.PROMETHEUS_REPORTER_NAME)) {
String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
fileCopyMetrics = new FileCopyMetrics(siddhiAppName);
}
} catch (IllegalArgumentException e) {
log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
}
}
return null;
}

Expand Down Expand Up @@ -222,6 +238,11 @@ public void stop() {
private void copyFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern,
FileObject rootSourceFileObject) {
FileObject destinationFileObject = null;
if (fileCopyMetrics != null) {
fileCopyMetrics.setSource(Utils.getShortFilePath(sourceFileObject.getName().getPath()));
fileCopyMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
fileCopyMetrics.setTime(System.currentTimeMillis());
}
try {
String fileName = sourceFileObject.getName().getBaseName();
String destinationPath;
Expand All @@ -239,7 +260,13 @@ private void copyFileToDestination(FileObject sourceFileObject, String destinati
destinationFileObject.copyFrom(sourceFileObject, Selectors.SELECT_ALL);
destinationFileObject.close();
}
if (fileCopyMetrics != null) {
fileCopyMetrics.getCopyMetric(1);
}
} catch (FileSystemException e) {
if (fileCopyMetrics != null) {
fileCopyMetrics.getCopyMetric(0);
}
throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when copying for " +
"file: " + sourceFileObject.getName().getPath(), e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.io.file.metrics.FileDeleteMetrics;
import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.Selectors;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -67,11 +70,24 @@
)
public class FileDeleteExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileDeleteExtension.class);
private FileDeleteMetrics fileDeleteMetrics;

@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
ConfigReader configReader, boolean outputExpectsExpiredEvents,
SiddhiQueryContext siddhiQueryContext) {
if (MetricsDataHolder.getInstance().getMetricService() != null &&
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
try {
if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
Constants.PROMETHEUS_REPORTER_NAME)) {
String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
fileDeleteMetrics = new FileDeleteMetrics(siddhiAppName);
}
} catch (IllegalArgumentException e) {
log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
}
}
return null;
}

Expand All @@ -93,10 +109,20 @@ protected Object[] process(Object[] data) {
@Override
protected Object[] process(Object data) {
String fileDeletePathUri = (String) data;
if (fileDeleteMetrics != null) {
fileDeleteMetrics.setSource(fileDeletePathUri);
fileDeleteMetrics.setTime(System.currentTimeMillis());
}
try {
FileObject rootFileObject = Utils.getFileObject(fileDeletePathUri);
rootFileObject.delete(Selectors.SELECT_ALL);
if (fileDeleteMetrics != null) {
fileDeleteMetrics.getDeleteMetric(1);
}
} catch (FileSystemException e) {
if (fileDeleteMetrics != null) {
fileDeleteMetrics.getDeleteMetric(0);
}
throw new SiddhiAppRuntimeException("Failure occurred when deleting the file " + fileDeletePathUri, e);
}
return new Object[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@
import io.siddhi.core.query.processor.stream.function.StreamFunctionProcessor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.extension.io.file.metrics.FileMoveMetrics;
import io.siddhi.extension.io.file.util.Constants;
import io.siddhi.extension.util.Utils;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
import org.apache.log4j.Logger;
import org.wso2.carbon.si.metrics.core.internal.MetricsDataHolder;

import java.io.File;
import java.util.ArrayList;
Expand Down Expand Up @@ -124,6 +127,7 @@ public class FileMoveExtension extends StreamFunctionProcessor {
private static final Logger log = Logger.getLogger(FileCopyExtension.class);
private Pattern pattern = null;
private int inputExecutorLength;
private FileMoveMetrics fileMoveMetrics;

@Override
protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecutor[] attributeExpressionExecutors,
Expand All @@ -135,6 +139,18 @@ protected StateFactory init(AbstractDefinition inputDefinition, ExpressionExecut
pattern = Pattern.compile(((ConstantExpressionExecutor)
attributeExpressionExecutors[2]).getValue().toString());
}
if (MetricsDataHolder.getInstance().getMetricService() != null &&
MetricsDataHolder.getInstance().getMetricManagementService().isEnabled()) {
try {
if (MetricsDataHolder.getInstance().getMetricManagementService().isReporterRunning(
Constants.PROMETHEUS_REPORTER_NAME)) {
String siddhiAppName = siddhiQueryContext.getSiddhiAppContext().getName();
fileMoveMetrics = new FileMoveMetrics(siddhiAppName);
}
} catch (IllegalArgumentException e) {
log.debug("Prometheus reporter is not running. Hence file metrics will not be initialized.");
}
}
return null;
}

Expand Down Expand Up @@ -218,18 +234,31 @@ private void moveFileToDestination(FileObject sourceFileObject, String destinati
String fileName = sourceFileObject.getName().getBaseName();
String destinationPath;
FileObject destinationFileObject;

if (sourceFileObject.isFile()) {
destinationPath = destinationDirUri + File.separator + sourceFileObject.getName().getBaseName();
destinationFileObject = Utils.getFileObject(destinationPath);
FileObject destinationFolderFileObject = Utils.getFileObject(destinationDirUri);
if (!destinationFolderFileObject.exists()) {
destinationFolderFileObject.createFolder();
}
if (fileMoveMetrics != null) {
fileMoveMetrics.set_source(Utils.getShortFilePath(sourceFileObject.getName().getPath()));
fileMoveMetrics.setDestination(Utils.getShortFilePath(destinationDirUri));
fileMoveMetrics.setTime(System.currentTimeMillis());
}

if (pattern.matcher(fileName).lookingAt()) {
sourceFileObject.moveTo(destinationFileObject);
}
if (fileMoveMetrics != null) {
fileMoveMetrics.getMoveMetric(1);
}
}
} catch (FileSystemException e) {
if (fileMoveMetrics != null) {
fileMoveMetrics.getMoveMetric(0);
}
throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when moving for file: " +
sourceFileObject.getName().getPath(), e);
}
Expand Down
Loading

0 comments on commit 472476e

Please sign in to comment.