Commit

Merge branch 'stability-improvements' into fix-logging
apoorva918 authored Nov 21, 2023
2 parents 81464b6 + 3ccd4ef commit 18b28e6
Showing 13 changed files with 249 additions and 40 deletions.
@@ -18,6 +18,7 @@

/**
*
* The EventGenerator is responsible for generating events depending on file type
*/
public interface EventGenerator{

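The Javadoc added above names the contract that the new tests in this commit exercise: an EventGenerator reads a file's bytes and emits Pravega events while tracking how far it has read. A hedged sketch of that call shape, lifted from the CSV tests further down rather than from the interface source (which is truncated here):

    final String csv = "\"Time\",\"X\",\"Y\",\"Z\",\"IN_PROGRESS\"\n";              // header-only sample, as in Test0by3
    final List<PravegaWriterEvent> events = new ArrayList<>();
    final EventGenerator generator = CsvFileEventGenerator.create("routingKey1", 2);
    final CountingInputStream in = new CountingInputStream(new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)));
    final Pair<Long, Long> next = generator.generateEventsFromInputStream(in, 100, events::add);
    // next.getLeft() is the next sequence number to use; next.getRight() is the byte offset consumed.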
@@ -45,7 +45,7 @@ public FileConfig(String stateDatabaseFileName, String fileSpec, String fileExte

@Override
public String toString() {
return "LogFileSequenceConfig{" +
return "FileConfig{" +
"stateDatabaseFileName='" + stateDatabaseFileName + '\'' +
", fileSpec='" + fileSpec + '\'' +
", fileExtension='" + fileExtension + '\'' +
@@ -38,12 +38,13 @@ public abstract class FileProcessor {
private static final Logger log = LoggerFactory.getLogger(FileProcessor.class);

private final FileConfig config;
private final TransactionStateSQLiteImpl state;
private final TransactionStateDB state;
private final EventWriter<byte[]> writer;
private final TransactionCoordinator transactionCoordinator;
private final EventGenerator eventGenerator;

public FileProcessor(FileConfig config, TransactionStateSQLiteImpl state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator) {

public FileProcessor(FileConfig config, TransactionStateDB state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator) {
this.config = config;
this.state = state;
this.writer = writer;
@@ -73,7 +74,8 @@ public static FileProcessor create(
final TransactionCoordinator transactionCoordinator = new TransactionCoordinator(connection, writer);
transactionCoordinator.performRecovery();

final TransactionStateSQLiteImpl state = new TransactionStateSQLiteImpl(connection, transactionCoordinator);

final TransactionStateDB state = new TransactionStateSQLiteImpl(connection, transactionCoordinator);
return FileProcessorFactory.createFileSequenceProcessor(config, state, writer, transactionCoordinator,writerId);

}
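Widening the state field and constructor parameters from TransactionStateSQLiteImpl to the TransactionStateDB interface means a processor no longer has to be backed by a real SQLite database; any implementation, including a test double, will do. A minimal sketch of that benefit, assuming Mockito is on the test classpath (it is not part of this diff) and that a FileConfig instance is already available:

    // given: FileConfig config (its full constructor is truncated in this diff)
    final TransactionStateDB state = Mockito.mock(TransactionStateDB.class);
    final EventWriter<byte[]> writer = Mockito.mock(EventWriter.class);
    final TransactionCoordinator coordinator = Mockito.mock(TransactionCoordinator.class);
    final FileProcessor processor = FileProcessorFactory.createFileSequenceProcessor(config, state, writer, coordinator, "writer-1");  // "writer-1" is an example id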
@@ -88,13 +90,13 @@ public void watchFiles() throws Exception {
findAndRecordNewFiles();
}
public void processFiles() throws Exception {
log.info("processFiles: BEGIN");
log.debug("processFiles: BEGIN");
if (config.enableDeleteCompletedFiles) {
log.debug("processFiles: Deleting completed files");
deleteCompletedFiles();
}
processNewFiles();
log.info("processFiles: END");
log.debug("processFiles: END");
}

public void processNewFiles() throws Exception {
@@ -122,7 +124,7 @@ protected void findAndRecordNewFiles() throws Exception {
*/
protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
log.debug("getDirectoryListing: fileSpec={}", config.fileSpec);
final List<FileNameWithOffset> directoryListing = FileUtils.getDirectoryListing(config.fileSpec, config.fileExtension, state, config.stateDatabaseFileName);
final List<FileNameWithOffset> directoryListing = FileUtils.getDirectoryListing(config.fileSpec, config.fileExtension);
log.debug("getDirectoryListing: directoryListing={}", directoryListing);
return directoryListing;
}
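The listing call above drops the state and state-database arguments; only the directory spec and the extension filter remain. The two-argument form matches how the new Parquet test later in this commit calls the helper:

    // Directory spec plus extension filter; state tracking no longer flows through this call.
    final List<FileNameWithOffset> listing = FileUtils.getDirectoryListing("../parquet-file-sample-data", "parquet");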
@@ -169,14 +171,13 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
e -> {
log.trace("processFile: event={}", e);
try {
writer.writeEvent(e.routingKey, e.bytes);
writer.writeEvent(e.routingKey, e.bytes);
numofbytes.addAndGet(e.bytes.length);
} catch (TxnFailedException ex) {
log.error("processFile: Write event to transaction failed with exception {} while processing file: {}, event: {}", ex, fileNameWithBeginOffset.fileName, e);
/*
TODO while writing event if we get Transaction failed exception then should we abort the trasaction and process again?
This will occur only if Transaction state is not open
*/

/* TODO while writing event if we get Transaction failed exception then should we abort the trasaction and process again?
This will occur only if Transaction state is not open*/
throw new RuntimeException(ex);
}
});
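The TODO above asks whether a TxnFailedException should abort the open transaction and trigger reprocessing instead of surfacing a RuntimeException. One possible shape for that, strictly as a sketch: abortCurrentTransaction and scheduleReprocess are hypothetical helpers, not methods that exist in this codebase.

    try {
        writer.writeEvent(e.routingKey, e.bytes);
        numofbytes.addAndGet(e.bytes.length);
    } catch (TxnFailedException ex) {
        log.error("processFile: write failed for file {}", fileNameWithBeginOffset.fileName, ex);
        abortCurrentTransaction();                   // hypothetical helper: roll back the open transaction
        scheduleReprocess(fileNameWithBeginOffset);  // hypothetical helper: re-queue the file for a clean retry
    }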
@@ -5,14 +5,15 @@
import io.pravega.sensor.collector.file.rawfile.RawFileProcessor;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.TransactionCoordinator;
import io.pravega.sensor.collector.util.TransactionStateDB;
import io.pravega.sensor.collector.util.TransactionStateSQLiteImpl;

/*
* The FileProcessorFactory class is responsible for creating instances of file processors based on the type of the input file.
*
*/
public class FileProcessorFactory {

public static FileProcessor createFileSequenceProcessor(final FileConfig config, TransactionStateSQLiteImpl state,
public static FileProcessor createFileSequenceProcessor(final FileConfig config, TransactionStateDB state,
EventWriter<byte[]> writer,
TransactionCoordinator transactionCoordinator,
String writerId){
@@ -30,18 +31,7 @@ public static FileProcessor createFileSequenceProcessor(final FileConfig config,
return new RawFileProcessor(config, state, writer, transactionCoordinator, writerId);

default :
throw new RuntimeException("Unsupported className "+ "test");
throw new RuntimeException("Unsupported className: "+ className);
}
/*try{
// Class<?> clazz = Class.forName(config.fileType);
//className = clazz.getSimpleName();
// System.out.println(" className is "+className);
}catch (ClassNotFoundException ex){
throw new RuntimeException(ex);
}*/



}
}
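With the corrected default branch, a misconfigured fileType now fails with a message that names the offending class instead of the literal "test". A hedged sketch of what a caller would observe (the class name shown is illustrative):

    try {
        FileProcessorFactory.createFileSequenceProcessor(config, state, writer, transactionCoordinator, writerId);
    } catch (RuntimeException e) {
        // Message now reads, for example, "Unsupported className: com.example.UnknownProcessor".
        log.error("file processor creation failed", e);
    }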
@@ -12,19 +12,20 @@
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.file.FileConfig;
import io.pravega.sensor.collector.file.FileProcessor;
import io.pravega.sensor.collector.file.parquet.ParquetEventGenerator;
import io.pravega.sensor.collector.util.*;
import io.pravega.sensor.collector.util.TransactionStateDB;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.TransactionCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CsvFileSequenceProcessor extends FileProcessor {
private static final Logger log = LoggerFactory.getLogger(CsvFileSequenceProcessor.class);
//private final EventGenerator eventGenerator;
private final FileConfig config;
private final String writerId;


public CsvFileSequenceProcessor(FileConfig config, TransactionStateSQLiteImpl state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator, String writerId) {

public CsvFileSequenceProcessor(FileConfig config, TransactionStateDB state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator, String writerId) {
super(config, state, writer, transactionCoordinator);
this.config =config;
this.writerId = writerId;
@@ -12,7 +12,9 @@
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.file.FileConfig;
import io.pravega.sensor.collector.file.FileProcessor;
import io.pravega.sensor.collector.util.*;
import io.pravega.sensor.collector.util.TransactionStateDB;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.TransactionCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -23,7 +25,8 @@ public class ParquetFileProcessor extends FileProcessor {
private final FileConfig config;
private final String writerId;

public ParquetFileProcessor(FileConfig config, TransactionStateSQLiteImpl state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator, String writerId) {

public ParquetFileProcessor(FileConfig config, TransactionStateDB state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator, String writerId) {
super(config,state,writer,transactionCoordinator);
this.config =config;
this.writerId = writerId;
@@ -12,8 +12,9 @@
import io.pravega.sensor.collector.file.EventGenerator;
import io.pravega.sensor.collector.file.FileConfig;
import io.pravega.sensor.collector.file.FileProcessor;
import io.pravega.sensor.collector.file.parquet.ParquetEventGenerator;
import io.pravega.sensor.collector.util.*;
import io.pravega.sensor.collector.util.TransactionStateDB;
import io.pravega.sensor.collector.util.EventWriter;
import io.pravega.sensor.collector.util.TransactionCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -24,7 +25,8 @@ public class RawFileProcessor extends FileProcessor {
private final FileConfig config;
private final String writerId;

public RawFileProcessor(FileConfig config, TransactionStateSQLiteImpl state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator, String writerId) {

public RawFileProcessor(FileConfig config, TransactionStateDB state, EventWriter<byte[]> writer, TransactionCoordinator transactionCoordinator, String writerId) {
super(config, state, writer, transactionCoordinator);
this.config =config;
this.writerId = writerId;
@@ -17,11 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -0,0 +1,121 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.file.csvfile;

import com.google.common.io.CountingInputStream;
import io.pravega.sensor.collector.file.EventGenerator;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.pravega.sensor.collector.util.PravegaWriterEvent;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CSVFileEventGeneratorTests {
private static final Logger log = LoggerFactory.getLogger(CSVFileEventGeneratorTests.class);

@Test
public void Test3by2() throws IOException {
final EventGenerator eventGenerator = CsvFileEventGenerator.create("routingKey1", 2);
final String csvStr =
"\"Time\",\"X\",\"Y\",\"Z\",\"IN_PROGRESS\"\n" +
"\"2020-07-15 23:59:50.352\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.362\",\"1.305966\",\"0.1\",\"1.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.415\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n";
final CountingInputStream inputStream = new CountingInputStream(new ByteArrayInputStream(csvStr.getBytes(StandardCharsets.UTF_8)));
final List<PravegaWriterEvent> events = new ArrayList<>();
Pair<Long, Long> nextSequenceNumberAndOffset = eventGenerator.generateEventsFromInputStream(inputStream, 100, events::add);
log.info("events={}", events);
Assertions.assertEquals(102L, (long) nextSequenceNumberAndOffset.getLeft());
Assertions.assertEquals(csvStr.length(), (long) nextSequenceNumberAndOffset.getRight());
}

@Test
public void Test3by3() throws IOException {
final EventGenerator eventGenerator = CsvFileEventGenerator.create("routingKey1", 3);
final String csvStr =
"\"Time\",\"X\",\"Y\",\"Z\",\"IN_PROGRESS\"\n" +
"\"2020-07-15 23:59:50.352\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.362\",\"1.305966\",\"0.1\",\"1.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.415\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n";
final CountingInputStream inputStream = new CountingInputStream(new ByteArrayInputStream(csvStr.getBytes(StandardCharsets.UTF_8)));
final List<PravegaWriterEvent> events = new ArrayList<>();
Pair<Long, Long> nextSequenceNumberAndOffset = eventGenerator.generateEventsFromInputStream(inputStream, 100, events::add);
log.info("events={}", events);
Assertions.assertEquals(101L, (long) nextSequenceNumberAndOffset.getLeft());
Assertions.assertEquals(csvStr.length(), (long) nextSequenceNumberAndOffset.getRight());
}

@Test
public void Test1by3() throws IOException {
final EventGenerator eventGenerator = CsvFileEventGenerator.create("routingKey1", 3);
final String csvStr =
"\"Time\",\"X\",\"Y\",\"Z\",\"IN_PROGRESS\"\n" +
"\"2020-07-15 23:59:50.352\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n";
final CountingInputStream inputStream = new CountingInputStream(new ByteArrayInputStream(csvStr.getBytes(StandardCharsets.UTF_8)));
final List<PravegaWriterEvent> events = new ArrayList<>();
Pair<Long, Long> nextSequenceNumberAndOffset = eventGenerator.generateEventsFromInputStream(inputStream, 100, events::add);
log.info("events={}", events);
Assertions.assertEquals(101L, (long) nextSequenceNumberAndOffset.getLeft());
Assertions.assertEquals(csvStr.length(), (long) nextSequenceNumberAndOffset.getRight());
}

@Test
public void Test0by3() throws IOException {
final EventGenerator eventGenerator = CsvFileEventGenerator.create("routingKey1", 3);
final String csvStr =
"\"Time\",\"X\",\"Y\",\"Z\",\"IN_PROGRESS\"\n";
final CountingInputStream inputStream = new CountingInputStream(new ByteArrayInputStream(csvStr.getBytes(StandardCharsets.UTF_8)));
final List<PravegaWriterEvent> events = new ArrayList<>();
Pair<Long, Long> nextSequenceNumberAndOffset = eventGenerator.generateEventsFromInputStream(inputStream, 100, events::add);
log.info("events={}", events);
Assertions.assertEquals(100L, (long) nextSequenceNumberAndOffset.getLeft());
Assertions.assertEquals(csvStr.length(), (long) nextSequenceNumberAndOffset.getRight());
}

@Test
public void TestEmptyFile() throws IOException {
final EventGenerator eventGenerator = CsvFileEventGenerator.create("routingKey1", 3);
final String csvStr = "";
final CountingInputStream inputStream = new CountingInputStream(new ByteArrayInputStream(csvStr.getBytes(StandardCharsets.UTF_8)));
final List<PravegaWriterEvent> events = new ArrayList<>();
Pair<Long, Long> nextSequenceNumberAndOffset = eventGenerator.generateEventsFromInputStream(inputStream, 100, events::add);
log.info("events={}", events);
Assertions.assertEquals(100L, (long) nextSequenceNumberAndOffset.getLeft());
Assertions.assertEquals(csvStr.length(), (long) nextSequenceNumberAndOffset.getRight());
}

@Test
public void test7by3() throws IOException {
final EventGenerator eventGenerator = CsvFileEventGenerator.create("routingKey1", 3);
final String csvStr =
"\"Time\",\"X\",\"Y\",\"Z\",\"IN_PROGRESS\"\n" +
"\"2020-07-15 23:59:50.352\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.362\",\"1.305966\",\"0.1\",\"1.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.352\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.362\",\"1.305966\",\"0.1\",\"1.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.352\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.362\",\"1.305966\",\"0.1\",\"1.331963\",\"0\"\n" +
"\"2020-07-15 23:59:50.415\",\"0.305966\",\"0.0\",\"9.331963\",\"0\"\n";
final CountingInputStream inputStream = new CountingInputStream(new ByteArrayInputStream(csvStr.getBytes(StandardCharsets.UTF_8)));
final List<PravegaWriterEvent> events = new ArrayList<>();
Pair<Long, Long> nextSequenceNumberAndOffset = eventGenerator.generateEventsFromInputStream(inputStream, 100, events::add);
log.info("events={}", events);
Assertions.assertEquals(103L, (long) nextSequenceNumberAndOffset.getLeft());
Assertions.assertEquals(csvStr.length(), (long) nextSequenceNumberAndOffset.getRight());
}
}
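The asserted sequence numbers in these tests all follow the same arithmetic: the second argument to CsvFileEventGenerator.create appears to cap the number of CSV data rows per event, so the sequence number advances by ceil(dataRows / maxRecordsPerEvent). A quick check of the expectations above (a sketch, not part of the commit):

    // nextSequenceNumber = firstSequenceNumber + ceil(dataRows / maxRecordsPerEvent)
    long expected3by2 = 100 + (long) Math.ceil(3.0 / 2);  // 102, as asserted in Test3by2
    long expected3by3 = 100 + (long) Math.ceil(3.0 / 3);  // 101, as asserted in Test3by3
    long expected1by3 = 100 + (long) Math.ceil(1.0 / 3);  // 101, as asserted in Test1by3
    long expected7by3 = 100 + (long) Math.ceil(7.0 / 3);  // 103, as asserted in test7by3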
@@ -0,0 +1,38 @@
package io.pravega.sensor.collector.file.parquet;

import com.google.common.io.CountingInputStream;
import io.pravega.sensor.collector.file.EventGenerator;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.pravega.sensor.collector.util.FileNameWithOffset;
import io.pravega.sensor.collector.util.FileUtils;
import io.pravega.sensor.collector.util.PravegaWriterEvent;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetEventGeneratorTests {
private static final Logger log = LoggerFactory.getLogger(ParquetEventGeneratorTests.class);

@Test
public void TestFile() throws IOException {
final EventGenerator eventGenerator = ParquetEventGenerator.create("routingKey1",100);
final List<FileNameWithOffset> files = FileUtils.getDirectoryListing("../parquet-file-sample-data","parquet");
File parquetData= new File(files.get(0).fileName);

final CountingInputStream inputStream = new CountingInputStream(new FileInputStream(parquetData));
final List<PravegaWriterEvent> events = new ArrayList<>();
Pair<Long, Long> nextSequenceNumberAndOffset = eventGenerator.generateEventsFromInputStream(inputStream, 1, events::add);
log.info("events={}", events);
Assert.assertEquals(501L, (long) nextSequenceNumberAndOffset.getLeft());
Assert.assertEquals(parquetData.length(), (long) nextSequenceNumberAndOffset.getRight());
}

}
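The same arithmetic explains the 501 expectation here: starting from sequence number 1 with up to 100 records per event, 501 implies 500 events, so the bundled sample file should contain between 49,901 and 50,000 records (a back-of-the-envelope check, assuming the second argument to ParquetEventGenerator.create caps records per event):

    long eventsProduced = 501 - 1;                     // 500 events implied by the assertion
    long maxRecords = eventsProduced * 100;            // 50,000
    long minRecords = (eventsProduced - 1) * 100 + 1;  // 49,901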