diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/parquet/ParquetFileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/parquet/ParquetFileProcessor.java index 335c3d5b..fa9a68c6 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/parquet/ParquetFileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/parquet/ParquetFileProcessor.java @@ -12,10 +12,13 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.sql.Connection; import java.util.ArrayList; import java.util.Collections; @@ -92,11 +95,12 @@ public static ParquetFileProcessor create(ParquetFileConfig config, EventStreamC public void ingestParquetFiles() throws Exception { log.trace("ingestParquetFiles: BEGIN"); - findAndRecordNewFiles(); - processNewFiles(); + // delete leftover completed files if (config.enableDeleteCompletedFiles) { deleteCompletedFiles(); } + findAndRecordNewFiles(); + processNewFiles(); log.trace("ingestParquetFiles: END"); } @@ -185,33 +189,35 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN writer.abort(); try (final InputStream inputStream = new FileInputStream(fileNameWithBeginOffset.fileName)) { - final CountingInputStream countingInputStream = new CountingInputStream(inputStream); - countingInputStream.skip(fileNameWithBeginOffset.offset); - final Pair result = eventGenerator.generateEventsFromInputStream(countingInputStream, firstSequenceNumber, - e -> { - log.trace("processFile: event={}", e); - try { - writer.writeEvent(e.routingKey, e.bytes); - numofbytes.addAndGet(e.bytes.length); - } catch (TxnFailedException ex) { - throw new RuntimeException(ex); - } - }); - final Optional txnId = writer.flush(); - final long nextSequenceNumber = result.getLeft(); - final long endOffset = result.getRight(); - state.addCompletedFile(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId); - // injectCommitFailure(); - writer.commit(); - state.deleteTransactionToCommit(txnId); + try(final CountingInputStream countingInputStream = new CountingInputStream(inputStream)) { + countingInputStream.skip(fileNameWithBeginOffset.offset); + final Pair result = eventGenerator.generateEventsFromInputStream(countingInputStream, firstSequenceNumber, + e -> { + log.trace("processFile: event={}", e); + try { + writer.writeEvent(e.routingKey, e.bytes); + numofbytes.addAndGet(e.bytes.length); + + } catch (TxnFailedException ex) { + throw new RuntimeException(ex); + } + }); + final Optional txnId = writer.flush(); + final long nextSequenceNumber = result.getLeft(); + final long endOffset = result.getRight(); + state.addCompletedFile(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId); + // injectCommitFailure(); + writer.commit(); + state.deleteTransactionToCommit(txnId); - double elapsedSec = (System.nanoTime() - timestamp) / 1_000_000_000.0; - double megabyteCount = numofbytes.getAndSet(0) / 1_000_000.0; - double megabytesPerSec = megabyteCount / elapsedSec; - log.info("processFile: Finished ingesting file {}; endOffset={}, nextSequenceNumber={}", - fileNameWithBeginOffset.fileName, endOffset, nextSequenceNumber); - log.info("Sent {} MB in {} sec", megabyteCount, elapsedSec ); - log.info("Transfer rate: {} MB/sec", megabytesPerSec); + double elapsedSec = (System.nanoTime() - timestamp) / 1_000_000_000.0; + double megabyteCount = numofbytes.getAndSet(0) / 1_000_000.0; + double megabytesPerSec = megabyteCount / elapsedSec; + log.info("processFile: Finished ingesting file {}; endOffset={}, nextSequenceNumber={}", + fileNameWithBeginOffset.fileName, endOffset, nextSequenceNumber); + log.info("Sent {} MB in {} sec", megabyteCount, elapsedSec ); + log.info("Transfer rate: {} MB/sec", megabytesPerSec); + } } // Delete file right after ingesting @@ -224,14 +230,25 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN void deleteCompletedFiles() throws Exception { final List completedFiles = state.getCompletedFiles(); completedFiles.forEach(file -> { - try { - Files.deleteIfExists(Paths.get(file.fileName)); - log.info("deleteCompletedFiles: Deleted file {}", file.fileName); - // Only remove from database if we could delete file. - state.deleteCompletedFile(file.fileName); + //Obtain a lock on file + try(FileChannel channel = FileChannel.open(Paths.get(file.fileName),StandardOpenOption.WRITE)){ + try(FileLock lock = channel.tryLock()) { + if(lock!=null){ + Files.deleteIfExists(Paths.get(file.fileName)); + log.info("deleteCompletedFiles: Deleted file {}", file.fileName); + lock.release(); + // Only remove from database if we could delete file. + state.deleteCompletedFile(file.fileName); + } + else{ + log.warn("Unable to obtain lock on file {}. File is locked by another process.", file.fileName); + throw new Exception(); + } + } } catch (Exception e) { - log.warn("Unable to delete ingested file {}", e); - // We can continue on this error. It will be retried on the next iteration. + log.warn("Unable to delete ingested file {}", e.getMessage()); + log.warn("Deletion will be retried on the next iteration."); + // We can continue on this error. Deletion will be retried on the next iteration. } }); } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/rawfile/EventGenerator.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/rawfile/EventGenerator.java index 4100b08d..52882e18 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/rawfile/EventGenerator.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/rawfile/EventGenerator.java @@ -75,6 +75,6 @@ protected Pair generateEventsFromInputStream(CountingInputStream inp } catch (Exception e){ log.error("Exception = {}",e); throw e; - } + } } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/rawfile/RawFileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/rawfile/RawFileProcessor.java index 958a53e0..e420493d 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/rawfile/RawFileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/rawfile/RawFileProcessor.java @@ -12,10 +12,13 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.sql.Connection; import java.util.ArrayList; import java.util.Collections; @@ -91,11 +94,12 @@ public static RawFileProcessor create(RawFileConfig config, EventStreamClientFac public void ingestRawFiles() throws Exception { log.trace("ingestRawFiles: BEGIN"); - findAndRecordNewFiles(); - processNewFiles(); + // delete leftover completed files if (config.enableDeleteCompletedFiles) { deleteCompletedFiles(); } + findAndRecordNewFiles(); + processNewFiles(); log.trace("ingestRawFiles: END"); } @@ -182,34 +186,35 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN writer.abort(); try (final InputStream inputStream = new FileInputStream(fileNameWithBeginOffset.fileName)) { - final CountingInputStream countingInputStream = new CountingInputStream(inputStream); - countingInputStream.skip(fileNameWithBeginOffset.offset); - final Pair result = eventGenerator.generateEventsFromInputStream(countingInputStream, firstSequenceNumber, - e -> { - log.trace("processFile: event={}", e); - try { - writer.writeEvent(e.routingKey, e.bytes); - numofbytes.addAndGet(e.bytes.length); - - } catch (TxnFailedException ex) { - throw new RuntimeException(ex); - } - }); - final Optional txnId = writer.flush(); - final long nextSequenceNumber = result.getLeft(); - final long endOffset = result.getRight(); - state.addCompletedFile(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId); - // injectCommitFailure(); - writer.commit(); - state.deleteTransactionToCommit(txnId); + try(final CountingInputStream countingInputStream = new CountingInputStream(inputStream)) { + countingInputStream.skip(fileNameWithBeginOffset.offset); + final Pair result = eventGenerator.generateEventsFromInputStream(countingInputStream, firstSequenceNumber, + e -> { + log.trace("processFile: event={}", e); + try { + writer.writeEvent(e.routingKey, e.bytes); + numofbytes.addAndGet(e.bytes.length); + + } catch (TxnFailedException ex) { + throw new RuntimeException(ex); + } + }); + final Optional txnId = writer.flush(); + final long nextSequenceNumber = result.getLeft(); + final long endOffset = result.getRight(); + state.addCompletedFile(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId); + // injectCommitFailure(); + writer.commit(); + state.deleteTransactionToCommit(txnId); - double elapsedSec = (System.nanoTime() - timestamp) / 1_000_000_000.0; - double megabyteCount = numofbytes.getAndSet(0) / 1_000_000.0; - double megabytesPerSec = megabyteCount / elapsedSec; - log.info("processFile: Finished ingesting file {}; endOffset={}, nextSequenceNumber={}", - fileNameWithBeginOffset.fileName, endOffset, nextSequenceNumber); - log.info("Sent {} MB in {} sec", megabyteCount, elapsedSec ); - log.info("Transfer rate: {} MB/sec", megabytesPerSec); + double elapsedSec = (System.nanoTime() - timestamp) / 1_000_000_000.0; + double megabyteCount = numofbytes.getAndSet(0) / 1_000_000.0; + double megabytesPerSec = megabyteCount / elapsedSec; + log.info("processFile: Finished ingesting file {}; endOffset={}, nextSequenceNumber={}", + fileNameWithBeginOffset.fileName, endOffset, nextSequenceNumber); + log.info("Sent {} MB in {} sec", megabyteCount, elapsedSec ); + log.info("Transfer rate: {} MB/sec", megabytesPerSec); + } } // Delete file right after ingesting @@ -221,14 +226,25 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN void deleteCompletedFiles() throws Exception { final List completedFiles = state.getCompletedFiles(); completedFiles.forEach(file -> { - try { - Files.deleteIfExists(Paths.get(file.fileName)); - log.info("deleteCompletedFiles: Deleted file {}", file.fileName); - // Only remove from database if we could delete file. - state.deleteCompletedFile(file.fileName); + //Obtain a lock on file + try(FileChannel channel = FileChannel.open(Paths.get(file.fileName),StandardOpenOption.WRITE)){ + try(FileLock lock = channel.tryLock()) { + if(lock!=null){ + Files.deleteIfExists(Paths.get(file.fileName)); + log.info("deleteCompletedFiles: Deleted file {}", file.fileName); + lock.release(); + // Only remove from database if we could delete file. + state.deleteCompletedFile(file.fileName); + } + else{ + log.warn("Unable to obtain lock on file {}. File is locked by another process.", file.fileName); + throw new Exception(); + } + } } catch (Exception e) { - log.warn("Unable to delete ingested file {}", e); - // We can continue on this error. It will be retried on the next iteration. + log.warn("Unable to delete ingested file {}", e.getMessage()); + log.warn("Deletion will be retried on the next iteration."); + // We can continue on this error. Deletion will be retried on the next iteration. } }); } diff --git a/scripts/env.sh b/scripts/env.sh index 6005809e..e53cb974 100755 --- a/scripts/env.sh +++ b/scripts/env.sh @@ -11,5 +11,5 @@ export ENV_LOCAL_SCRIPT=$(dirname $0)/env-local.sh if [[ -f ${ENV_LOCAL_SCRIPT} ]]; then source ${ENV_LOCAL_SCRIPT} fi -export APP_VERSION=${APP_VERSION:-0.2.16} +export APP_VERSION=${APP_VERSION:-0.2.17} export GRADLE_OPTIONS="${GRADLE_OPTIONS:-"-Pversion=${APP_VERSION}"}"