Skip to content

Commit

Permalink
Add support for transferring raw files to NFS storage (#73)
Browse files Browse the repository at this point in the history
Adds a new service, RawFileMoveService, to the Pravega Sensor Collector to enable file transfer from a host to mounted NFS storage.
The service uses a SQLite database and SQLite transactions to ensure data accuracy and avoid duplication.
  • Loading branch information
apoorva918 authored Mar 27, 2024
1 parent c21db24 commit d0c4ddc
Show file tree
Hide file tree
Showing 12 changed files with 767 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public static String createCompletedFileName(Path completedFilesDir, String file
}

/*
Move failed files to different directory
Move files to different directory
*/
static void moveFile(Path sourcePath, Path targetPath) throws IOException {
Files.createDirectories(targetPath.getParent());
Expand All @@ -185,9 +185,35 @@ static void moveFile(Path sourcePath, Path targetPath) throws IOException {
}
}
} catch (Exception e) {
LOGGER.warn("Unable to move failed file {}", e.getMessage());
LOGGER.warn("Failed file will be moved on the next iteration.");
LOGGER.warn("Unable to move file {}", e.getMessage());
LOGGER.warn("File will be moved on the next iteration.");
// We can continue on this error. Moving will be retried on the next iteration.
}
}

/**
 * Copies the file named by {@code fileEntry.fileName} under the NFS path.
 *
 * <p>The target is {@code nfsPath/<last element of userFileSpec>/<source path relative to userFileSpec>}.
 * The data is first copied to a sibling file with a {@code .temp} suffix and then renamed onto the
 * final name, so a partially written file never appears under the final name.
 * The source file is not deleted by this method.
 *
 * @param fileEntry    entry whose {@code fileName} is the path of the source file
 * @param nfsPath      base directory on the NFS mount
 * @param userFileSpec configured source directory; the source path is made relative to it
 * @throws IOException if the target directories cannot be created or the copy/rename fails
 */
public static void movetoNFS(FileNameWithOffset fileEntry, Path nfsPath, String userFileSpec) throws IOException {
    Path sourcePath = Paths.get(fileEntry.fileName);
    LOGGER.debug("source path= {}", sourcePath);
    LOGGER.debug("target path= {}", nfsPath);

    Path userFileSpecPath = Paths.get(userFileSpec);
    Path relativePath = userFileSpecPath.relativize(sourcePath);

    String newTarget = nfsPath + File.separator
            + userFileSpecPath.getName(userFileSpecPath.getNameCount() - 1)
            + File.separator + relativePath;
    Path targetFile = Paths.get(newTarget);
    LOGGER.info("Target path = {}", targetFile);

    // createDirectories tolerates a directory that appears concurrently, unlike the
    // exists()/mkdirs() pair, where mkdirs() returns false if someone else created it first.
    Path targetDir = targetFile.getParent();
    if (!Files.isDirectory(targetDir)) {
        try {
            Files.createDirectories(targetDir);
        } catch (IOException e) {
            LOGGER.error("Unable to create directories in target path", e);
            throw new IOException("Unable to create directories", e);
        }
    }

    Path tempFile = Paths.get(newTarget + ".temp");
    try {
        Files.copy(sourcePath, tempFile, StandardCopyOption.REPLACE_EXISTING);
        Files.move(tempFile, targetFile, StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException | RuntimeException e) {
        // Do not leave a partial .temp file behind when the copy or rename fails.
        try {
            Files.deleteIfExists(tempFile);
        } catch (IOException cleanupError) {
            e.addSuppressed(cleanupError);
        }
        throw e;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ public class TransactionCoordinator {
/**
 * Creates a coordinator backed by the given database connection and event writer,
 * then initializes the database.
 *
 * @param connection database connection stored by this coordinator
 * @param writer     event writer stored by this coordinator
 */
public TransactionCoordinator(Connection connection, EventWriter<byte[]> writer) {
    this.writer = writer;
    this.connection = connection;
    // Runs last: initialization creates a statement on the stored connection.
    initializeDatabase();
}

/**
 * Creates a coordinator that has no event writer (the writer is set to {@code null}),
 * then initializes the database.
 *
 * @param connection database connection stored by this coordinator
 */
public TransactionCoordinator(Connection connection) {
    this.writer = null;
    this.connection = connection;
    // Runs last: initialization creates a statement on the stored connection.
    initializeDatabase();
}

private void initializeDatabase() {
try {
try (final Statement statement = connection.createStatement()) {
statement.execute(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.writetonfs;

/**
 * Immutable holder for the settings of a file transfer service instance.
 * All values are supplied through the constructor and exposed as public final fields.
 */
public class FileConfig {
    public final String stateDatabaseFileName;
    public final String fileSpec;
    public final String fileExtension;
    public final String nfsMountPath;
    public final String routingKey;
    public final String eventTemplateStr;
    public final String fileType;
    /**
     * Also known as samplesPerEvent.
     */
    public final int maxRecordsPerEvent;

    public final boolean enableDeleteCompletedFiles;
    public final boolean exactlyOnce;
    public final double transactionTimeoutMinutes;

    public final long minTimeInMillisToUpdateFile;

    /**
     * Stores every argument in the field of the same name.
     * Note that {@code fileType} is the last parameter even though the field is declared earlier.
     */
    public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExtension, String nfsMountPath, String routingKey, String eventTemplateStr, int maxRecordsPerEvent, boolean enableDeleteCompletedFiles, boolean exactlyOnce, double transactionTimeoutMinutes, long minTimeInMillisToUpdateFile, String fileType) {
        this.stateDatabaseFileName = stateDatabaseFileName;
        this.fileSpec = fileSpec;
        this.fileExtension = fileExtension;
        this.nfsMountPath = nfsMountPath;
        this.routingKey = routingKey;
        this.eventTemplateStr = eventTemplateStr;
        this.maxRecordsPerEvent = maxRecordsPerEvent;
        this.enableDeleteCompletedFiles = enableDeleteCompletedFiles;
        this.exactlyOnce = exactlyOnce;
        this.transactionTimeoutMinutes = transactionTimeoutMinutes;
        this.minTimeInMillisToUpdateFile = minTimeInMillisToUpdateFile;
        this.fileType = fileType;
    }

    /**
     * Returns a single-line description listing every configuration field,
     * including {@code minTimeInMillisToUpdateFile}.
     */
    @Override
    public String toString() {
        return "FileConfig{"
                + "stateDatabaseFileName='" + stateDatabaseFileName + '\''
                + ", fileSpec='" + fileSpec + '\''
                + ", fileExtension='" + fileExtension + '\''
                + ", nfsMountPath='" + nfsMountPath + '\''
                + ", fileType='" + fileType + '\''
                + ", routingKey='" + routingKey + '\''
                + ", eventTemplateStr='" + eventTemplateStr + '\''
                + ", maxRecordsPerEvent=" + maxRecordsPerEvent
                + ", enableDeleteCompletedFiles=" + enableDeleteCompletedFiles
                + ", exactlyOnce=" + exactlyOnce
                + ", transactionTimeoutMinutes=" + transactionTimeoutMinutes
                + ", minTimeInMillisToUpdateFile=" + minTimeInMillisToUpdateFile
                + '}';
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.writetonfs;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.pravega.sensor.collector.DeviceDriver;
import io.pravega.sensor.collector.DeviceDriverConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * File transfer service with common implementation logic for all files.
 *
 * <p>Reads its settings from driver properties, builds a {@link FileConfig}, and drives a
 * {@code FileProcessor} from a single-threaded scheduled executor: one task watches for files at a
 * fixed rate and another processes files back-to-back.
 */
public abstract class FileMoveService extends DeviceDriver {
    private static final Logger LOG = LoggerFactory.getLogger(FileMoveService.class);

    private static final String FILE_SPEC_KEY = "FILE_SPEC";
    private static final String FILE_EXT = "FILE_EXTENSION";
    private static final String NFS_MOUNT_PATH = "NFS_MOUNT_PATH";
    private static final String DELETE_COMPLETED_FILES_KEY = "DELETE_COMPLETED_FILES";
    private static final String DATABASE_FILE_KEY = "DATABASE_FILE";
    private static final String EVENT_TEMPLATE_KEY = "EVENT_TEMPLATE";
    private static final String SAMPLES_PER_EVENT_KEY = "SAMPLES_PER_EVENT";
    private static final String INTERVAL_MS_KEY = "INTERVAL_MS";

    private static final String ROUTING_KEY_KEY = "ROUTING_KEY";
    private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
    private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
    private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE";

    // Default values (not property keys) used when the corresponding property is absent.
    private static final int DEFAULT_SAMPLES_PER_EVENT = 100;
    private static final long DEFAULT_INTERVAL_MS = 10000;

    private final FileProcessor processor;
    private final ScheduledExecutorService executor;

    private ScheduledFuture<?> watchFileTask;
    private ScheduledFuture<?> processFileTask;

    /**
     * Builds the file configuration from the driver properties, creates the processor,
     * and creates the single-threaded scheduler. Tasks are not scheduled until {@link #doStart()}.
     *
     * @param config driver configuration; its class name is passed to the file configuration as the file type
     */
    public FileMoveService(DeviceDriverConfig config) {
        super(config);
        final FileConfig fileSequenceConfig = new FileConfig(
                getDatabaseFileName(),
                getFileSpec(),
                getFileExtension(),
                getNFSMountPath(),
                getRoutingKey(),
                getEventTemplate(),
                getSamplesPerEvent(),
                getDeleteCompletedFiles(),
                getExactlyOnce(),
                getTransactionTimeoutMinutes(),
                getMinTimeInMillisToUpdateFile(),
                config.getClassName());
        LOG.info("File Transfer Config: {}", fileSequenceConfig);
        processor = FileProcessor.create(fileSequenceConfig);
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(
                FileMoveService.class.getSimpleName() + "-" + config.getInstanceName() + "-%d").build();
        executor = Executors.newScheduledThreadPool(1, namedThreadFactory);
    }

    /** Returns the FILE_SPEC property (no default). */
    String getFileSpec() {
        return getProperty(FILE_SPEC_KEY);
    }

    /** Returns the FILE_EXTENSION property, or an empty string when unset. */
    String getFileExtension() {
        return getProperty(FILE_EXT, "");
    }

    /** Returns the NFS_MOUNT_PATH property, or an empty string when unset. */
    String getNFSMountPath() {
        return getProperty(NFS_MOUNT_PATH, "");
    }

    /** Returns the DELETE_COMPLETED_FILES property as a boolean; defaults to {@code true}. */
    boolean getDeleteCompletedFiles() {
        return Boolean.parseBoolean(getProperty(DELETE_COMPLETED_FILES_KEY, Boolean.toString(true)));
    }

    /** Returns the DATABASE_FILE property (no default). */
    String getDatabaseFileName() {
        return getProperty(DATABASE_FILE_KEY);
    }

    /** Returns the EVENT_TEMPLATE property, or {@code "{}"} when unset. */
    String getEventTemplate() {
        return getProperty(EVENT_TEMPLATE_KEY, "{}");
    }

    /** Returns the SAMPLES_PER_EVENT property as an int; defaults to 100. */
    int getSamplesPerEvent() {
        return Integer.parseInt(getProperty(SAMPLES_PER_EVENT_KEY, Integer.toString(DEFAULT_SAMPLES_PER_EVENT)));
    }

    /** Returns the INTERVAL_MS property as a long; defaults to 10000. */
    long getIntervalMs() {
        return Long.parseLong(getProperty(INTERVAL_MS_KEY, Long.toString(DEFAULT_INTERVAL_MS)));
    }

    /** Returns the ROUTING_KEY property, or an empty string when unset. */
    protected String getRoutingKey() {
        return getProperty(ROUTING_KEY_KEY, "");
    }

    /** Returns the EXACTLY_ONCE property as a boolean; defaults to {@code true}. */
    boolean getExactlyOnce() {
        return Boolean.parseBoolean(getProperty(EXACTLY_ONCE_KEY, Boolean.toString(true)));
    }

    /**
     * This time duration must not exceed the controller property controller.transaction.maxLeaseValue (milliseconds).
     * Defaults to 18 * 60 minutes when the property is unset.
     */
    double getTransactionTimeoutMinutes() {
        return Double.parseDouble(getProperty(TRANSACTION_TIMEOUT_MINUTES_KEY, Double.toString(18.0 * 60.0)));
    }

    /** Returns the MIN_TIME_IN_MILLIS_TO_UPDATE_FILE property as a long; defaults to 5000. */
    long getMinTimeInMillisToUpdateFile() {
        return Long.parseLong(getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000"));
    }

    /**
     * Runs one watch pass on the processor. Any exception is logged and swallowed so that
     * the scheduled task keeps running.
     */
    protected void watchFiles() {
        LOG.trace("watchFiles: BEGIN");
        try {
            processor.watchFiles();
        } catch (Exception e) {
            LOG.error("watchFiles: watch file error", e);
            // Continue on any errors. We will retry on the next iteration.
        }
        LOG.trace("watchFiles: END");
    }

    /**
     * Runs one processing pass on the processor. Any exception is logged and swallowed so that
     * the scheduled task keeps running.
     */
    protected void processFiles() {
        LOG.trace("processFiles: BEGIN");
        try {
            processor.processFiles();
        } catch (Exception e) {
            LOG.error("processFiles: Process file error", e);
            // Continue on any errors. We will retry on the next iteration.
        }
        LOG.trace("processFiles: END");
    }

    /**
     * Schedules the watch task at a fixed rate of {@link #getIntervalMs()} milliseconds and the
     * processing task with a 1 millisecond delay between runs, then reports the service as started.
     */
    @Override
    protected void doStart() {
        watchFileTask = executor.scheduleAtFixedRate(
                this::watchFiles,
                0,
                getIntervalMs(),
                TimeUnit.MILLISECONDS);
        /*
        Submits a periodic action that becomes enabled immediately for the first time,
        and subsequently with the delay of 1 milliseconds between the termination of one execution and the commencement of the next
        ie immediately after completion of first action.
        */
        processFileTask = executor.scheduleWithFixedDelay(
                this::processFiles,
                0,
                1,
                TimeUnit.MILLISECONDS);
        notifyStarted();
    }

    /**
     * Cancels both scheduled tasks without interrupting a run in progress and shuts the
     * executor down so its worker thread does not outlive the service.
     */
    @Override
    protected void doStop() {
        LOG.info("doStop: Cancelling transfer task and process file task");
        watchFileTask.cancel(false);
        processFileTask.cancel(false);
        // Release the scheduler thread; a task already running is allowed to finish.
        executor.shutdown();
        // NOTE(review): doStart() calls notifyStarted(), but nothing here signals that the stop
        // completed. If DeviceDriver follows the Guava AbstractService contract, notifyStopped()
        // is probably required here - confirm against DeviceDriver.
    }
}
Loading

0 comments on commit d0c4ddc

Please sign in to comment.