Skip to content

Commit

Permalink
Merge branch 'stability-improvements-v2' into update-for-ca-certs
Browse files Browse the repository at this point in the history
  • Loading branch information
apoorva918 committed Feb 14, 2024
2 parents 4a5e638 + 028f47f commit e30e115
Show file tree
Hide file tree
Showing 14 changed files with 307 additions and 55 deletions.
5 changes: 3 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ commonsCLIVersion=1.4
commonsCSVVersion=1.8
commonsCodecVersion=1.14
commonsMath3Version=3.6.1
grizzlyVersion=3.1.3
grizzlyVersion=2.35
gsonVersion=2.10.1
includePravegaCredentials=true
jacksonVersion=2.15.2
junitVersion=5.6.2
junitVersion=5.10.1
junitPlatformVersion=1.10.1
jakartaBindVersion=2.3.2
jaxbVersion=2.3.2
javaxServletApiVersion=3.0.1
Expand Down
1 change: 1 addition & 0 deletions parquet-file-sample-data/test_file/hello-world.parquet
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Hello World.
8 changes: 6 additions & 2 deletions pravega-sensor-collector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ dependencies {

implementation "io.pravega:pravega-client:${pravegaVersion}",
"io.pravega:pravega-common:${pravegaVersion}",
"commons-cli:commons-cli:${commonsCLIVersion}"
"commons-cli:commons-cli:${commonsCLIVersion}",
"io.pravega:pravega-standalone:${pravegaVersion}",
"io.pravega:pravega-test-integration:${pravegaVersion}"


if (includePravegaCredentials.toBoolean()) {
implementation "io.pravega:pravega-keycloak-client:${pravegaCredentialsVersion}"
Expand All @@ -68,7 +71,7 @@ dependencies {
testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}"
testImplementation "org.junit.vintage:junit-vintage-engine:${junitVersion}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junitVersion}"
testImplementation "org.junit.platform:junit-platform-launcher"
testImplementation "org.junit.platform:junit-platform-launcher:${junitPlatformVersion}"

testImplementation "org.mockito:mockito-core:${mockitoVersion}"

Expand Down Expand Up @@ -105,6 +108,7 @@ startScripts {
}

shadowJar{
zip64 true
archiveBaseName = 'pravega-sensor-collector'
archiveClassifier = ''
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,20 @@ protected void doStart() {
LOGGER.info("Starting device drivers");
final DeviceDriverFactory factory = new DeviceDriverFactory();
drivers = configs.stream().map(factory::create).collect(Collectors.toList());
drivers.stream().forEach((driver) -> driver.startAsync());
drivers.stream().forEach((driver) -> driver.awaitRunning());
drivers.forEach(AbstractService::startAsync);
drivers.forEach(AbstractService::awaitRunning);
LOGGER.info("All device drivers started successfully");
notifyStarted();
}

@Override
protected void doStop() {
drivers.stream().forEach((driver) -> driver.stopAsync());
drivers.stream().forEach((driver) -> driver.awaitTerminated());
LOGGER.info("Stopping all device drivers");
drivers.forEach(AbstractService::stopAsync);
drivers.forEach(AbstractService::awaitTerminated);
drivers = null;
LOGGER.info("Stopped all device drivers");
notifyStopped();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ public static String getEnvPrefix() {
return ENV_PREFIX;
}

public static Map<String, String> getProperties() {
final String fileName = getPropertiesFileName();
return getProperties(fileName);
}
/**
* Combines properties from:
* 1. properties file (if specified)
* 2. system environment
* Values in the system environment will override values in the properties file.
* It is intended that properties files only be used when developing in an IDE.
*/
public static Map<String, String> getProperties() {
public static Map<String, String> getProperties(final String fileName) {
Map<String, String> map = new HashMap<>();
final String fileName = getPropertiesFileName();
if (!fileName.isEmpty()) {
if (fileName != null && !fileName.isEmpty()) {
log.info("Reading properties from file {}", fileName);
Properties properties = new Properties();
try (FileInputStream inputStream = new FileInputStream(fileName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class FileIngestService extends DeviceDriver {
private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE";
private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES";
private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE";
private static final String DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES_KEY = "DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES";
private static final String DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS_KEY = "DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS";
private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT";

private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100;
Expand Down Expand Up @@ -138,8 +138,8 @@ long getMinTimeInMillisToUpdateFile() {
return Long.parseLong(getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000"));
}

long getDeleteCompletedFilesIntervalInMinutes() {
return Long.parseLong(getProperty(DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES_KEY, "720"));
long getDeleteCompletedFilesIntervalInSeconds() {
return Long.parseLong(getProperty(DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS_KEY, "43200"));
}

boolean getLargeEventEnable() {
Expand Down Expand Up @@ -169,14 +169,14 @@ protected void processFiles() {
}

protected void deleteCompletedFiles() {
LOG.trace("deleteCompletedFiles: BEGIN");
LOG.debug("deleteCompletedFiles: BEGIN");
try {
processor.deleteCompletedFiles();
} catch (Exception e) {
LOG.error("deleteCompletedFiles: Delete file error", e);
// Continue on any errors. We will retry on the next iteration.
}
LOG.trace("deleteCompletedFiles: END");
LOG.debug("deleteCompletedFiles: END");
}

@Override
Expand All @@ -201,17 +201,19 @@ protected void doStart() {
deleteFileTask = executor.scheduleAtFixedRate(
this::deleteCompletedFiles,
1,
getDeleteCompletedFilesIntervalInMinutes(),
TimeUnit.MINUTES);
getDeleteCompletedFilesIntervalInSeconds(),
TimeUnit.SECONDS);

notifyStarted();
}

@Override
protected void doStop() {
LOG.info("doStop: Cancelling ingestion task and process file task");
LOG.info("doStop: Cancelling ingestion, process and delete file task");
watchFileTask.cancel(false);
processFileTask.cancel(false);
deleteFileTask.cancel(false);
LOG.info("doStop: Cancelled ingestion, process and delete file task");
notifyStopped();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TransactionStateSQLiteImpl implements AutoCloseable, TransactionSt

public TransactionStateSQLiteImpl(Connection connection, TransactionCoordinator transactionCoordinator) {
this.connection = Preconditions.checkNotNull(connection, "connection");
this.transactionCoordinator = Preconditions.checkNotNull(transactionCoordinator, "transactionCoordinator");
this.transactionCoordinator = transactionCoordinator;
}

@Override
Expand Down Expand Up @@ -124,7 +124,9 @@ public void addCompletedFileRecord(String fileName, long beginOffset, long endOf
deletePendingFileStatement.setString(1, fileName);
deletePendingFileStatement.setLong(2, beginOffset);
deletePendingFileStatement.execute();
transactionCoordinator.addTransactionToCommit(txnId);
if(transactionCoordinator!=null) {
transactionCoordinator.addTransactionToCommit(txnId);
}
autoRollback.commit();
}
}
Expand Down Expand Up @@ -163,35 +165,38 @@ public void addCompletedFileRecord(String fileName, long beginOffset, long endOf
addCompletedFileRecord(fileName, beginOffset, endOffset, newNextSequenceNumber, Optional.empty());
}

/**
* Delete record from TransactionsToCommit table.
*
* @param txnId transaction id
*/
@Override
public void deleteTransactionToCommit(Optional<UUID> txnId) {
transactionCoordinator.deleteTransactionToCommit(txnId);
}

/**
* Get a list of files from completedFiles table.
*
* @return list of file name and end offset (file size)
*/
@Override
public List<FileNameWithOffset> getCompletedFileRecords() throws SQLException {
try (final Statement statement = connection.createStatement();
final ResultSet rs = statement.executeQuery("select fileName, offset from completedFiles")) {
final List<FileNameWithOffset> files = new ArrayList<>();
while (rs.next()) {
final FileNameWithOffset fileNameWithOffset = new FileNameWithOffset(rs.getString("fileName"), rs.getLong("offset"));
files.add(fileNameWithOffset);
/**
* Delete record from TransactionsToCommit table
*
* @param txnId transaction id
*/
@Override
public void deleteTransactionToCommit(Optional<UUID> txnId) {
if(transactionCoordinator!=null) {
transactionCoordinator.deleteTransactionToCommit(txnId);
}
}

/**
* Get a list of files from completedFiles table
*
* @return list of file name and end offset (file size)
*/
@Override
public List<FileNameWithOffset> getCompletedFileRecords() throws SQLException {
try (final Statement statement = connection.createStatement();
final ResultSet rs = statement.executeQuery("select fileName, offset from completedFiles")) {
final List<FileNameWithOffset> files = new ArrayList<>();
while (rs.next()) {
final FileNameWithOffset fileNameWithOffset = new FileNameWithOffset(rs.getString("fileName"), rs.getLong("offset"));
files.add(fileNameWithOffset);
}
return files;
} finally {
connection.commit();
}
return files;
} finally {
connection.commit();
}
}

/**
* Delete completed file record from completedFiles table for given file name.
Expand Down
Loading

0 comments on commit e30e115

Please sign in to comment.