Skip to content

Commit

Permalink
Support ingestion from multiple directories (#16)
Browse files Browse the repository at this point in the history
Support ingestion from multiple directories
  • Loading branch information
apoorva918 authored Oct 27, 2023
1 parent 5f0b116 commit d7aa4b7
Show file tree
Hide file tree
Showing 35 changed files with 509 additions and 370 deletions.
Binary file added parquet-file-sample-data/sub1.parquet
Binary file not shown.
Binary file added parquet-file-sample-data/sub2.parquet
Binary file not shown.
Binary file added parquet-file-sample-data/sub3.parquet
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.pravega.sensor.collector.util.PravegaWriterEvent;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;

/**
* Generate Event from file
*/
public class EventGenerator {
private static final Logger log = LoggerFactory.getLogger(EventGenerator.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* Ingestion service for csv files.
*/
public class LogFileIngestService extends DeviceDriver {
private static final Logger log = LoggerFactory.getLogger(LogFileIngestService.class);

Expand Down Expand Up @@ -155,7 +158,7 @@ protected void doStart() {
processFileTask = executor.scheduleWithFixedDelay(
this::processLogFiles,
0,
0,
1,
TimeUnit.MILLISECONDS);
notifyStarted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
package io.pravega.sensor.collector.file;

import com.google.common.io.CountingInputStream;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.FileUtils;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.PersistentId;
import io.pravega.sensor.collector.util.TransactionCoordinator;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -34,11 +33,11 @@
import java.sql.Connection;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Get list of files obtained from config. Process each file for ingestion.
* Keep track of new files and delete ingested files if "DELETE_COMPLETED_FILES"=true.
*/
public class LogFileSequenceProcessor {
private static final Logger log = LoggerFactory.getLogger(LogFileSequenceProcessorState.class);

Expand Down Expand Up @@ -125,49 +124,11 @@ protected void findAndRecordNewFiles() throws Exception {
*/
protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
    log.info("getDirectoryListing: fileSpec={}", config.fileSpec);
    // Single declaration only: the listing is obtained from the shared FileUtils helper
    // using the configured file spec and extension filter.
    final List<FileNameWithOffset> directoryListing =
            FileUtils.getDirectoryListing(config.fileSpec, config.fileExtension);
    log.trace("getDirectoryListing: directoryListing={}", directoryListing);
    return directoryListing;
}

/**
 * Recursively list the files under a directory that pass {@code isValidFile}.
 *
 * Behavior:
 * 1. If {@code fileSpec} is not an existing directory, the problem is logged and an
 *    {@link IOException} is thrown.
 * 2. Subdirectories are traversed recursively and their valid files are appended.
 * 3. Each regular entry is recorded with its absolute path and its length in bytes, and is
 *    kept only when {@code isValidFile} accepts it (empty files and files with a
 *    non-matching extension are skipped there).
 *
 * @param fileSpec      path of the directory to list
 * @param fileExtension extension filter handed to {@code isValidFile}
 * @return list of file name and file size in bytes
 * @throws IOException if the directory does not exist, cannot be read, or listing fails;
 *                     the underlying failure is attached as the cause
 */
static protected List<FileNameWithOffset> getDirectoryListing(String fileSpec, String fileExtension) throws IOException {
    final Path pathSpec = Paths.get(fileSpec);
    if (!Files.isDirectory(pathSpec.toAbsolutePath())) {
        log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
        throw new IOException("Directory does not exist or spec is not valid");
    }
    final List<FileNameWithOffset> directoryListing = new ArrayList<>();
    try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(pathSpec)) {
        for (Path path : dirStream) {
            if (Files.isDirectory(path)) {
                // traverse subdirectories
                directoryListing.addAll(getDirectoryListing(path.toString(), fileExtension));
            } else {
                final FileNameWithOffset fileEntry =
                        new FileNameWithOffset(path.toAbsolutePath().toString(), path.toFile().length());
                if (isValidFile(fileEntry, fileExtension)) {
                    directoryListing.add(fileEntry);
                }
            }
        }
    } catch (IOException ex) {
        log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
        // Same exception type and message as before, but keep the original failure as the cause.
        throw new IOException("Directory does not exist or spec is not valid", ex);
    } catch (RuntimeException ex) {
        // Unchecked failures raised while iterating are surfaced to callers as IOException.
        log.error("getDirectoryListing: Exception while listing files: {}", pathSpec.toAbsolutePath());
        throw new IOException(ex);
    }
    return directoryListing;
}

/**
* @return sorted list of file name and file size in bytes
*/
Expand Down Expand Up @@ -280,25 +241,6 @@ void deleteCompletedFiles() throws Exception {
});
}

/**
 * Decide whether a listed file should be ingested.
 *
 * Checks, in order:
 * 1. A file whose {@code offset} is zero or negative is treated as empty and rejected.
 * 2. A null or empty {@code fileExtension} disables filtering, so every non-empty file is accepted.
 * 3. Otherwise the text after the last '.' in {@code fileEntry.fileName} must equal
 *    {@code fileExtension} exactly (case-sensitive).
 *
 * @param fileEntry     file name and offset to validate
 * @param fileExtension required extension without the leading dot; null or empty accepts all
 * @return true if the file should be ingested, false otherwise (a warning is logged)
 */
public static boolean isValidFile(FileNameWithOffset fileEntry, String fileExtension) {
    if (fileEntry.offset <= 0) {
        log.warn("isValidFile: Empty file {} can not be processed ",fileEntry.fileName);
        return false;
    }
    // If extension is null or empty, ingest all files (previously null caused a NullPointerException).
    if (fileExtension == null || fileExtension.isEmpty()) {
        return true;
    }
    final String actualExtension = fileEntry.fileName.substring(fileEntry.fileName.lastIndexOf(".") + 1);
    if (fileExtension.equals(actualExtension)) {
        return true;
    }
    log.warn("isValidFile: File format {} is not supported ", fileEntry.fileName);
    return false;
}

/**
* Inject a failure before commit for testing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.google.common.annotations.VisibleForTesting;
import io.pravega.sensor.collector.util.AutoRollback;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.TransactionCoordinator;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -30,6 +31,9 @@

import static java.sql.Connection.TRANSACTION_SERIALIZABLE;

/**
* Maintain state of pending and completed files in SQLite database.
*/
public class LogFileSequenceProcessorState implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(LogFileSequenceProcessorState.class);

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.pravega.sensor.collector.util.PravegaWriterEvent;

/**
* Generate Event from file
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected void doStart() {
processFileTask = executor.scheduleWithFixedDelay(
this::processParquetFiles,
0,
0,
1,
TimeUnit.MILLISECONDS);
notifyStarted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.sql.Connection;
Expand All @@ -28,8 +26,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.io.CountingInputStream;

Expand All @@ -43,6 +39,8 @@
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileUtils;
import io.pravega.sensor.collector.util.PersistentId;
import io.pravega.sensor.collector.util.TransactionCoordinator;

Expand Down Expand Up @@ -135,44 +133,11 @@ protected void findAndRecordNewFiles() throws Exception {
*/
protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
    log.trace("getDirectoryListing: fileSpec={}", config.fileSpec);
    // Single declaration only: the listing is obtained from the shared FileUtils helper
    // using the configured file spec and extension filter.
    final List<FileNameWithOffset> directoryListing =
            FileUtils.getDirectoryListing(config.fileSpec, config.fileExtension);
    log.trace("getDirectoryListing: directoryListing={}", directoryListing);
    return directoryListing;
}

/**
 * Recursively list the files under a directory that pass {@code isValidFile}.
 *
 * Subdirectories are traversed recursively. Each regular entry is recorded with its
 * absolute path and its length in bytes, and is kept only when {@code isValidFile}
 * accepts it.
 *
 * @param fileSpec      path of the directory to list
 * @param fileExtension extension filter handed to {@code isValidFile}
 * @return list of file name and file size in bytes
 * @throws IOException if the directory does not exist, cannot be read, or listing fails;
 *                     the underlying failure is attached as the cause
 */
static protected List<FileNameWithOffset> getDirectoryListing(String fileSpec, String fileExtension) throws IOException {
    final Path pathSpec = Paths.get(fileSpec);
    if (!Files.isDirectory(pathSpec.toAbsolutePath())) {
        log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
        throw new IOException("Directory does not exist or spec is not valid");
    }
    final List<FileNameWithOffset> directoryListing = new ArrayList<>();
    try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(pathSpec)) {
        for (Path path : dirStream) {
            if (Files.isDirectory(path)) {
                // traverse subdirectories
                directoryListing.addAll(getDirectoryListing(path.toString(), fileExtension));
            } else {
                final FileNameWithOffset fileEntry =
                        new FileNameWithOffset(path.toAbsolutePath().toString(), path.toFile().length());
                if (isValidFile(fileEntry, fileExtension)) {
                    directoryListing.add(fileEntry);
                }
            }
        }
    } catch (IOException ex) {
        log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
        // Same exception type and message as before, but keep the original failure as the cause.
        throw new IOException("Directory does not exist or spec is not valid", ex);
    } catch (RuntimeException ex) {
        // Unchecked failures raised while iterating are surfaced to callers as IOException.
        log.error("getDirectoryListing: Exception while listing files: {}", pathSpec.toAbsolutePath());
        throw new IOException(ex);
    }
    return directoryListing;
}

/**
* @return sorted list of file name and file size in bytes
*/
Expand Down Expand Up @@ -288,25 +253,6 @@ void deleteCompletedFiles() throws Exception {
});
}

/**
 * Decide whether a listed file should be ingested.
 *
 * Checks, in order:
 * 1. A file whose {@code offset} is zero or negative is treated as empty and rejected.
 * 2. A null or empty {@code fileExtension} disables filtering, so every non-empty file is accepted.
 * 3. Otherwise the text after the last '.' in {@code fileEntry.fileName} must equal
 *    {@code fileExtension} exactly (case-sensitive).
 *
 * @param fileEntry     file name and offset to validate
 * @param fileExtension required extension without the leading dot; null or empty accepts all
 * @return true if the file should be ingested, false otherwise (a warning is logged)
 */
public static boolean isValidFile(FileNameWithOffset fileEntry, String fileExtension) {
    if (fileEntry.offset <= 0) {
        log.warn("isValidFile: Empty file {} can not be processed ",fileEntry.fileName);
        return false;
    }
    // If extension is null or empty, ingest all files (previously null caused a NullPointerException).
    if (fileExtension == null || fileExtension.isEmpty()) {
        return true;
    }
    final String actualExtension = fileEntry.fileName.substring(fileEntry.fileName.lastIndexOf(".") + 1);
    if (fileExtension.equals(actualExtension)) {
        return true;
    }
    log.warn("isValidFile: File format {} is not supported ", fileEntry.fileName);
    return false;
}

/**
* Inject a failure before commit for testing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.pravega.sensor.collector.parquet.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.AutoRollback;
import io.pravega.sensor.collector.util.TransactionCoordinator;

Expand Down
Loading

0 comments on commit d7aa4b7

Please sign in to comment.