Skip to content

Commit

Permalink
adding more logging
Browse files Browse the repository at this point in the history
Signed-off-by: Shwetha N <[email protected]>
  • Loading branch information
ShwethaSNayak committed Oct 11, 2023
1 parent ba8824d commit b8d1a2f
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class LogFileIngestService extends DeviceDriver {
private final LogFileSequenceProcessor processor;
private final ScheduledExecutorService executor;

private ScheduledFuture<?> task;
private ScheduledFuture<?> ingestFiletask;
private ScheduledFuture<?> processFileTask;

public LogFileIngestService(DeviceDriverConfig config) {
Expand Down Expand Up @@ -130,19 +130,19 @@ protected void ingestLogFiles() {
log.info("ingestLogFiles: END");
}
protected void processLogFiles() {
log.info("processLogFiles: BEGIN");
log.trace("processLogFiles: BEGIN");
try {
processor.processLogFiles();
} catch (Exception e) {
log.error("processLogFiles: Process file error", e);
// Continue on any errors. We will retry on the next iteration.
}
log.info("processLogFiles: END");
log.trace("processLogFiles: END");
}

@Override
protected void doStart() {
task = executor.scheduleAtFixedRate(
ingestFiletask = executor.scheduleAtFixedRate(
this::ingestLogFiles,
0,
getIntervalMs(),
Expand All @@ -162,7 +162,8 @@ protected void doStart() {

@Override
protected void doStop() {
task.cancel(false);
log.info("doStop: Cancelling ingestion task and process file task");
ingestFiletask.cancel(false);
processFileTask.cancel(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ static protected List<FileNameWithOffset> getDirectoryListing(String fileSpec, S
}
}
}
}catch(Exception ex){
if(ex instanceof IOException){
log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
throw new IOException("Directory does not exist or spec is not valid");
}else{
log.error("getDirectoryListing: Exception while listing files: {}", pathSpec.toAbsolutePath());
throw new IOException(ex);
}
}
return directoryListing;
}
Expand Down Expand Up @@ -221,17 +229,24 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
final Optional<UUID> txnId = writer.flush();
final long nextSequenceNumber = result.getLeft();
final long endOffset = result.getRight();
log.debug("processFile: Adding completed file: {}", fileNameWithBeginOffset.fileName);
state.addCompletedFile(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId);
// injectCommitFailure();
try {
// commit fails only if Transaction is not in open state.
log.info("processFile: Commit transaction for Id: {}; file: {}", txnId.orElse(null), fileNameWithBeginOffset.fileName);
writer.commit();
} catch (TxnFailedException ex) {
log.error("processFile: Commit transaction for id: {}, file : {}, failed with exception: {}", txnId, fileNameWithBeginOffset.fileName, ex);
throw new RuntimeException(ex);
}
// Add to completed file list only if commit is successfully.
state.deleteTransactionToCommit(txnId);
// Add to completed file list only if commit is successfull else it will be taken care as part of recovery
if(txnId.isPresent()){
Transaction.Status status = writer.getTransactionStatus(txnId.get());
if(status == Transaction.Status.COMMITTED || status == Transaction.Status.ABORTED)
state.deleteTransactionToCommit(txnId);
}

double elapsedSec = (System.nanoTime() - timestamp) / 1_000_000_000.0;
double megabyteCount = numofbytes.getAndSet(0) / 1_000_000.0;
double megabytesPerSec = megabyteCount / elapsedSec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class ParquetFileIngestService extends DeviceDriver{

private final ParquetFileProcessor processor;
private final ScheduledExecutorService executor;
private ScheduledFuture<?> task;
private ScheduledFuture<?> ingestFileTask;
private ScheduledFuture<?> processFileTask;

public ParquetFileIngestService(DeviceDriverConfig config){
Expand Down Expand Up @@ -138,19 +138,19 @@ protected void ingestParquetFiles() {
log.trace("ingestParquetFiles: END");
}
protected void processParquetFiles() {
log.info("processParquetFiles: BEGIN");
log.trace("processParquetFiles: BEGIN");
try {
processor.processParquetFiles();
} catch (Exception e) {
log.error("processParquetFiles: Process file error", e);
// Continue on any errors. We will retry on the next iteration.
}
log.info("processParquetFiles: END");
log.trace("processParquetFiles: END");
}

@Override
protected void doStart() {
task = executor.scheduleAtFixedRate(
ingestFileTask = executor.scheduleAtFixedRate(
this::ingestParquetFiles,
0,
getIntervalMs(),
Expand All @@ -165,7 +165,8 @@ protected void doStart() {

@Override
protected void doStop() {
task.cancel(false);
log.info("doStop: Cancelling ingestion task and process file task");
ingestFileTask.cancel(false);
processFileTask.cancel(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ static protected List<FileNameWithOffset> getDirectoryListing(String fileSpec, S
}
}
}
}catch(Exception ex){
if(ex instanceof IOException){
log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
throw new IOException("Directory does not exist or spec is not valid");
}else{
log.error("getDirectoryListing: Exception while listing files: {}", pathSpec.toAbsolutePath());
throw new IOException(ex);
}
}
return directoryListing;
}
Expand Down Expand Up @@ -197,7 +205,7 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
// In case a previous iteration encountered an error, we need to ensure that
// previous flushed transactions are committed and any unflushed transactions as aborted.
transactionCoordinator.performRecovery();
log.info("processFile: Transaction status {} ", writer.getTransactionStatus());
log.debug("processFile: Transaction status {} ", writer.getTransactionStatus());
if(writer.getTransactionStatus() == Transaction.Status.OPEN){
writer.abort();
}
Expand All @@ -220,17 +228,24 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
final Optional<UUID> txnId = writer.flush();
final long nextSequenceNumber = result.getLeft();
final long endOffset = result.getRight();
log.debug("processFile: Adding completed file: {}", fileNameWithBeginOffset.fileName);
state.addCompletedFile(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId);
// injectCommitFailure();
try {
// commit fails only if Transaction is not in open state.
log.info("processFile: Commit transaction for Id: {}; file: {}", txnId.orElse(null), fileNameWithBeginOffset.fileName);
writer.commit();
} catch (TxnFailedException ex) {
log.error("processFile: Commit transaction for id: {}, file : {}, failed with exception: {}", txnId, fileNameWithBeginOffset.fileName, ex);
throw new RuntimeException(ex);
}
state.deleteTransactionToCommit(txnId);

// Add to completed file list only if commit is successfull else it will be taken care as part of recovery
if(txnId.isPresent()){
Transaction.Status status = writer.getTransactionStatus(txnId.get());
if(status == Transaction.Status.COMMITTED || status == Transaction.Status.ABORTED)
state.deleteTransactionToCommit(txnId);
}

double elapsedSec = (System.nanoTime() - timestamp) / 1_000_000_000.0;
double megabyteCount = numofbytes.getAndSet(0) / 1_000_000.0;
double megabytesPerSec = megabyteCount / elapsedSec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class RawFileIngestService extends DeviceDriver{

private final RawFileProcessor processor;
private final ScheduledExecutorService executor;
private ScheduledFuture<?> task;
private ScheduledFuture<?> ingestFileTask;
private ScheduledFuture<?> processFileTask;

public RawFileIngestService(DeviceDriverConfig config){
Expand Down Expand Up @@ -131,19 +131,19 @@ protected void ingestRawFiles() {
log.trace("ingestRawFiles: END");
}
protected void processRawFiles() {
log.info("processRawFiles: BEGIN");
log.trace("processRawFiles: BEGIN");
try {
processor.processRawFiles();
} catch (Exception e) {
log.error("processRawFiles: Process file error", e);
// Continue on any errors. We will retry on the next iteration.
}
log.info("processRawFiles: END");
log.trace("processRawFiles: END");
}

@Override
protected void doStart() {
task = executor.scheduleAtFixedRate(
ingestFileTask = executor.scheduleAtFixedRate(
this::ingestRawFiles,
0,
getIntervalMs(),
Expand All @@ -158,7 +158,8 @@ protected void doStart() {

@Override
protected void doStop() {
task.cancel(false);
log.info("doStop: Cancelling ingestion task and process file task");
ingestFileTask.cancel(false);
processFileTask.cancel(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.*;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -144,10 +140,7 @@ protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
*/
static protected List<FileNameWithOffset> getDirectoryListing(String fileSpec, String fileExtension) throws IOException {
final Path pathSpec = Paths.get(fileSpec);
if (!Files.isDirectory(pathSpec.toAbsolutePath())) {
log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
throw new IOException("Directory does not exist or spec is not valid");
}

List<FileNameWithOffset> directoryListing = new ArrayList<>();
try(DirectoryStream<Path> dirStream=Files.newDirectoryStream(pathSpec)){
for(Path path: dirStream){
Expand All @@ -160,6 +153,14 @@ static protected List<FileNameWithOffset> getDirectoryListing(String fileSpec, S
}
}
}
}catch(Exception ex){
if(ex instanceof IOException){
log.error("getDirectoryListing: Directory does not exist or spec is not valid : {}", pathSpec.toAbsolutePath());
throw new IOException("Directory does not exist or spec is not valid");
}else{
log.error("getDirectoryListing: Exception while listing files: {}", pathSpec.toAbsolutePath());
throw new IOException(ex);
}
}
return directoryListing;
}
Expand Down Expand Up @@ -194,7 +195,7 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
// In case a previous iteration encountered an error, we need to ensure that
// previous flushed transactions are committed and any unflushed transactions as aborted.
transactionCoordinator.performRecovery();
log.info("processFile: Transaction status {} ", writer.getTransactionStatus());
log.debug("processFile: Transaction status {} ", writer.getTransactionStatus());
if(writer.getTransactionStatus() == Transaction.Status.OPEN){
writer.abort();
}
Expand All @@ -217,17 +218,24 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
final Optional<UUID> txnId = writer.flush();
final long nextSequenceNumber = result.getLeft();
final long endOffset = result.getRight();
log.debug("processFile: Adding completed file: {}", fileNameWithBeginOffset.fileName);
state.addCompletedFile(fileNameWithBeginOffset.fileName, fileNameWithBeginOffset.offset, endOffset, nextSequenceNumber, txnId);
// injectCommitFailure();
try {
// commit fails only if Transaction is not in open state.
log.info("processFile: Commit transaction for Id: {}; file: {}", txnId.orElse(null), fileNameWithBeginOffset.fileName);
writer.commit();
} catch (TxnFailedException ex) {
log.error("processFile: Commit transaction for id: {}, file : {}, failed with exception: {}", txnId, fileNameWithBeginOffset.fileName, ex);
throw new RuntimeException(ex);
}
state.deleteTransactionToCommit(txnId);

// Add to completed file list only if commit is successfull else it will be taken care as part of recovery
if(txnId.isPresent()){
Transaction.Status status = writer.getTransactionStatus(txnId.get());
if(status == Transaction.Status.COMMITTED || status == Transaction.Status.ABORTED)
state.deleteTransactionToCommit(txnId);
}

double elapsedSec = (System.nanoTime() - timestamp) / 1_000_000_000.0;
double megabyteCount = numofbytes.getAndSet(0) / 1_000_000.0;
double megabytesPerSec = megabyteCount / elapsedSec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ static <T> EventWriter<T> create(
/**
* This should called be prior to aborting any transactions to make sure it is not open.
*/
public Transaction.Status getTransactionStatus(UUID txnId);

public Transaction.Status getTransactionStatus();

void close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void commit(UUID txnId) throws TxnFailedException {
public void abort() {
}

@Override
public Transaction.Status getTransactionStatus(UUID txnId) {
throw new UnsupportedOperationException("Non-transactional writer cannot commit transactions");
}

@Override
public Transaction.Status getTransactionStatus() {
throw new UnsupportedOperationException("Non-transactional writer do not have transaction status");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected List<UUID> getTransactionsToCommit() {
public void performRecovery() {
final List<UUID> transactionsToCommit = getTransactionsToCommit();
if (transactionsToCommit.isEmpty()) {
log.debug("Transaction recovery not needed");
log.info("performRecovery: No transactions to be recovered");
} else {
log.warn("Transaction recovery needed on {} transactions", transactionsToCommit.size());
transactionsToCommit.forEach((txnId) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ public Transaction.Status getTransactionStatus() {
}
return null;
}
public Transaction.Status getTransactionStatus(UUID txnId) {
return writer.getTxn(txnId).checkStatus();
}


public void close() {
try {
Expand Down

0 comments on commit b8d1a2f

Please sign in to comment.