Skip to content

Commit

Permalink
Added separate class FileUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorva918 committed Oct 13, 2023
1 parent 6d426dc commit 8031bc3
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileUtils;
import io.pravega.sensor.collector.util.PersistentId;
import io.pravega.sensor.collector.util.TransactionCoordinator;

Expand Down Expand Up @@ -128,43 +130,11 @@ protected void findAndRecordNewFiles() throws Exception {
*/
protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
// Trace the configured file spec before scanning.
log.trace("getDirectoryListing: fileSpec={}", config.fileSpec);
// NOTE(review): the next two lines declare the same local twice. This looks like the removed
// and the added side of the diff rendered together; presumably only the FileUtils call remains
// in the committed file - confirm against the repository.
final List<FileNameWithOffset> directoryListing = getDirectoryListing(config.fileSpec, config.fileExtension);
final List<FileNameWithOffset> directoryListing = FileUtils.getDirectoryListing(config.fileSpec, config.fileExtension);
// Trace the resulting listing, then hand it back to the caller unchanged.
log.trace("getDirectoryListing: directoryListing={}", directoryListing);
return directoryListing;
}

/**
 * Lists every file found under the directories named in {@code fileSpec}.
 *
 * @param fileSpec one or more directory paths separated by commas; whitespace around each
 *                 entry is ignored and blank entries are skipped
 * @param fileExtension extension (without the dot) a file must have to be listed
 * @return list of file name and file size in bytes
 * @throws IOException if a directory cannot be opened or read
 */
static protected List<FileNameWithOffset> getDirectoryListing(String fileSpec, String fileExtension) throws IOException {
    final List<FileNameWithOffset> directoryListing = new ArrayList<>();
    for (String directory : fileSpec.split(",")) {
        final String trimmed = directory.trim();
        if (trimmed.isEmpty()) {
            // An empty path resolves to the working directory; a stray comma or an empty
            // spec must not silently scan it.
            continue;
        }
        getDirectoryFiles(Paths.get(trimmed), fileExtension, directoryListing);
    }
    return directoryListing;
}

/**
 * Collects all files in a directory (including subdirectories) together with their size in bytes.
 *
 * @param dirPath directory to scan; subdirectories are visited recursively
 * @param fileExtension extension (without the dot) a file must have to be collected;
 *                      {@code null} or empty collects every file
 * @param directoryListing list that matching entries are appended to
 * @throws IOException if a directory cannot be opened or read
 */
static protected void getDirectoryFiles(Path dirPath, String fileExtension, List<FileNameWithOffset> directoryListing) throws IOException {
    // A null extension means "ingest all files"; checking it here avoids dereferencing null below.
    final boolean acceptAll = fileExtension == null || fileExtension.isEmpty();
    try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dirPath)) {
        for (Path path : dirStream) {
            if (Files.isDirectory(path)) {
                getDirectoryFiles(path, fileExtension, directoryListing);
                continue;
            }
            final FileNameWithOffset fileEntry = new FileNameWithOffset(path.toAbsolutePath().toString(), path.toFile().length());
            // Everything after the last '.' of the absolute file name is treated as the extension.
            final String actualExtension = fileEntry.fileName.substring(fileEntry.fileName.lastIndexOf(".") + 1);
            if (acceptAll || fileExtension.equals(actualExtension)) {
                directoryListing.add(fileEntry);
            }
        }
    }
}

/**
* @return sorted list of file name and file size in bytes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.pravega.sensor.collector.parquet.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.AutoRollback;
import io.pravega.sensor.collector.util.TransactionCoordinator;

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.ByteArraySerializer;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileUtils;
import io.pravega.sensor.collector.util.PersistentId;
import io.pravega.sensor.collector.util.TransactionCoordinator;

Expand Down Expand Up @@ -127,43 +129,11 @@ protected void findAndRecordNewFiles() throws Exception {
*/
protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
// Trace the configured file spec before scanning.
log.trace("getDirectoryListing: fileSpec={}", config.fileSpec);
// NOTE(review): the next two lines declare the same local twice. This looks like the removed
// and the added side of the diff rendered together; presumably only the FileUtils call remains
// in the committed file - confirm against the repository.
final List<FileNameWithOffset> directoryListing = getDirectoryListing(config.fileSpec, config.fileExtension);
final List<FileNameWithOffset> directoryListing = FileUtils.getDirectoryListing(config.fileSpec, config.fileExtension);
// Trace the resulting listing, then hand it back to the caller unchanged.
log.trace("getDirectoryListing: directoryListing={}", directoryListing);
return directoryListing;
}

/**
 * Lists every file found under the directories named in {@code fileSpec}.
 *
 * @param fileSpec one or more directory paths separated by commas; whitespace around each
 *                 entry is ignored and blank entries are skipped
 * @param fileExtension extension (without the dot) a file must have to be listed
 * @return list of file name and file size in bytes
 * @throws IOException if a directory cannot be opened or read
 */
static protected List<FileNameWithOffset> getDirectoryListing(String fileSpec, String fileExtension) throws IOException {
    final List<FileNameWithOffset> directoryListing = new ArrayList<>();
    for (String directory : fileSpec.split(",")) {
        final String trimmed = directory.trim();
        if (trimmed.isEmpty()) {
            // An empty path resolves to the working directory; a stray comma or an empty
            // spec must not silently scan it.
            continue;
        }
        getDirectoryFiles(Paths.get(trimmed), fileExtension, directoryListing);
    }
    return directoryListing;
}

/**
 * Collects all files in a directory (including subdirectories) together with their size in bytes.
 *
 * @param dirPath directory to scan; subdirectories are visited recursively
 * @param fileExtension extension (without the dot) a file must have to be collected;
 *                      {@code null} or empty collects every file
 * @param directoryListing list that matching entries are appended to
 * @throws IOException if a directory cannot be opened or read
 */
static protected void getDirectoryFiles(Path dirPath, String fileExtension, List<FileNameWithOffset> directoryListing) throws IOException {
    // A null extension means "ingest all files"; checking it here avoids dereferencing null below.
    final boolean acceptAll = fileExtension == null || fileExtension.isEmpty();
    try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dirPath)) {
        for (Path path : dirStream) {
            if (Files.isDirectory(path)) {
                getDirectoryFiles(path, fileExtension, directoryListing);
                continue;
            }
            final FileNameWithOffset fileEntry = new FileNameWithOffset(path.toAbsolutePath().toString(), path.toFile().length());
            // Everything after the last '.' of the absolute file name is treated as the extension.
            final String actualExtension = fileEntry.fileName.substring(fileEntry.fileName.lastIndexOf(".") + 1);
            if (acceptAll || fileExtension.equals(actualExtension)) {
                directoryListing.add(fileEntry);
            }
        }
    }
}

/**
* @return sorted list of file name and file size in bytes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.slf4j.LoggerFactory;

import io.pravega.sensor.collector.util.AutoRollback;
import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.TransactionCoordinator;

import static java.sql.Connection.TRANSACTION_SERIALIZABLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.parquet;
package io.pravega.sensor.collector.util;

import java.util.Objects;

/**
* File name and file size
*/
public class FileNameWithOffset implements Comparable<FileNameWithOffset> {
public final String fileName;
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.pravega.sensor.collector.util;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.ArrayList;

/**
 * Static helpers for discovering the files under one or more directories.
 */
public class FileUtils {

    /** Delimiter between the directories listed in a file spec. */
    final static String separator = ",";

    /**
     * Lists every file found under the directories named in {@code fileSpec}.
     *
     * @param fileSpec one or more directory paths separated by {@code ","}; whitespace around
     *                 each entry is ignored and blank entries are skipped
     * @param fileExtension extension (without the dot) a file must have to be listed;
     *                      {@code null} or empty lists every file
     * @return list of file name and file size in bytes
     * @throws IOException if a directory cannot be opened or read
     */
    static public List<FileNameWithOffset> getDirectoryListing(String fileSpec, String fileExtension) throws IOException {
        final List<FileNameWithOffset> directoryListing = new ArrayList<>();
        for (String directory : fileSpec.split(separator)) {
            final String trimmed = directory.trim();
            if (trimmed.isEmpty()) {
                // An empty path resolves to the working directory; a stray separator or an
                // empty spec must not silently scan it.
                continue;
            }
            getDirectoryFiles(Paths.get(trimmed), fileExtension, directoryListing);
        }
        return directoryListing;
    }

    /**
     * Collects all files in a directory (including subdirectories) together with their size in bytes.
     *
     * @param dirPath directory to scan; subdirectories are visited recursively
     * @param fileExtension extension (without the dot) a file must have to be collected;
     *                      {@code null} or empty collects every file
     * @param directoryListing list that matching entries are appended to
     * @throws IOException if a directory cannot be opened or read
     */
    static protected void getDirectoryFiles(Path dirPath, String fileExtension, List<FileNameWithOffset> directoryListing) throws IOException {
        // A null extension means "ingest all files"; checking it here avoids dereferencing null below.
        final boolean acceptAll = fileExtension == null || fileExtension.isEmpty();
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dirPath)) {
            for (Path path : dirStream) {
                if (Files.isDirectory(path)) {
                    getDirectoryFiles(path, fileExtension, directoryListing);
                    continue;
                }
                final FileNameWithOffset fileEntry = new FileNameWithOffset(path.toAbsolutePath().toString(), path.toFile().length());
                // Everything after the last '.' of the absolute file name is treated as the extension.
                final String actualExtension = fileEntry.fileName.substring(fileEntry.fileName.lastIndexOf(".") + 1);
                if (acceptAll || fileExtension.equals(actualExtension)) {
                    directoryListing.add(fileEntry);
                }
            }
        }
    }

}

0 comments on commit 8031bc3

Please sign in to comment.