diff --git a/README.md b/README.md index f757caf5..e1e6aba7 100644 --- a/README.md +++ b/README.md @@ -246,9 +246,7 @@ If the TLS Certificate Authority (CA) used by Pravega is not trusted by a well-k ``` OR ``` - kubectl get secret pravega-controller-tls -n nautilus-pravega -o jsonpath="{.data.tls\.crt}" | base64 --decode > ~/pravega.crt - kubectl get secret keycloak-tls -n nautilus-system -o jsonpath="{.data.tls\.crt}" | base64 --decode > ~/keycloak.crt - kubectl get secret pravega-tls -n nautilus-pravega -o jsonpath="{.data.tls\.crt}" | base64 --decode > ~/pravegaAll.crt + kubectl get cabundle sdp-default -n nautilus-system -o jsonpath="{.spec.certificates.sdp}" > ~/tlsCA-sdp.crt ``` 2. On the target system, add the CA certificate to the operating system. ```shell diff --git a/config/checkstyle.xml b/config/checkstyle.xml index b910873f..3c417ef9 100644 --- a/config/checkstyle.xml +++ b/config/checkstyle.xml @@ -9,7 +9,7 @@ - + diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriverFactory.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriverFactory.java index b73ee37e..8af314c1 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriverFactory.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/DeviceDriverFactory.java @@ -18,6 +18,7 @@ public class DeviceDriverFactory { /** * Instantiate a concrete subclass of DeviceDriver based on key/value properties. + * @param config */ DeviceDriver create(DeviceDriverConfig config) { try { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/EventGenerator.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/EventGenerator.java index c33a1153..dcdd605e 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/EventGenerator.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/EventGenerator.java @@ -17,17 +17,19 @@ import java.util.function.Consumer; /** - *The EventGenerator is responsible for generating events depending on file type + *The EventGenerator is responsible for generating events depending on file type. */ public interface EventGenerator { - /* - * Generate events from Input stream. - * Depending on file type event generation logic defers - * @param inputStream - * @param firstSequenceNumber - * @return next sequence number, end offset - * */ + /** + * Generate events from Input stream. + * Depending on file type event generation logic defers + * @param inputStream + * @param firstSequenceNumber + * @param consumer + * @return next sequence number, end offset + * @throws IOException + */ Pair generateEventsFromInputStream(CountingInputStream inputStream, long firstSequenceNumber, Consumer consumer) throws IOException; } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java index 03f4827c..8a9eceed 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileIngestService.java @@ -86,6 +86,7 @@ public FileIngestService(DeviceDriverConfig config) { String getFileSpec() { return getProperty(FILE_SPEC_KEY); } + String getFileExtension() { return getProperty(FILE_EXT, ""); } @@ -155,6 +156,7 @@ protected void watchFiles() { } LOG.trace("watchFiles: END"); } + protected void processFiles() { LOG.trace("processFiles: BEGIN"); try { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java index e20c58cf..1a3a518a 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/FileProcessor.java @@ -103,9 +103,11 @@ public static FileProcessor create( * @return eventGenerator */ public abstract EventGenerator getEventGenerator(FileConfig config); + public void watchFiles() throws Exception { findAndRecordNewFiles(); } + public void processFiles() throws Exception { log.debug("processFiles: BEGIN"); if (config.enableDeleteCompletedFiles) { @@ -137,7 +139,9 @@ protected void findAndRecordNewFiles() throws Exception { } /** + * Get directory listing. * @return list of file name and file size in bytes + * @throws IOException If unable to list files */ protected List getDirectoryListing() throws IOException { log.debug("getDirectoryListing: fileSpec={}", config.fileSpec); @@ -149,6 +153,9 @@ protected List getDirectoryListing() throws IOException { } /** + * Get list of new files. + * @param directoryListing list of directories + * @param completedFiles list of completed files * @return sorted list of file name and file size in bytes */ protected List getNewFiles(List directoryListing, List completedFiles) { @@ -165,7 +172,7 @@ protected List getNewFiles(List director FileUtils.moveCompletedFile(dirFile, movedFilesDirectory); log.warn("File: {} already marked as completed, moving now", dirFile.fileName); } catch (IOException e) { - log.error("File: {} already marked as completed, but failed to move, error:{}", dirFile.fileName,e.getMessage()); + log.error("File: {} already marked as completed, but failed to move, error:{}", dirFile.fileName, e.getMessage()); } } }); @@ -213,8 +220,8 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN } catch (TxnFailedException ex) { log.error("processFile: Write event to transaction failed with exception {} while processing file: {}, event: {}", ex, fileNameWithBeginOffset.fileName, e); - /* TODO while writing event if we get Transaction failed exception then should we abort the trasaction and process again? - This will occur only if Transaction state is not open*/ + /* TODO while writing event if we get Transaction failed exception then should we abort the trasaction and process again? + This will occur only if Transaction state is not open */ throw new RuntimeException(ex); } @@ -265,18 +272,14 @@ void deleteCompletedFiles() throws Exception { Path filePath = completedFilesPath.resolve(completedFileName); log.debug("deleteCompletedFiles: Deleting File default name:{}, and it's completed file name:{}.", file.fileName, filePath); try { - /** - * If file gets deleted from completed files directory, or it does not exist in default ingestion directory - * then only remove the record from DB. - */ + /* If file gets deleted from completed files directory, or it does not exist in default ingestion directory + * then only remove the record from DB. */ if (Files.deleteIfExists(filePath) || Files.notExists(Paths.get(file.fileName))) { state.deleteCompletedFileRecord(file.fileName); log.debug("deleteCompletedFiles: Deleted File default name:{}, and it's completed file name:{}.", file.fileName, filePath); } else { - /** - * This situation occurs because at first attempt moving file to completed directory fails, but the file still exists in default ingestion directory. - * Moving file from default directory to completed directory will be taken care in next iteration, post which delete will be taken care. - */ + /* This situation occurs because at first attempt moving file to completed directory fails, but the file still exists in default ingestion directory. + * Moving file from default directory to completed directory will be taken care in next iteration, post which delete will be taken care. */ log.warn("deleteCompletedFiles: File {} doesn't exists in completed directory but still exist in default ingestion directory.", filePath); } } catch (Exception e) { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/csvfile/CsvFileEventGenerator.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/csvfile/CsvFileEventGenerator.java index 07e642cc..d8e02646 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/csvfile/CsvFileEventGenerator.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/csvfile/CsvFileEventGenerator.java @@ -31,7 +31,7 @@ import java.util.function.Consumer; /** - * Generate Event from CSV file + * Generate Event from CSV file. */ public class CsvFileEventGenerator implements EventGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(CsvFileEventGenerator.class); @@ -66,6 +66,8 @@ public static CsvFileEventGenerator create(String routingKey, int maxRecordsPerE /** Generate event from input stream. number of records in one event is defined in input config file * @param inputStream * @param firstSequenceNumber + * @param consumer + * @throws IOException If there is any error generating event. * @return next sequence number, end offset */ public Pair generateEventsFromInputStream(CountingInputStream inputStream, long firstSequenceNumber, Consumer consumer) throws IOException { @@ -73,10 +75,10 @@ public Pair generateEventsFromInputStream(CountingInputStream inputS final CSVParser parser = CSVParser.parse(inputStream, StandardCharsets.UTF_8, format); long nextSequenceNumber = firstSequenceNumber; int numRecordsInEvent = 0; - List> eventBatch = new ArrayList<>(); + List> eventBatch = new ArrayList<>(); for (CSVRecord record : parser) { - HashMap recordDataMap = new HashMap(); - for (int i=0; i recordDataMap = new HashMap(); + for (int i = 0; i < record.size(); i++) { recordDataMap.put(parser.getHeaderNames().get(i), convertValue(record.get(i))); } eventBatch.add(recordDataMap); @@ -101,10 +103,10 @@ public Object convertValue(String s) { // TODO: convert timestamp try { return Long.parseLong(s); - } catch (NumberFormatException ignored) {} + } catch (NumberFormatException ignored) { } try { return Double.parseDouble(s); - } catch (NumberFormatException ignored) {} + } catch (NumberFormatException ignored) { } return s; } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/csvfile/CsvFileSequenceProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/csvfile/CsvFileSequenceProcessor.java index 44b01c79..a14d549a 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/csvfile/CsvFileSequenceProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/csvfile/CsvFileSequenceProcessor.java @@ -31,7 +31,7 @@ public CsvFileSequenceProcessor(FileConfig config, TransactionStateDB state, Eve } /** - * Event generator for CSV file + * Event generator for CSV file. * @param config configurations parameters * @return eventGenerator */ diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/parquet/ParquetEventGenerator.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/parquet/ParquetEventGenerator.java index 8617e470..3af3ecd4 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/parquet/ParquetEventGenerator.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/parquet/ParquetEventGenerator.java @@ -1,4 +1,3 @@ - /** * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. * @@ -48,7 +47,7 @@ import java.util.stream.Collectors; /** - * Generate Event from Parquet file + * Generate Event from Parquet file. */ public class ParquetEventGenerator implements EventGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(ParquetEventGenerator.class); @@ -88,6 +87,8 @@ public static ParquetEventGenerator create(String routingKey, int maxRecordsPerE * * @param inputStream * @param firstSequenceNumber + * @param consumer + * @throws IOException * @return next sequence number, end offset */ public Pair generateEventsFromInputStream(CountingInputStream inputStream, long firstSequenceNumber, Consumer consumer) throws IOException { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/rawfile/RawEventGenerator.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/rawfile/RawEventGenerator.java index a97b3cc7..2b236bba 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/rawfile/RawEventGenerator.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/rawfile/RawEventGenerator.java @@ -1,4 +1,3 @@ - /** * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. * @@ -28,7 +27,7 @@ import java.util.function.Consumer; /** - * Generate Event from RAW file + * Generate Event from RAW file. */ public class RawEventGenerator implements EventGenerator { private static final Logger LOGGER = LoggerFactory.getLogger(RawEventGenerator.class); @@ -60,9 +59,11 @@ public static RawEventGenerator create(String routingKey) throws IOException { /** - * Convert File to byteArray + * Convert File to byteArray. * @param inputStream * @param firstSequenceNumber + * @param consumer + * @throws IOException * @return next sequence number, end offset */ public Pair generateEventsFromInputStream(CountingInputStream inputStream, long firstSequenceNumber, Consumer consumer) throws IOException { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/rawfile/RawFileProcessor.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/rawfile/RawFileProcessor.java index cab945ac..0a871301 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/rawfile/RawFileProcessor.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/file/rawfile/RawFileProcessor.java @@ -29,7 +29,7 @@ public RawFileProcessor(FileConfig config, TransactionStateDB state, EventWriter this.writerId = writerId; } - /** Event generator for Raw file + /** Event generator for Raw file. * @param config configurations parameters * @return eventGenerator */ diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/leap/AuthTokenDto.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/leap/AuthTokenDto.java index 506bece1..e3cc9465 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/leap/AuthTokenDto.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/leap/AuthTokenDto.java @@ -18,22 +18,22 @@ public class AuthTokenDto { public String displayName; public String token; public List roles; - public Integer expires_in; + public Integer expiresIn; public AuthTokenDto() { } - public AuthTokenDto(String userId, String displayName, String token, List roles, Integer expires_in) { + public AuthTokenDto(String userId, String displayName, String token, List roles, Integer expiresIn) { this.userId = userId; this.displayName = displayName; this.token = token; this.roles = roles; - this.expires_in = expires_in; + this.expiresIn = expiresIn; } @Override public String toString() { return "AuthTokenDto{" + "userId=" + userId + ", displayName=" + displayName + ", token=" + token + ", roles=" - + roles + ", expires_in=" + expires_in + '}'; + + roles + ", expires_in=" + expiresIn + '}'; } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/leap/LeapDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/leap/LeapDriver.java index 0211f07b..54270ff8 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/leap/LeapDriver.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/leap/LeapDriver.java @@ -43,10 +43,10 @@ import io.pravega.sensor.collector.stateful.StatefulSensorDeviceDriver; public class LeapDriver extends StatefulSensorDeviceDriver { - private static final Logger log = LoggerFactory.getLogger(LeapDriver.class); - private static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + private static final Logger LOGGER = LoggerFactory.getLogger(LeapDriver.class); + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); static { - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); } private static final String POLL_PERIOD_SEC_KEY = "POLL_PERIOD_SEC"; @@ -69,16 +69,16 @@ public LeapDriver(DeviceDriverConfig config) { apiUri = getProperty(API_URI_KEY); userName = getProperty(USERNAME_KEY); password = getProperty(PASSWORD_KEY); - log.info("Poll Period Sec: {}", pollPeriodSec); - log.info("API URI: {}", apiUri); - log.info("User Name: {}", userName); - log.info("Password: {}", password); + LOGGER.info("Poll Period Sec: {}", pollPeriodSec); + LOGGER.info("API URI: {}", apiUri); + LOGGER.info("User Name: {}", userName); + LOGGER.info("Password: {}", password); final long bucketCapacity = 2; final long periodMillis = (long) (bucketCapacity * 1000 * pollPeriodSec); bucket = Bucket4j.builder().withMillisecondPrecision() .addLimit(Bandwidth.simple(bucketCapacity, Duration.ofMillis(periodMillis))).build(); - log.info("Token Bucket: {}", bucket); + LOGGER.info("Token Bucket: {}", bucket); bucket.tryConsume(bucketCapacity - 1); } @@ -89,12 +89,13 @@ public String initialState() { } /** + * For poll events. * @param state A string containing a timestamp which is the maximum timestamp of any reading * ever returned by the Leap server. */ @Override public PollResponse pollEvents(String state) throws Exception { - log.debug("pollEvents: BEGIN"); + LOGGER.debug("pollEvents: BEGIN"); bucket.asScheduler().consume(1); final List events = new ArrayList<>(); final HttpClient client = HttpClient.newHttpClient(); @@ -103,19 +104,19 @@ public PollResponse pollEvents(String state) throws Exception { uri = apiUri + "/ClientApi/V1/DeviceReadings"; } else { // Add 1 ms to get new readings from just after the last read state. - final String startDate = dateFormat.format(Date.from(dateFormat.parse(state).toInstant().plus(Duration.ofMillis(1)))); + final String startDate = DATE_FORMAT.format(Date.from(DATE_FORMAT.parse(state).toInstant().plus(Duration.ofMillis(1)))); uri = apiUri + "/ClientApi/V1/DeviceReadings?startDate=" + URLEncoder.encode(startDate, StandardCharsets.UTF_8.toString()); } final HttpRequest request = HttpRequest.newBuilder().uri(URI.create(uri)) .header("Authorization", "Bearer " + getAuthToken()).build(); - log.info("pollEvents: request={}", request); + LOGGER.info("pollEvents: request={}", request); final HttpResponse response = client.send(request, BodyHandlers.ofString()); - log.debug("pollEvents: response={}", response); + LOGGER.debug("pollEvents: response={}", response); if (response.statusCode() != 200) { throw new RuntimeException( MessageFormat.format("HTTP server returned status code {0}", response.statusCode())); } - log.trace("pollEvents: body={}", response.body()); + LOGGER.trace("pollEvents: body={}", response.body()); JsonNode jsonNode = mapper.readTree(response.body()); final ArrayNode arrayNode = (ArrayNode) jsonNode; // if no response, empty event list and same state will be returned @@ -132,36 +133,36 @@ public PollResponse pollEvents(String state) throws Exception { maxTime = curReading; } } - state = dateFormat.format(maxTime); + state = DATE_FORMAT.format(maxTime); } - log.debug("pollEvents: New state will be {}", state); + LOGGER.debug("pollEvents: New state will be {}", state); final PollResponse pollResponse = new PollResponse(events, state); - log.debug("pollEvents: pollResponse={}", pollResponse); - log.debug("pollEvents: END"); + LOGGER.debug("pollEvents: pollResponse={}", pollResponse); + LOGGER.debug("pollEvents: END"); return pollResponse; } protected String getAuthToken() throws Exception { - log.debug("getAuthToken: BEGIN"); + LOGGER.debug("getAuthToken: BEGIN"); final HttpClient client = HttpClient.newBuilder().version(Version.HTTP_1_1).build(); final String uri = apiUri + "/api/Auth/authenticate"; final AuthCredentialsDto authCredentialsDto = new AuthCredentialsDto(userName, password); final String requestBody = mapper.writeValueAsString(authCredentialsDto); - log.debug("getAuthToken: requestBody={}", requestBody); + LOGGER.debug("getAuthToken: requestBody={}", requestBody); final HttpRequest request = HttpRequest.newBuilder().uri(URI.create(uri)).timeout(Duration.ofMinutes(1)) .header("Accept", "*/*").header("Content-Type", "application/json") .POST(BodyPublishers.ofString(requestBody)).build(); - log.debug("getAuthToken: request={}", request); + LOGGER.debug("getAuthToken: request={}", request); final HttpResponse response = client.send(request, BodyHandlers.ofString()); - log.debug("getAuthToken: response={}", response); - log.debug("getAuthToken: body={}", response.body()); + LOGGER.debug("getAuthToken: response={}", response); + LOGGER.debug("getAuthToken: body={}", response.body()); if (response.statusCode() != 200) { throw new RuntimeException(MessageFormat.format("HTTP server returned status code {0} for request {1}", response.statusCode(), request)); } final AuthTokenDto authTokenDto = mapper.readValue(response.body(), AuthTokenDto.class); - log.debug("getAuthToken: authTokenDto={}", authTokenDto); - log.debug("getAuthToken: END"); + LOGGER.debug("getAuthToken: authTokenDto={}", authTokenDto); + LOGGER.debug("getAuthToken: END"); return authTokenDto.token; } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/network/NetworkDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/network/NetworkDriver.java index 201fc54e..9c8b64a7 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/network/NetworkDriver.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/network/NetworkDriver.java @@ -70,7 +70,7 @@ public NetworkDriver(DeviceDriverConfig config) { blockingStrategy = UninterruptibleBlockingStrategy.PARKING; } - IntStream.range(0, statisticNames.size()).forEach((i) -> { + IntStream.range(0, statisticNames.size()).forEach( i -> { final String statisticName = statisticNames.get(i); statisticNameToIndex.put(statisticName, i); String fileName = getStatisticFileName(interfaceName, statisticName); @@ -108,7 +108,7 @@ protected String getRemoteAddr() { public NetworkRawData readRawData() throws Exception { bucket.asScheduler().consumeUninterruptibly(1, blockingStrategy); final long timestampNanos = System.currentTimeMillis() * 1000 * 1000; - final List statisticValues = networkStatisticFiles.stream().map((s) -> { + final List statisticValues = networkStatisticFiles.stream().map( s -> { try { s.randomAccessFile.seek(0); return Long.parseLong(s.randomAccessFile.readLine()); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/network/NetworkSamples.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/network/NetworkSamples.java index 464ba671..c3d46790 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/network/NetworkSamples.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/network/NetworkSamples.java @@ -25,9 +25,9 @@ public class NetworkSamples implements Samples { private static final Logger log = LoggerFactory.getLogger(NetworkSamples.class); - private static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); + private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); static { - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); } // Timestamps as nanoseconds since 1970-01-01. @@ -67,7 +67,7 @@ public long lastTimestamp() { } public void setLastTimestampFormatted() { - lastTimestampFormatted = dateFormat.format(new Date(lastTimestamp() / 1000 / 1000)); + lastTimestampFormatted = DATE_FORMAT.format(new Date(lastTimestamp() / 1000 / 1000)); } @Override diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/DataCollectorService.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/DataCollectorService.java index 6425e92a..9745367d 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/DataCollectorService.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/DataCollectorService.java @@ -20,6 +20,7 @@ /** * Read raw data from a device and send it to the memory queue. + * */ public class DataCollectorService extends AbstractExecutionThreadService { private static final Logger LOGGER = LoggerFactory.getLogger(DataCollectorService.class); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/PersistentQueue.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/PersistentQueue.java index 4fb6efc1..41242d4c 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/PersistentQueue.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/PersistentQueue.java @@ -45,6 +45,9 @@ public class PersistentQueue implements AutoCloseable { private final Semaphore semaphore; /** + * create persistent queue + * @param connection + * @param transactionCoordinator * @param capacity Maximum number of elements that can be queued. */ public PersistentQueue(Connection connection, TransactionCoordinator transactionCoordinator, long capacity) { @@ -63,7 +66,9 @@ public PersistentQueue(Connection connection, TransactionCoordinator transaction } /** + * create a database connection. * @param fileName Name of SQLite database file. + * @return database connection */ public static Connection createDatabase(String fileName) { try { @@ -92,6 +97,7 @@ public Connection getConnection() { /** * Add an element. Blocks until there is enough capacity. + * @param element */ public void add(PersistentQueueElement element) throws SQLException, InterruptedException { if (element.id != 0) { @@ -118,6 +124,9 @@ public void add(PersistentQueueElement element) throws SQLException, Interrupted /** * Adds an element to the queue without committing. Blocks until there is enough * capacity. + * @param element + * @throws SQLException + * @throws InterruptedException */ public void addWithoutCommit(PersistentQueueElement element) throws SQLException, InterruptedException { if (element.id != 0) { @@ -141,6 +150,8 @@ public void addWithoutCommit(PersistentQueueElement element) throws SQLException /** * Retrieve up to limit elements. Does not remove elements. + * @param limit + * @throws SQLException */ public synchronized List peek(long limit) throws SQLException { try (final Statement statement = connection.createStatement(); @@ -160,6 +171,9 @@ public synchronized List peek(long limit) throws SQLExce * Remove elements from the queue. Before this method is called, it is expected * that the elements have been written to Pravega in the Pravega transaction * txnId, flushed, but not committed. + * @param elements + * @param txnId + * @throws SQLException */ public synchronized void remove(List elements, Optional txnId) throws SQLException { if (!elements.isEmpty()) { diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/SimpleDeviceDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/SimpleDeviceDriver.java index 032bbe90..d4642086 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/SimpleDeviceDriver.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/SimpleDeviceDriver.java @@ -210,11 +210,14 @@ public void close() throws Exception { /** * Reads raw data (byte arrays) from a sensor. + * @throws Exception */ abstract public R readRawData() throws Exception; /** * Decode raw data (byte arrays) and append to Samples. + * @param samples + * @param rawSensorData */ abstract public void decodeRawDataToSamples(S samples, R rawSensorData); @@ -225,6 +228,8 @@ public void close() throws Exception { /** * Serialize Samples to a byte array. This will be written to Pravega as an event. + * @param samples + * @throws Exception */ abstract public byte[] serializeSamples(S samples) throws Exception; @@ -236,6 +241,7 @@ public String getRoutingKey(S samples) { * Get the timestamp that will be sent to Pravega with {@link io.pravega.client.stream.EventStreamWriter#noteTime}. * Generally, this should be the number of nanoseconds since 1970-01-01. * All future events should have a timestamp greater or equal to this value. + * @param samples */ public long getTimestamp(S samples) { return samples.getMaxTimestampNanos(); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java index 11886065..0220802e 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/simple/memoryless/SimpleMemorylessDriver.java @@ -110,6 +110,7 @@ boolean getExactlyOnce() { /** * Reads raw data (byte arrays) from a sensor. + * @throws Exception */ abstract public List readRawData() throws Exception; @@ -121,7 +122,7 @@ boolean getExactlyOnce() { abstract public byte[] getEvent(R rawData); /** - * Get timestamp value sourced from server for the particular Raw data object + * Get timestamp value sourced from server for the particular Raw data object. * * @param rawData : Object to fetch timestamp from * @return timestamp in Unix Epoch format diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/stateful/ReadingState.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/stateful/ReadingState.java index 080b418a..f2fe2a61 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/stateful/ReadingState.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/stateful/ReadingState.java @@ -20,7 +20,7 @@ import java.sql.ResultSet; /** - * Stores and updates last-read timestamp in the persistent state (SQLite database) * + * Stores and updates last-read timestamp in the persistent state (SQLite database). * */ public class ReadingState { private static final Logger LOGGER = LoggerFactory.getLogger(ReadingState.class); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/EventWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/EventWriter.java index 772be9fd..1fa1c1d1 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/EventWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/EventWriter.java @@ -48,13 +48,16 @@ static EventWriter create( /** * @return a transaction ID that can be passed to commit(UUID txnId), even in another process. * The non-transactional implementation will always return Optional.empty(). + * @throws TxnFailedException */ Optional flush() throws TxnFailedException; void commit() throws TxnFailedException; /** + * Commit a transaction. * @param timestamp is the number of nanoseconds since 1970-01-01 + * @throws TxnFailedException */ void commit(long timestamp) throws TxnFailedException; @@ -63,6 +66,7 @@ static EventWriter create( * This method should only be used only when recovering from a failure. * * @param txnId a return value from flush(), even from another process. + * @throws TxnFailedException */ void commit(UUID txnId) throws TxnFailedException; @@ -74,6 +78,7 @@ static EventWriter create( /** * This should called be prior to aborting any transactions to make sure it is not open. + * @param txnId */ Transaction.Status getTransactionStatus(UUID txnId); diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/FileUtils.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/FileUtils.java index f9a11377..c6b63bda 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/FileUtils.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/FileUtils.java @@ -29,11 +29,17 @@ public class FileUtils { private static final Logger LOGGER = LoggerFactory.getLogger(FileUtils.class); - final static String separator = ","; + static final String SEPARATOR = ","; public static final String FAILED_FILES = "Failed_Files"; public static final String COMPLETED_FILES = "Completed_Files"; /** + * Get directory list. + * @param fileSpec + * @param fileExtension + * @param movedFilesDirectory + * @param minTimeInMillisToUpdateFile + * @throws IOException * @return list of file name and file size in bytes * Handle the below cases * 1. If given file path does not exist then log the message and continue @@ -42,7 +48,7 @@ public class FileUtils { * */ public static List getDirectoryListing(String fileSpec, String fileExtension, Path movedFilesDirectory, long minTimeInMillisToUpdateFile) throws IOException { - String[] directories= fileSpec.split(separator); + String[] directories = fileSpec.split(SEPARATOR); List directoryListing = new ArrayList<>(); for (String directory : directories) { final Path pathSpec = Paths.get(directory); @@ -56,20 +62,27 @@ public static List getDirectoryListing(String fileSpec, Stri } /** - * get all files in directory(including subdirectories) and their respective file size in bytes + * get all files in directory(including subdirectories) and their respective file size in bytes. + * @param pathSpec + * @param fileExtension + * @param directoryListing + * @param movedFilesDirectory + * @param minTimeInMillisToUpdateFile + * @throws IOException */ - protected static void getDirectoryFiles(Path pathSpec, String fileExtension, List directoryListing, Path movedFilesDirectory, long minTimeInMillisToUpdateFile) throws IOException{ + protected static void getDirectoryFiles(Path pathSpec, String fileExtension, List directoryListing, Path movedFilesDirectory, long minTimeInMillisToUpdateFile) throws IOException { DirectoryStream.Filter lastModifiedTimeFilter = getLastModifiedTimeFilter(minTimeInMillisToUpdateFile); - try (DirectoryStream dirStream=Files.newDirectoryStream(pathSpec, lastModifiedTimeFilter)) { + try (DirectoryStream dirStream = Files.newDirectoryStream(pathSpec, lastModifiedTimeFilter)) { for (Path path: dirStream) { - if (Files.isDirectory(path)) //traverse subdirectories + if (Files.isDirectory(path)) { //traverse subdirectories getDirectoryFiles(path, fileExtension, directoryListing, movedFilesDirectory, minTimeInMillisToUpdateFile); - else { + } else { FileNameWithOffset fileEntry = new FileNameWithOffset(path.toAbsolutePath().toString(), path.toFile().length()); - if (isValidFile(fileEntry, fileExtension)) - directoryListing.add(fileEntry); - else //move failed file to different folder + if (isValidFile(fileEntry, fileExtension)) { + directoryListing.add(fileEntry); + } else { //move failed file to different folder moveFailedFile(fileEntry, movedFilesDirectory); + } } } } catch (Exception ex) { @@ -86,12 +99,14 @@ protected static void getDirectoryFiles(Path pathSpec, String fileExtension, Lis /** * The last modified time filter for files older than #{timeBefore} milliseconds from current timestamp. * This filter helps to eliminate the files that are partially written in to lookup directory by external services. + * @param minTimeInMillisToUpdateFile + * @return last modified time filter. */ private static DirectoryStream.Filter getLastModifiedTimeFilter(long minTimeInMillisToUpdateFile) { LOGGER.debug("getLastModifiedTimeFilter: minTimeInMillisToUpdateFile: {}", minTimeInMillisToUpdateFile); return entry -> { BasicFileAttributes attr = Files.readAttributes(entry, BasicFileAttributes.class); - if(attr.isDirectory()) { + if (attr.isDirectory()) { return true; } FileTime fileTime = attr.lastModifiedTime(); @@ -107,9 +122,8 @@ private static DirectoryStream.Filter getLastModifiedTimeFilter(long minTi public static boolean isValidFile(FileNameWithOffset fileEntry, String fileExtension) { if (fileEntry.offset <= 0) { - LOGGER.warn("isValidFile: Empty file {} can not be processed",fileEntry.fileName); - } - // If extension is null, ingest all files + LOGGER.warn("isValidFile: Empty file {} can not be processed", fileEntry.fileName); + } // If extension is null, ingest all files else if (fileExtension.isEmpty() || fileExtension.equals(fileEntry.fileName.substring(fileEntry.fileName.lastIndexOf(".") + 1))) { return true; } else { @@ -136,18 +150,21 @@ public static void moveCompletedFile(FileNameWithOffset fileEntry, Path filesDir * To keep same file name of different directories in completed file directory. * Creating completed file name with _ instead of /, so that it includes all subdirectories. * If the full file name is greater than 255 characters, it will be truncated to 255 characters. + * @param completedFilesDir + * @param fileName + * @return completed file name. */ public static String createCompletedFileName(Path completedFilesDir, String fileName) { - if(fileName==null || fileName.isEmpty() || completedFilesDir==null) { + if (fileName == null || fileName.isEmpty() || completedFilesDir == null) { return fileName; } int validFileNameLength = 255 - completedFilesDir.toString().length(); - if(fileName.length() > validFileNameLength) { - fileName = fileName.substring(fileName.indexOf(File.separator, fileName.length() - validFileNameLength-1)); + if (fileName.length() > validFileNameLength) { + fileName = fileName.substring(fileName.indexOf(File.separator, fileName.length() - validFileNameLength - 1)); } - return fileName.replace(File.separator,"_"); + return fileName.replace(File.separator, "_"); } /* @@ -158,12 +175,11 @@ static void moveFile(Path sourcePath, Path targetPath) throws IOException { //Obtain a lock on file before moving try (FileChannel channel = FileChannel.open(sourcePath, StandardOpenOption.WRITE)) { try (FileLock lock = channel.tryLock()) { - if (lock!=null) { + if (lock != null) { Files.move(sourcePath, targetPath, StandardCopyOption.REPLACE_EXISTING); LOGGER.debug("movedFile: Moved file from {} to {}", sourcePath, targetPath); lock.release(); - } - else{ + } else { LOGGER.warn("Unable to obtain lock on file {} for moving. File is locked by another process.", sourcePath); throw new Exception(); } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/SpinBlockingStrategy.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/SpinBlockingStrategy.java index f5943afd..f31ac7cd 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/SpinBlockingStrategy.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/SpinBlockingStrategy.java @@ -19,6 +19,6 @@ public class SpinBlockingStrategy implements UninterruptibleBlockingStrategy { @Override public void parkUninterruptibly(long nanosToPark) { final long endNanos = System.nanoTime() + nanosToPark; - while (endNanos - System.nanoTime() > 0); + while (endNanos - System.nanoTime() > 0) { } } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateDB.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateDB.java index 3cbb40f9..4150035f 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateDB.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateDB.java @@ -19,9 +19,9 @@ public interface TransactionStateDB { /** - * Add file name and begin offset to PendingFiles table - * - * @param files List of file name with Offset. + * Add file name and begin offset to PendingFiles table. + * @param files List of file name with Offset. + * @throws SQLException * */ public void addPendingFileRecords(List files) throws SQLException; @@ -30,6 +30,7 @@ public interface TransactionStateDB { * Get next file to process. Read the file name with begin offset from PendingFiles table and sequence number from SequenceNumber table. * * @return ((file name, begin offset), sequence number) or null if there is no pending file + * @throws SQLException */ public Pair getNextPendingFileRecord() throws SQLException; @@ -50,7 +51,7 @@ public interface TransactionStateDB { public void addCompletedFileRecord(String fileName, long beginOffset, long endOffset, long newNextSequenceNumber, Optional txnId) throws SQLException; /** - * Delete record from pendingFiles table + * Delete record from pendingFiles table. * * @param fileName file name of pending file * @param beginOffset begin offset from where file read starts @@ -66,28 +67,31 @@ public interface TransactionStateDB { * @param beginOffset begin offset from where file read starts * @param endOffset end offset where reading ends. * @param newNextSequenceNumber next sequence number. + * @throws SQLException * */ public void addCompletedFileRecord(String fileName, long beginOffset, long endOffset, long newNextSequenceNumber) throws SQLException; /** - * Delete record from TransactionsToCommit table + * Delete record from TransactionsToCommit table. * * @param txnId transaction id */ public void deleteTransactionToCommit(Optional txnId); /** - * Get a list of files from completedFiles table + * Get a list of files from completedFiles table. * * @return list of file name and end offset (file size) + * @throws SQLException */ public List getCompletedFileRecords() throws SQLException; /** - * Delete completed file record from completedFiles table for given file name + * Delete completed file record from completedFiles table for given file name. * * @param fileName file name + * @throws SQLException */ public void deleteCompletedFileRecord(String fileName) throws SQLException; diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImpl.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImpl.java index d2fd8195..580fecf7 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImpl.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionStateSQLiteImpl.java @@ -46,7 +46,7 @@ public void close() throws SQLException { } /** - * Add file name and begin offset to PendingFiles table + * Add file name and begin offset to PendingFiles table. * * @param files List of file name with Offset. * @@ -132,7 +132,7 @@ public void addCompletedFileRecord(String fileName, long beginOffset, long endOf } /** - * Delete record from PendingFiles table + * Delete record from PendingFiles table. * * @param fileName file name of pending file * @param beginOffset begin offset from where file read starts @@ -148,23 +148,23 @@ public void deletePendingFile(String fileName, long beginOffset) throws SQLExcep } } - /** - * Update below details - * 1. Update sequence number into SequenceNumber table - * 2. Add entry into CompletedFiles table for given file name and end offset - * 3. Delete all entry from PendingFiles for given file name offset less than equal to given begin offset value - * @param fileName file name of processed file - * @param beginOffset begin offset from where file read starts - * @param endOffset end offset where reading ends. - * @param newNextSequenceNumber next sequence number. - * - */ - @Override - @VisibleForTesting - public void addCompletedFileRecord(String fileName, long beginOffset, long endOffset, long newNextSequenceNumber) throws SQLException { - addCompletedFileRecord(fileName, beginOffset, endOffset, newNextSequenceNumber, Optional.empty()); - } + * Update below details + * 1. Update sequence number into SequenceNumber table + * 2. Add entry into CompletedFiles table for given file name and end offset + * 3. Delete all entry from PendingFiles for given file name offset less than equal to given begin offset value + * @param fileName file name of processed file + * @param beginOffset begin offset from where file read starts + * @param endOffset end offset where reading ends. + * @param newNextSequenceNumber next sequence number. + * + */ + @Override + @VisibleForTesting + public void addCompletedFileRecord(String fileName, long beginOffset, long endOffset, long newNextSequenceNumber) throws SQLException { + addCompletedFileRecord(fileName, beginOffset, endOffset, newNextSequenceNumber, Optional.empty()); + } + /** * Delete record from TransactionsToCommit table @@ -199,18 +199,18 @@ public List getCompletedFileRecords() throws SQLException { } /** - * Delete completed file record from completedFiles table for given file name + * Delete completed file record from completedFiles table for given file name. * * @param fileName file name */ - @Override - public void deleteCompletedFileRecord(String fileName) throws SQLException { - try (final PreparedStatement deleteStatement = connection.prepareStatement( - "delete from CompletedFiles where fileName = ?"); - final AutoRollback autoRollback = new AutoRollback(connection)) { - deleteStatement.setString(1, fileName); - deleteStatement.execute(); - autoRollback.commit(); - } + @Override + public void deleteCompletedFileRecord(String fileName) throws SQLException { + try (final PreparedStatement deleteStatement = connection.prepareStatement( + "delete from CompletedFiles where fileName = ?"); + final AutoRollback autoRollback = new AutoRollback(connection)) { + deleteStatement.setString(1, fileName); + deleteStatement.execute(); + autoRollback.commit(); } + } } diff --git a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionalEventWriter.java b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionalEventWriter.java index d0018485..224e8ea6 100644 --- a/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionalEventWriter.java +++ b/pravega-sensor-collector/src/main/java/io/pravega/sensor/collector/util/TransactionalEventWriter.java @@ -95,6 +95,7 @@ public Transaction.Status getTransactionStatus() { } return null; } + public Transaction.Status getTransactionStatus(UUID txnId) { return writer.getTxn(txnId).checkStatus(); } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/DeviceDriverConfigTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/DeviceDriverConfigTest.java new file mode 100644 index 00000000..880e4909 --- /dev/null +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/DeviceDriverConfigTest.java @@ -0,0 +1,31 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.sensor.collector; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class DeviceDriverConfigTest { + + @Test + void withProperty() { + Map properties = Parameters.getProperties(); + DeviceDriverManager driverManager = new DeviceDriverManager(properties); + DeviceDriverConfig driverConfig = new DeviceDriverConfig("test", "PRAVEGA_SENSOR_COLLECTOR_", properties, driverManager); + driverConfig.withProperty("testKey", "testVal"); + assertEquals(driverConfig.getProperties().get("testKey"), "testVal"); + assertEquals(driverConfig.getDeviceDriverManager(), driverManager); + assertTrue(driverConfig.toString().contains("test")); + } +} \ No newline at end of file diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/ParametersTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/ParametersTest.java new file mode 100644 index 00000000..f1b135a0 --- /dev/null +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/ParametersTest.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.sensor.collector; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ParametersTest { + + @Test + void getEnvPrefix() { + assertEquals(Parameters.getEnvPrefix(), "PRAVEGA_SENSOR_COLLECTOR_"); + } + + @Test + void testGetProperties() { + Map parameters = Parameters.getProperties(); + List list = new ArrayList<>(); + for (Map.Entry entry: parameters.entrySet()) { + list.add(entry.getKey()); + } + assertTrue(list.contains("DESKTOP_SESSION")); + } +} diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaClientPoolTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaClientPoolTest.java new file mode 100644 index 00000000..aff8e9f7 --- /dev/null +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/PravegaClientPoolTest.java @@ -0,0 +1,35 @@ +/** + * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.pravega.sensor.collector; + +import io.pravega.client.ClientConfig; +import io.pravega.client.EventStreamClientFactory; +import org.junit.jupiter.api.Test; + +import java.net.URI; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +class PravegaClientPoolTest { + + @Test + void testGetClientConfig() { + PravegaClientPool pravegaClientPool = new PravegaClientPool(); + ClientConfig clientConfig = pravegaClientPool.getClientConfig(new PravegaClientConfig(URI.create("tcp://localhost/"), "scopeName")); + assertTrue(clientConfig.toString().contains("localhost")); + } + + @Test + void testGetEventStreamClientFactory() { + PravegaClientPool pravegaClientPool = new PravegaClientPool(); + EventStreamClientFactory clientFactory = pravegaClientPool.getEventStreamClientFactory(new PravegaClientConfig(URI.create("tcp://localhost:12345"), "testScope")); + System.out.println("clientConfig = " + clientFactory.toString()); + } +} \ No newline at end of file diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java index f16fe31b..35de594f 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileIngestServiceTest.java @@ -9,21 +9,5 @@ */ package io.pravega.sensor.collector.file; -import io.pravega.sensor.collector.DeviceDriver; -import io.pravega.sensor.collector.DeviceDriverConfig; -import io.pravega.sensor.collector.DeviceDriverManager; -import io.pravega.sensor.collector.file.rawfile.RawFileIngestService; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Mock; - -import java.sql.SQLException; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; - -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; - public class FileIngestServiceTest { } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java index b7d1afd7..54efc405 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/FileProcessorTests.java @@ -16,7 +16,12 @@ import io.pravega.client.stream.TransactionalEventStreamWriter; import io.pravega.client.stream.TxnFailedException; import io.pravega.sensor.collector.file.rawfile.RawFileProcessor; -import io.pravega.sensor.collector.util.*; +import io.pravega.sensor.collector.util.EventWriter; +import io.pravega.sensor.collector.util.FileNameWithOffset; +import io.pravega.sensor.collector.util.FileUtils; +import io.pravega.sensor.collector.util.TransactionCoordinator; +import io.pravega.sensor.collector.util.TransactionStateSQLiteImpl; +import io.pravega.sensor.collector.util.TransactionalEventWriter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.Assert; import org.junit.jupiter.api.Assertions; @@ -86,7 +91,7 @@ public void getNewFilesTest() { final List expected = ImmutableList.of( new FileNameWithOffset("file3", 0), new FileNameWithOffset("file4", 0)); - RawFileProcessor fileProcessor = new RawFileProcessor(config,state, writer, transactionCoordinator, "writerId"); + RawFileProcessor fileProcessor = new RawFileProcessor(config, state, writer, transactionCoordinator, "writerId"); final List actual = fileProcessor.getNewFiles(directoryListing, completedFiles); Assertions.assertEquals(expected, actual); } @@ -94,7 +99,7 @@ public void getNewFilesTest() { @Test public void getDirectoryListingTest() throws IOException { final List actual = FileUtils.getDirectoryListing( - "../log-file-sample-data/","csv", Paths.get("."), 5000); + "../log-file-sample-data/", "csv", Paths.get("."), 5000); LOGGER.info("actual={}", actual); } @@ -114,7 +119,7 @@ public void getEmptyNextFileSet() throws Exception { @Test public void processNextFile() throws Exception { copyFile(); - FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test"); + FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter, transactionCoordinator, "test"); doNothing().when(transactionalEventWriter).writeEvent(anyString(), any()); fileProcessor.processFile(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub1.parquet", 0), 1L); verify(transactionalEventWriter).writeEvent(anyString(), any()); @@ -133,7 +138,7 @@ public void processNextFewFiles() throws Exception { .thenReturn(new ImmutablePair<>(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub3.parquet", 0), 3L)) .thenAnswer(invocation -> null); - FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test"); + FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter, transactionCoordinator, "test"); doNothing().when(transactionalEventWriter).writeEvent(anyString(), any()); fileProcessor.processNewFiles(); @@ -149,13 +154,14 @@ public void processNextFewFiles() throws Exception { @Test public void processNextFile_WriteEventException() throws Exception { copyFile(); - FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test"); + FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter, transactionCoordinator, "test"); Mockito.doThrow(TxnFailedException.class).when(transactionalEventWriter).writeEvent(anyString(), any()); assertThrows(RuntimeException.class, () -> fileProcessor.processFile(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub1.parquet", 0), 1L)); // Verify that myMethod was called exactly three times Mockito.verify(transactionalEventWriter, Mockito.times(1)).writeEvent(anyString(), any()); } + /* * Process the single file . * Throw transaction failed exception while commiting transaction @@ -163,7 +169,7 @@ public void processNextFile_WriteEventException() throws Exception { @Test public void processNextFile_CommitException() throws Exception { copyFile(); - FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test"); + FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter, transactionCoordinator, "test"); Mockito.doThrow(TxnFailedException.class).when(transactionalEventWriter).commit(); assertThrows(RuntimeException.class, () -> fileProcessor.processFile(new FileNameWithOffset("../../pravega-sensor-collector/parquet-file-sample-data/sub1.parquet", 0), 1L)); // Verify that myMethod was called exactly three times diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/csvfile/CsvFileSequenceProcessorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/csvfile/CsvFileSequenceProcessorTests.java index 79531a27..e7da9d5f 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/csvfile/CsvFileSequenceProcessorTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/csvfile/CsvFileSequenceProcessorTests.java @@ -28,7 +28,7 @@ public void before() throws Exception { */ @Test public void generateEventForCSVFileTests() throws Exception { - FileProcessor fileProcessor = new CsvFileSequenceProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test"); + FileProcessor fileProcessor = new CsvFileSequenceProcessor(config, state, transactionalEventWriter, transactionCoordinator, "test"); fileProcessor.processNewFiles(); Mockito.verify(state, Mockito.times(1)).getNextPendingFileRecord(); } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/parquet/ParquetEventGeneratorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/parquet/ParquetEventGeneratorTests.java index 75de0390..9f8dd37c 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/parquet/ParquetEventGeneratorTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/parquet/ParquetEventGeneratorTests.java @@ -35,9 +35,9 @@ public class ParquetEventGeneratorTests { @Test public void TestFile() throws IOException { - final EventGenerator eventGenerator = ParquetEventGenerator.create("routingKey1",100); - final List files = FileUtils.getDirectoryListing("../parquet-file-sample-data","parquet", Paths.get("."), 5000); - File parquetData= new File(files.get(0).fileName); + final EventGenerator eventGenerator = ParquetEventGenerator.create("routingKey1", 100); + final List files = FileUtils.getDirectoryListing("../parquet-file-sample-data", "parquet", Paths.get("."), 5000); + File parquetData = new File(files.get(0).fileName); final CountingInputStream inputStream = new CountingInputStream(new FileInputStream(parquetData)); final List events = new ArrayList<>(); diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/parquet/ParquetFileProcessorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/parquet/ParquetFileProcessorTests.java index 80a8a578..b9fa3cb7 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/parquet/ParquetFileProcessorTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/parquet/ParquetFileProcessorTests.java @@ -29,7 +29,7 @@ public void before() throws Exception { */ @Test public void generateEventForParquetTests() throws Exception { - FileProcessor fileProcessor = new ParquetFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test"); + FileProcessor fileProcessor = new ParquetFileProcessor(config, state, transactionalEventWriter, transactionCoordinator, "test"); fileProcessor.processNewFiles(); Mockito.verify(state, Mockito.times(1)).getNextPendingFileRecord(); } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/rawfile/RawFileProcessorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/rawfile/RawFileProcessorTests.java index a1cbe11c..6daeec53 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/rawfile/RawFileProcessorTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/file/rawfile/RawFileProcessorTests.java @@ -27,7 +27,7 @@ public void before() throws Exception { */ @Test public void generateEventForRawFileTests() throws Exception { - FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter,transactionCoordinator, "test"); + FileProcessor fileProcessor = new RawFileProcessor(config, state, transactionalEventWriter, transactionCoordinator, "test"); fileProcessor.processNewFiles(); Mockito.verify(state, Mockito.times(1)).getNextPendingFileRecord(); } diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/leap/LeapMockResources.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/leap/LeapMockResources.java index 8e9a4333..2b3fca64 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/leap/LeapMockResources.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/leap/LeapMockResources.java @@ -13,7 +13,11 @@ import org.slf4j.LoggerFactory; import javax.ws.rs.Consumes; -import javax.ws.rs.*; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import org.glassfish.grizzly.http.server.Request; @@ -51,7 +55,7 @@ public String authenticateMethod(@Context Request request, String data) throws E } /** - * Generate readings from time 0:00 UTC of the previous day until the currenttime + * Generate readings from time 0:00 UTC of the previous day until the currenttime. * */ public List getAllReadings() throws Exception { List allReadings = new ArrayList<>(); @@ -80,6 +84,9 @@ public List getAllReadings() throws Exception { /** * Gets readings from getAllReadings and filters out any readings prior to startDate. + * @param request + * @param startDate + * @throws Exception * */ @GET @Path("ClientApi/V1/DeviceReadings") diff --git a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/util/TransactionCoordinatorTests.java b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/util/TransactionCoordinatorTests.java index 0c8258b6..c2631695 100644 --- a/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/util/TransactionCoordinatorTests.java +++ b/pravega-sensor-collector/src/test/java/io/pravega/sensor/collector/util/TransactionCoordinatorTests.java @@ -69,7 +69,7 @@ public void setUp() throws SQLException { when(mockStatement.execute(anyString())).thenReturn(true); /*when(mockConnection.prepareStatement(anyString())).thenReturn(mockPrepareStatement); when(mockPrepareStatement.execute()).thenReturn(true);*/ - transactionProcessor = new TransactionCoordinator(mockConnection,transactionalEventWriter); + transactionProcessor = new TransactionCoordinator(mockConnection, transactionalEventWriter); } @Test @@ -157,13 +157,13 @@ public void testGetTransactionToCommit() throws SQLException { when(mockResultSet.getString("txnId")).thenReturn(UUID.randomUUID().toString(), UUID.randomUUID().toString()); // Get List of transaction ID's from TransactionToCommit table - List uuidList =transactionProcessor.getTransactionsToCommit(); + List uuidList = transactionProcessor.getTransactionsToCommit(); // Assert verify(mockResultSet, times(3)).next(); verify(mockResultSet, times(2)).getString("txnId"); //verify result contains 2 UUIDs - assertEquals(2,uuidList.size()); + assertEquals(2, uuidList.size()); } /* @@ -202,7 +202,7 @@ public void testPerformRecoveryWithCommitFail() throws SQLException, TxnFailedEx // Mock behavior: when statement.executeQuery is called, return the mock result set when(mockStatement.executeQuery("select txnId from TransactionsToCommit")).thenReturn(mockResultSet); // Mock behavior: simulate the result set having two rows with different UUIDs - when(mockResultSet.next()).thenReturn(true, true,false); + when(mockResultSet.next()).thenReturn(true, true, false); when(mockResultSet.getString("txnId")).thenReturn(UUID.randomUUID().toString(), UUID.randomUUID().toString()); //mock for delete transaction call when(mockConnection.prepareStatement(anyString())).thenReturn(mockPrepareStatement); @@ -213,7 +213,7 @@ public void testPerformRecoveryWithCommitFail() throws SQLException, TxnFailedEx //doNothing().when(transactionalEventWriter).commit(any()); // Perform recovery - transactionProcessor.performRecovery(); + transactionProcessor.performRecovery(); // Assert verify(mockResultSet, times(3)).next(); @@ -231,7 +231,7 @@ public void testPerformRecoveryCommitWithUnknownTransactionFail() throws SQLExce // Mock behavior: when statement.executeQuery is called, return the mock result set when(mockStatement.executeQuery("select txnId from TransactionsToCommit")).thenReturn(mockResultSet); // Mock behavior: simulate the result set having two rows with different UUIDs - when(mockResultSet.next()).thenReturn(true,true,false); + when(mockResultSet.next()).thenReturn(true, true, false); when(mockResultSet.getString("txnId")).thenReturn(UUID.randomUUID().toString(), UUID.randomUUID().toString()); //mock for delete transaction call when(mockConnection.prepareStatement(anyString())).thenReturn(mockPrepareStatement); @@ -241,9 +241,9 @@ public void testPerformRecoveryCommitWithUnknownTransactionFail() throws SQLExce }).when(transactionalEventWriter).commit(Mockito.any()); // Perform recovery - transactionProcessor.performRecovery(); + transactionProcessor.performRecovery(); - // Assert + // Assert verify(mockResultSet, times(3)).next(); verify(mockResultSet, times(2)).getString("txnId"); @@ -258,7 +258,7 @@ public void testPerformRecoveryCommitWithOtherException() throws SQLException, T // Mock behavior: when statement.executeQuery is called, return the mock result set when(mockStatement.executeQuery("select txnId from TransactionsToCommit")).thenReturn(mockResultSet); // Mock behavior: simulate the result set having two rows with different UUIDs - when(mockResultSet.next()).thenReturn(true,false); + when(mockResultSet.next()).thenReturn(true, false); when(mockResultSet.getString("txnId")).thenReturn(UUID.randomUUID().toString(), UUID.randomUUID().toString()); //mock for delete transaction call when(mockConnection.prepareStatement(anyString())).thenReturn(mockPrepareStatement); @@ -275,8 +275,6 @@ public void testPerformRecoveryCommitWithOtherException() throws SQLException, T // Assert String expectedMessage = "Other Runtime Exception"; assertEquals(expectedMessage, exception.getMessage(), "Exception message mismatch"); - - } @Test