diff --git a/gradle.properties b/gradle.properties index fa99c486..c1fcf750 100644 --- a/gradle.properties +++ b/gradle.properties @@ -13,11 +13,12 @@ commonsCLIVersion=1.4 commonsCSVVersion=1.8 commonsCodecVersion=1.14 commonsMath3Version=3.6.1 -grizzlyVersion=3.1.3 +grizzlyVersion=2.35 gsonVersion=2.10.1 includePravegaCredentials=true jacksonVersion=2.15.2 -junitVersion=5.6.2 +junitVersion=5.10.1 +junitPlatformVersion=1.10.1 jakartaBindVersion=2.3.2 jaxbVersion=2.3.2 javaxServletApiVersion=3.0.1 diff --git a/parquet-file-sample-data/test_file/hello-world.parquet b/parquet-file-sample-data/test_file/hello-world.parquet new file mode 100644 index 00000000..3245b20d --- /dev/null +++ b/parquet-file-sample-data/test_file/hello-world.parquet @@ -0,0 +1 @@ +Hello World. \ No newline at end of file diff --git a/pravega-sensor-collector/build.gradle b/pravega-sensor-collector/build.gradle index 3e3e354d..4e089677 100644 --- a/pravega-sensor-collector/build.gradle +++ b/pravega-sensor-collector/build.gradle @@ -46,7 +46,10 @@ dependencies { implementation "io.pravega:pravega-client:${pravegaVersion}", "io.pravega:pravega-common:${pravegaVersion}", - "commons-cli:commons-cli:${commonsCLIVersion}" + "commons-cli:commons-cli:${commonsCLIVersion}", + "io.pravega:pravega-standalone:${pravegaVersion}", + "io.pravega:pravega-test-integration:${pravegaVersion}" + if (includePravegaCredentials.toBoolean()) { implementation "io.pravega:pravega-keycloak-client:${pravegaCredentialsVersion}" @@ -68,7 +71,7 @@ dependencies { testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}" testImplementation "org.junit.vintage:junit-vintage-engine:${junitVersion}" testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${junitVersion}" - testImplementation "org.junit.platform:junit-platform-launcher" + testImplementation "org.junit.platform:junit-platform-launcher:${junitPlatformVersion}" testImplementation "org.mockito:mockito-core:${mockitoVersion}" @@ -105,6 +108,7 @@ startScripts { } shadowJar{ + zip64 true archiveBaseName = 'pravega-sensor-collector' archiveClassifier = '' } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriverManager.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriverManager.java index 552c9b76..47c0579c 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriverManager.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriverManager.java @@ -40,17 +40,20 @@ protected void doStart() { LOGGER.info("Starting device drivers"); final DeviceDriverFactory factory = new DeviceDriverFactory(); drivers = configs.stream().map(factory::create).collect(Collectors.toList()); - drivers.stream().forEach((driver) -> driver.startAsync()); - drivers.stream().forEach((driver) -> driver.awaitRunning()); + drivers.forEach(AbstractService::startAsync); + drivers.forEach(AbstractService::awaitRunning); LOGGER.info("All device drivers started successfully"); notifyStarted(); } @Override protected void doStop() { - drivers.stream().forEach((driver) -> driver.stopAsync()); - drivers.stream().forEach((driver) -> driver.awaitTerminated()); + LOGGER.info("Stopping all device drivers"); + drivers.forEach(AbstractService::stopAsync); + drivers.forEach(AbstractService::awaitTerminated); drivers = null; + LOGGER.info("Stopped all device drivers"); + notifyStopped(); } /** diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/Parameters.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/Parameters.java index 1800e70d..1bcfb0b2 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/Parameters.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/Parameters.java @@ -26,6 +26,10 @@ public static String getEnvPrefix() { return ENV_PREFIX; } + public static Map getProperties() { + final String fileName = getPropertiesFileName(); + return getProperties(fileName); + } /** * Combines properties from: * 1. properties file (if specified) @@ -33,10 +37,9 @@ public static String getEnvPrefix() { * Values in the system environment will override values in the properties file. * It is intended that properties files only be used when developing in an IDE. */ - public static Map getProperties() { + public static Map getProperties(final String fileName) { Map map = new HashMap<>(); - final String fileName = getPropertiesFileName(); - if (!fileName.isEmpty()) { + if (fileName != null && !fileName.isEmpty()) { log.info("Reading properties from file {}", fileName); Properties properties = new Properties(); try (FileInputStream inputStream = new FileInputStream(fileName)) { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index 8dc36673..8a9eceed 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -42,7 +42,7 @@ public abstract class FileIngestService extends DeviceDriver { private static final String EXACTLY_ONCE_KEY = "EXACTLY_ONCE"; private static final String TRANSACTION_TIMEOUT_MINUTES_KEY = "TRANSACTION_TIMEOUT_MINUTES"; private static final String MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY = "MIN_TIME_IN_MILLIS_TO_UPDATE_FILE"; - private static final String DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES_KEY = "DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES"; + private static final String DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS_KEY = "DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS"; private static final String ENABLE_LARGE_EVENT = "ENABLE_LARGE_EVENT"; private static final int DEFAULT_SAMPLES_PER_EVENT_KEY = 100; @@ -138,8 +138,8 @@ long getMinTimeInMillisToUpdateFile() { return Long.parseLong(getProperty(MIN_TIME_IN_MILLIS_TO_UPDATE_FILE_KEY, "5000")); } - long getDeleteCompletedFilesIntervalInMinutes() { - return Long.parseLong(getProperty(DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES_KEY, "720")); + long getDeleteCompletedFilesIntervalInSeconds() { + return Long.parseLong(getProperty(DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS_KEY, "43200")); } boolean getLargeEventEnable() { @@ -169,14 +169,14 @@ protected void processFiles() { } protected void deleteCompletedFiles() { - LOG.trace("deleteCompletedFiles: BEGIN"); + LOG.debug("deleteCompletedFiles: BEGIN"); try { processor.deleteCompletedFiles(); } catch (Exception e) { LOG.error("deleteCompletedFiles: Delete file error", e); // Continue on any errors. We will retry on the next iteration. } - LOG.trace("deleteCompletedFiles: END"); + LOG.debug("deleteCompletedFiles: END"); } @Override @@ -201,17 +201,19 @@ protected void doStart() { deleteFileTask = executor.scheduleAtFixedRate( this::deleteCompletedFiles, 1, - getDeleteCompletedFilesIntervalInMinutes(), - TimeUnit.MINUTES); + getDeleteCompletedFilesIntervalInSeconds(), + TimeUnit.SECONDS); notifyStarted(); } @Override protected void doStop() { - LOG.info("doStop: Cancelling ingestion task and process file task"); + LOG.info("doStop: Cancelling ingestion, process and delete file task"); watchFileTask.cancel(false); processFileTask.cancel(false); deleteFileTask.cancel(false); + LOG.info("doStop: Cancelled ingestion, process and delete file task"); + notifyStopped(); } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImpl.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImpl.java index 6c73a173..580fecf7 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImpl.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImpl.java @@ -37,7 +37,7 @@ public class TransactionStateSQLiteImpl implements AutoCloseable, TransactionSt public TransactionStateSQLiteImpl(Connection connection, TransactionCoordinator transactionCoordinator) { this.connection = Preconditions.checkNotNull(connection, "connection"); - this.transactionCoordinator = Preconditions.checkNotNull(transactionCoordinator, "transactionCoordinator"); + this.transactionCoordinator = transactionCoordinator; } @Override @@ -124,7 +124,9 @@ public void addCompletedFileRecord(String fileName, long beginOffset, long endOf deletePendingFileStatement.setString(1, fileName); deletePendingFileStatement.setLong(2, beginOffset); deletePendingFileStatement.execute(); - transactionCoordinator.addTransactionToCommit(txnId); + if(transactionCoordinator!=null) { + transactionCoordinator.addTransactionToCommit(txnId); + } autoRollback.commit(); } } @@ -163,35 +165,38 @@ public void addCompletedFileRecord(String fileName, long beginOffset, long endOf addCompletedFileRecord(fileName, beginOffset, endOffset, newNextSequenceNumber, Optional.empty()); } - /** - * Delete record from TransactionsToCommit table. - * - * @param txnId transaction id - */ - @Override - public void deleteTransactionToCommit(Optional txnId) { - transactionCoordinator.deleteTransactionToCommit(txnId); - } - /** - * Get a list of files from completedFiles table. - * - * @return list of file name and end offset (file size) - */ - @Override - public List getCompletedFileRecords() throws SQLException { - try (final Statement statement = connection.createStatement(); - final ResultSet rs = statement.executeQuery("select fileName, offset from completedFiles")) { - final List files = new ArrayList<>(); - while (rs.next()) { - final FileNameWithOffset fileNameWithOffset = new FileNameWithOffset(rs.getString("fileName"), rs.getLong("offset")); - files.add(fileNameWithOffset); + /** + * Delete record from TransactionsToCommit table + * + * @param txnId transaction id + */ + @Override + public void deleteTransactionToCommit(Optional txnId) { + if(transactionCoordinator!=null) { + transactionCoordinator.deleteTransactionToCommit(txnId); + } + } + + /** + * Get a list of files from completedFiles table + * + * @return list of file name and end offset (file size) + */ + @Override + public List getCompletedFileRecords() throws SQLException { + try (final Statement statement = connection.createStatement(); + final ResultSet rs = statement.executeQuery("select fileName, offset from completedFiles")) { + final List files = new ArrayList<>(); + while (rs.next()) { + final FileNameWithOffset fileNameWithOffset = new FileNameWithOffset(rs.getString("fileName"), rs.getLong("offset")); + files.add(fileNameWithOffset); + } + return files; + } finally { + connection.commit(); } - return files; - } finally { - connection.commit(); } - } /** * Delete completed file record from completedFiles table for given file name. diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java new file mode 100644 index 00000000..8add6ae6 --- /dev/null +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaSensorCollectorIntegrationTests.java @@ -0,0 +1,209 @@ +package io.pravega.sensor.collector; + +import com.google.common.util.concurrent.Service; +import io.pravega.client.ClientConfig; +import io.pravega.client.EventStreamClientFactory; +import io.pravega.client.admin.ReaderGroupManager; +import io.pravega.client.admin.StreamManager; +import io.pravega.client.stream.*; +import io.pravega.client.stream.impl.UTF8StringSerializer; +import io.pravega.sensor.collector.util.FileNameWithOffset; +import io.pravega.sensor.collector.util.SQliteDBUtility; +import io.pravega.sensor.collector.util.TransactionStateDB; +import io.pravega.sensor.collector.util.TransactionStateSQLiteImpl; +import io.pravega.test.integration.utils.SetupUtils; + +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PravegaSensorCollectorIntegrationTests { + private static final Logger log = LoggerFactory.getLogger(PravegaSensorCollectorIntegrationTests.class); + private final SetupUtils setupUtils = new SetupUtils(); + static String fileName = "./src/test/resources/RawFileIngest-integration-test.properties"; + Map properties = null; + @BeforeEach + public void setup() { + log.info("Setup"); + properties = Parameters.getProperties(fileName); + try { + setupUtils.startAllServices(); + + Files.deleteIfExists(Paths.get(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE"))); + } catch (Exception e) { + throw new RuntimeException(e); + } + properties.put("PRAVEGA_SENSOR_COLLECTOR_RAW1_PRAVEGA_CONTROLLER_URI", setupUtils.getControllerUri().toString()); + log.debug("Properties: {}", properties); + } + + @AfterEach + public void tearDown() { + log.info("TearDown"); + try { + setupUtils.stopAllServices(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + try { + Files.deleteIfExists(Paths.get(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE"))); + } catch (IOException e) { + throw new RuntimeException(e); + } + properties = null; + } + + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testPSCDataIntegration() { + try { + copyHelloWorldFile(); + } catch (IOException e) { + throw new RuntimeException(e); + } + URI controllerURI = setupUtils.getControllerUri(); + String scope = "test-psc-data-integration"; + String streamName ="test-psc-data-integration-stream"; + + properties.put("PRAVEGA_SENSOR_COLLECTOR_RAW1_SCOPE",scope); + properties.put("PRAVEGA_SENSOR_COLLECTOR_RAW1_STREAM",streamName); + + final DeviceDriverManager deviceDriverManager = new DeviceDriverManager(properties); + Service startService = deviceDriverManager.startAsync(); + try { + startService.awaitRunning(Duration.ofSeconds(30)); + Thread.sleep(12000); + } catch (InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + final Connection connection = SQliteDBUtility.createDatabase(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE")); + final TransactionStateDB state = new TransactionStateSQLiteImpl(connection, null); + + try { + List completedFiles = state.getCompletedFileRecords(); + Assertions.assertEquals(1, completedFiles.size()); + + validateStreamData(controllerURI, scope, streamName, new String(Files.readAllBytes(Paths.get("../parquet-file-sample-data/test_file/hello-world.parquet")))); + + Thread.sleep(5000); + + Service stopService = deviceDriverManager.stopAsync(); + stopService.awaitTerminated(Duration.ofSeconds(10)); + + // Till this time all the completed files should get deleted + completedFiles = state.getCompletedFileRecords(); + Assertions.assertEquals(0, completedFiles.size()); + connection.close(); + } catch (SQLException | InterruptedException | TimeoutException | IOException e) { + throw new RuntimeException(e); + } + } + + private static void validateStreamData(URI controllerURI, String scope, String streamName, String content) { + StreamManager streamManager = StreamManager.create(controllerURI); + + final String readerGroup = UUID.randomUUID().toString().replace("-", ""); + final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder() + .stream(Stream.of(scope, streamName)) + .build(); + try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) { + readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig); + } + + try (EventStreamClientFactory clientFactory = EventStreamClientFactory.withScope(scope, + ClientConfig.builder().controllerURI(controllerURI).build()); + EventStreamReader reader = clientFactory.createReader("reader", + readerGroup, + new UTF8StringSerializer(), + ReaderConfig.builder().build())) { + System.out.format("Reading all the events from %s/%s%n", scope, streamName); + EventRead eventRead = null; + try { + while ((eventRead = reader.readNextEvent(2000)).getEvent() != null) { + String event = eventRead.getEvent(); + System.out.format("Read event: %s", event); + Assertions.assertNotNull(event); + Assertions.assertFalse(event.isEmpty()); + Assertions.assertEquals(content, event); + } + } catch (ReinitializationRequiredException e) { + //There are certain circumstances where the reader needs to be reinitialized + e.printStackTrace(); + } + System.out.format("No more events from %s/%s%n", scope, streamName); + } + } + + public void copyHelloWorldFile() throws IOException { + Path sourcePath = Paths.get("../parquet-file-sample-data/test_file/hello-world.parquet"); + Path targetPath = Paths.get("../parquet-file-sample-data/integration-test/hello-world.parquet"); + Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + } + + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testRawFile() { + try { + copyFile(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + final DeviceDriverManager deviceDriverManager = new DeviceDriverManager(properties); + Service startService = deviceDriverManager.startAsync(); + try { + startService.awaitRunning(Duration.ofSeconds(30)); + Thread.sleep(15000); + } catch (InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + final Connection connection = SQliteDBUtility.createDatabase(properties.get("PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE")); + final TransactionStateDB state = new TransactionStateSQLiteImpl(connection, null); + + try { + List completedFiles = state.getCompletedFileRecords(); + Assertions.assertEquals(3, completedFiles.size()); + + Thread.sleep(5000); + + Service stopService = deviceDriverManager.stopAsync(); + stopService.awaitTerminated(Duration.ofSeconds(10)); + + // Till this time all the completed files should get deleted + completedFiles = state.getCompletedFileRecords(); + Assertions.assertEquals(0, completedFiles.size()); + connection.close(); + } catch (SQLException | InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + public void copyFile() throws IOException { + Path sourcePath = Paths.get("../parquet-file-sample-data/test_file/sub1.parquet"); + Path targetPath = Paths.get("../parquet-file-sample-data/integration-test/sub1.parquet"); + Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + sourcePath = Paths.get("../parquet-file-sample-data/test_file/sub2.parquet"); + targetPath = Paths.get("../parquet-file-sample-data/integration-test/sub2.parquet"); + Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + sourcePath = Paths.get("../parquet-file-sample-data/test_file/sub3.parquet"); + targetPath = Paths.get("../parquet-file-sample-data/integration-test/sub3.parquet"); + Files.copy(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); + log.info("copyFile: Files copied successfully"); + } +} diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImplTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImplTests.java index 07355f94..6412b824 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImplTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImplTests.java @@ -125,13 +125,13 @@ public void processFilesTest() throws SQLException { @Test public void testCreateTransactionStateSQLiteImplWithNullConnection() { Exception exception = Assert.assertThrows(NullPointerException.class, () -> new TransactionStateSQLiteImpl(null, null)); - Assert.assertTrue("connection".equals(exception.getMessage())); + Assertions.assertEquals("connection", exception.getMessage()); } @Test public void testCreateTransactionStateSQLiteImplWithNullTransactionCoordinator() { MockitoAnnotations.initMocks(this); - Exception exception = Assert.assertThrows(NullPointerException.class, () -> new TransactionStateSQLiteImpl(mockConnection, null)); - Assert.assertTrue("transactionCoordinator".equals(exception.getMessage())); + TransactionStateSQLiteImpl state = new TransactionStateSQLiteImpl(mockConnection, null); + Assertions.assertNotNull(state); } } diff --git a/pravega-sensor-collector/src/test/resources/LogFileIngestTest.properties b/pravega-sensor-collector/src/test/resources/LogFileIngestTest.properties index 45fcfdb3..d92cb111 100644 --- a/pravega-sensor-collector/src/test/resources/LogFileIngestTest.properties +++ b/pravega-sensor-collector/src/test/resources/LogFileIngestTest.properties @@ -22,4 +22,4 @@ PRAVEGA_SENSOR_COLLECTOR_ACCEL2_CREATE_SCOPE=true PRAVEGA_SENSOR_COLLECTOR_ACCEL2_STREAM=sensors-accelerometer PRAVEGA_SENSOR_COLLECTOR_ACCEL2_ROUTING_KEY=routingkey1 PRAVEGA_SENSOR_COLLECTOR_ACCEL2_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000 -PRAVEGA_SENSOR_COLLECTOR_ACCEL2_DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES=720 \ No newline at end of file +PRAVEGA_SENSOR_COLLECTOR_ACCEL2_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=43200 \ No newline at end of file diff --git a/pravega-sensor-collector/src/test/resources/ParquetFileIngest.properties b/pravega-sensor-collector/src/test/resources/ParquetFileIngest.properties index e06b09de..86336b52 100644 --- a/pravega-sensor-collector/src/test/resources/ParquetFileIngest.properties +++ b/pravega-sensor-collector/src/test/resources/ParquetFileIngest.properties @@ -21,7 +21,7 @@ PRAVEGA_SENSOR_COLLECTOR_PARQ2_STREAM=stream-p PRAVEGA_SENSOR_COLLECTOR_PARQ2_ROUTING_KEY=$(hostname) PRAVEGA_SENSOR_COLLECTOR_PARQ2_TRANSACTION_TIMEOUT_MINUTES=2.0 PRAVEGA_SENSOR_COLLECTOR_PARQ2_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000 -PRAVEGA_SENSOR_COLLECTOR_PARQ2_DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES=720 +PRAVEGA_SENSOR_COLLECTOR_PARQ2_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=43200 HADOOP_HOME=${HOME}/dev # windows - location of bin/winutils.exe diff --git a/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties b/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties new file mode 100644 index 00000000..7782b040 --- /dev/null +++ b/pravega-sensor-collector/src/test/resources/RawFileIngest-integration-test.properties @@ -0,0 +1,24 @@ +# +# Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# This file can be used to manually test RawFileIngestService. +PRAVEGA_SENSOR_COLLECTOR_RAW1_CLASS=io.pravega.sensor.collector.file.rawfile.RawFileIngestService +PRAVEGA_SENSOR_COLLECTOR_RAW1_FILE_SPEC=../parquet-file-sample-data/integration-test +PRAVEGA_SENSOR_COLLECTOR_RAW1_FILE_EXTENSION=parquet +PRAVEGA_SENSOR_COLLECTOR_RAW1_DATABASE_FILE=./integration-test.db +#PRAVEGA_SENSOR_COLLECTOR_RAW1_PRAVEGA_CONTROLLER_URI=tls://pravega-controller.texas-twister.ns.sdp.hop.lab.emc.com:443 +PRAVEGA_SENSOR_COLLECTOR_RAW1_PRAVEGA_CONTROLLER_URI=tcp://127.0.0.1:9090 +PRAVEGA_SENSOR_COLLECTOR_RAW1_SCOPE=test-psc +PRAVEGA_SENSOR_COLLECTOR_RAW1_STREAM=test-psc-stream +PRAVEGA_SENSOR_COLLECTOR_RAW1_ROUTING_KEY=$(hostname) +PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES=false +PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0 +PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE=true +PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000 +PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=15 diff --git a/pravega-sensor-collector/src/test/resources/RawFileIngest.properties b/pravega-sensor-collector/src/test/resources/RawFileIngest.properties index 5a1e77ac..c9da28bc 100644 --- a/pravega-sensor-collector/src/test/resources/RawFileIngest.properties +++ b/pravega-sensor-collector/src/test/resources/RawFileIngest.properties @@ -20,4 +20,4 @@ PRAVEGA_SENSOR_COLLECTOR_RAW1_STREAM=stream1 PRAVEGA_SENSOR_COLLECTOR_RAW1_ROUTING_KEY=$(hostname) PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES=2.0 PRAVEGA_SENSOR_COLLECTOR_RAW1_MIN_TIME_IN_MILLIS_TO_UPDATE_FILE=5000 -PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_MINUTES=720 +PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES_INTERVAL_IN_SECONDS=43200 diff --git a/windows-service/PravegaSensorCollectorApp.xml b/windows-service/PravegaSensorCollectorApp.xml index 2032d30b..ab1c22b7 100644 --- a/windows-service/PravegaSensorCollectorApp.xml +++ b/windows-service/PravegaSensorCollectorApp.xml @@ -42,9 +42,9 @@ - + - +