Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Karman 11833 fix spotbugs and checkstyle #59

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
<allow pkg="com.azure.storage.common" />
<allow pkg="com.azure.core.http" />
<allow pkg="reactor.core.publisher" />
<allow pkg="org.eclipse.milo.opcua" />
<allow pkg="io.github.bucket4j" />
<subpackage name="storage.s3">
<allow pkg="software.amazon.awssdk"/>
</subpackage>
Expand Down
5 changes: 5 additions & 0 deletions config/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
<Bug pattern="CT_CONSTRUCTOR_THROW" />
</Match>

<Match>
<Class name="io.pravega.sensor.collector.metrics.MetricPublisher" />
<Bug pattern="CT_CONSTRUCTOR_THROW" />
</Match>

<Match>
<Class name="io.pravega.sensor.collector.file.csvfile.CsvFileSequenceProcessor" />
<Bug pattern="UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR" />
Expand Down
5 changes: 5 additions & 0 deletions config/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,10 @@
<suppress files="[\\/]generated[\\/]" checks="[a-zA-Z0-9]*"/>
<suppress files="\.pem" checks="[a-zA-Z0-9]*"/>
<suppress files="passwd" checks="[a-zA-Z0-9]*"/>
<suppress checks="RegexpHeader" files="io/pravega/sensor/collector/accelerometer/README.md" />
<suppress checks="FileTabCharacter" files="io/pravega/sensor/collector/accelerometer/README.md" />
<suppress checks="RegexpHeader" files="pravega-sensor-collector/src/main/resources/logback.xml" />
<suppress checks="MemberName" files="io/pravega/sensor/collector/leap/AuthTokenDto.java" />

</suppressions>

4 changes: 2 additions & 2 deletions pravega-sensor-collector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ tasks.withType(com.github.spotbugs.snom.SpotBugsTask) {

spotbugs {
toolVersion = spotbugsVersion
ignoreFailures = true
ignoreFailures = false
showProgress = true
effort = 'max'
reportLevel = 'default'
Expand All @@ -167,7 +167,7 @@ spotbugs {
checkstyle {
toolVersion = checkstyleVersion
configFile = file("$rootDir/config/checkstyle.xml")
ignoreFailures = true
ignoreFailures = false
configProperties = [importControlFile: "$rootDir/config/import-control.xml",
suppressionsFile: "$rootDir/config/suppressions.xml"]
checkstyleMain {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
public abstract class DeviceDriver extends AbstractService implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(DeviceDriver.class);

private final DeviceDriverConfig config;

private static final String CREATE_SCOPE_KEY = "CREATE_SCOPE";

private final DeviceDriverConfig config;

public DeviceDriver(DeviceDriverConfig config) {
this.config = Preconditions.checkNotNull(config, "config");
LOGGER.info("Create Scope: {}", isCreateScope());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class DeviceDriverFactory {

/**
* Instantiate a concrete subclass of DeviceDriver based on key/value properties.
* @param config
* @throws RuntimeException
*/
DeviceDriver create(DeviceDriverConfig config) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,19 @@ protected void doStop() {
*/
private List<DeviceDriverConfig> configFromProperties(String prefix, String sep, Map<String, String> properties) {
// Find instance names.
final List<String> instanceNames = properties.keySet().stream().flatMap((key) -> {
final List<String> instanceNames = properties.keySet().stream().flatMap(key -> {
if (key.startsWith(prefix) && key.endsWith(sep + CLASS_KEY)) {
return Stream.of(key.substring(prefix.length(), key.length() - CLASS_KEY.length() - sep.length()));
}
return Stream.empty();
}).collect(Collectors.toList());
LOGGER.debug("configFromProperties: instanceNames={}", instanceNames);
// Copy properties with prefix to keys without a prefix.
final List<DeviceDriverConfig> config = instanceNames.stream().map((instanceName) -> {
final List<DeviceDriverConfig> config = instanceNames.stream().map(instanceName -> {
final String className = properties.get(prefix + instanceName + sep + CLASS_KEY);
assert (className != null);
assert className != null;
final Map<String, String> instanceProperties = new HashMap<>();
properties.entrySet().stream().forEach((entry) -> {
properties.entrySet().stream().forEach(entry -> {
final String key = entry.getKey();
final String instancePrefix = prefix + instanceName + sep;
if (key.startsWith(instancePrefix)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ public static Map<String, String> getProperties() {
final String fileName = getPropertiesFileName();
return getProperties(fileName);
}

/**
* Combines properties from:
* 1. properties file (if specified)
* 2. system environment
* Values in the system environment will override values in the properties file.
* It is intended that properties files only be used when developing in an IDE.
* @param fileName fileName to read.
* @throws RuntimeException if unable to load properties file.
*/
public static Map<String, String> getProperties(final String fileName) {
Map<String, String> map = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import java.util.Objects;

public class PravegaClientConfig {
private final URI controllerURI;
private final String scopeName;

private static final String PRAVEGA_CONTROLLER_URI_KEY = "PRAVEGA_CONTROLLER_URI";
private final URI controllerURI;
private final String scopeName;

public PravegaClientConfig(URI controllerURI, String scopeName) {
this.controllerURI = Preconditions.checkNotNull(controllerURI, "controllerURI");
Expand Down Expand Up @@ -49,8 +49,12 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PravegaClientConfig that = (PravegaClientConfig) o;
return controllerURI.equals(that.controllerURI)
&& scopeName.equals(that.scopeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@
import java.util.function.Consumer;

/**
*The EventGenerator is responsible for generating events depending on file type
*The EventGenerator is responsible for generating events depending on file type.
*/
public interface EventGenerator {

/*
* Generate events from Input stream.
* Depending on file type event generation logic defers
* @param inputStream
* @param firstSequenceNumber
* @return next sequence number, end offset
* */
/**
* Generate events from Input stream.
 * Depending on file type, event generation logic differs.
*
* @param inputStream
* @param firstSequenceNumber
* @param consumer
* @return next sequence number, end offset
* @throws IOException
*/
Pair<Long, Long> generateEventsFromInputStream(CountingInputStream inputStream, long firstSequenceNumber, Consumer<PravegaWriterEvent> consumer) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public FileIngestService(DeviceDriverConfig config) {
String getFileSpec() {
return getProperty(FILE_SPEC_KEY);
}

String getFileExtension() {
return getProperty(FILE_EXT, "");
}
Expand Down Expand Up @@ -161,6 +162,7 @@ protected void watchFiles() {
}
LOG.trace("watchFiles: END");
}

protected void processFiles() {
LOG.trace("processFiles: BEGIN");
try {
Expand Down Expand Up @@ -205,7 +207,6 @@ protected void doStart() {
1,
TimeUnit.MILLISECONDS);


deleteFileTask = executor.scheduleAtFixedRate(
this::deleteCompletedFiles,
1,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
package io.pravega.sensor.collector.file;
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
Expand All @@ -8,6 +7,7 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.sensor.collector.file;

import com.google.common.io.CountingInputStream;
import io.pravega.client.EventStreamClientFactory;
Expand Down Expand Up @@ -109,9 +109,11 @@ public static FileProcessor create(
* @return eventGenerator
*/
public abstract EventGenerator getEventGenerator(FileConfig config);

public void watchFiles() throws Exception {
findAndRecordNewFiles();
}

public void processFiles() throws Exception {
log.debug("processFiles: BEGIN");
if (config.enableDeleteCompletedFiles) {
Expand Down Expand Up @@ -143,7 +145,9 @@ protected void findAndRecordNewFiles() throws Exception {
}

/**
* Returns list of file name and file size in bytes.
* @return list of file name and file size in bytes
* @throws IOException
*/
protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
log.debug("getDirectoryListing: fileSpec={}", config.fileSpec);
Expand All @@ -155,6 +159,9 @@ protected List<FileNameWithOffset> getDirectoryListing() throws IOException {
}

/**
* Returns sorted list of file name and file size in bytes.
* @param directoryListing
* @param completedFiles
* @return sorted list of file name and file size in bytes
*/
protected List<FileNameWithOffset> getNewFiles(List<FileNameWithOffset> directoryListing, List<FileNameWithOffset> completedFiles) {
Expand All @@ -171,7 +178,7 @@ protected List<FileNameWithOffset> getNewFiles(List<FileNameWithOffset> director
FileUtils.moveCompletedFile(dirFile, movedFilesDirectory);
log.warn("File: {} already marked as completed, moving now", dirFile.fileName);
} catch (IOException e) {
log.error("File: {} already marked as completed, but failed to move, error:{}", dirFile.fileName,e.getMessage());
log.error("File: {} already marked as completed, but failed to move, error:{}", dirFile.fileName, e.getMessage());
}
}
});
Expand All @@ -193,7 +200,7 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
*/
if (config.exactlyOnce) {
log.debug("processFile: Transaction status {} ", writer.getTransactionStatus());
if (writer.getTransactionStatus() == Transaction.Status.OPEN){
if (writer.getTransactionStatus() == Transaction.Status.OPEN) {
writer.abort();
}
} else {
Expand All @@ -219,8 +226,8 @@ void processFile(FileNameWithOffset fileNameWithBeginOffset, long firstSequenceN
} catch (TxnFailedException ex) {
log.error("processFile: Write event to transaction failed with exception {} while processing file: {}, event: {}", ex, fileNameWithBeginOffset.fileName, e);

/* TODO while writing event if we get Transaction failed exception then should we abort the trasaction and process again?
This will occur only if Transaction state is not open*/
            // TODO while writing event if we get Transaction failed exception then should we abort the transaction and process again?
// This will occur only if Transaction state is not open

throw new RuntimeException(ex);
}
Expand Down Expand Up @@ -273,7 +280,7 @@ void deleteCompletedFiles() throws Exception {
Path filePath = completedFilesPath.resolve(completedFileName);
log.debug("deleteCompletedFiles: Deleting File default name:{}, and it's completed file name:{}.", file.fileName, filePath);
try {
/**
/*
* If file gets deleted from completed files directory, or it does not exist in default ingestion directory
* then only remove the record from DB.
*/
Expand All @@ -282,7 +289,7 @@ void deleteCompletedFiles() throws Exception {
MetricsStore.getMetric(MetricNames.PSC_FILES_DELETED_GAUGE).incrementBy(1L);
log.debug("deleteCompletedFiles: Deleted File default name:{}, and it's completed file name:{}.", file.fileName, filePath);
} else {
/**
/*
* This situation occurs because at first attempt moving file to completed directory fails, but the file still exists in default ingestion directory.
* Moving file from default directory to completed directory will be taken care in next iteration, post which delete will be taken care.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.function.Consumer;

/**
* Generate Event from CSV file
* Generate Event from CSV file.
*/
public class CsvFileEventGenerator implements EventGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(CsvFileEventGenerator.class);
Expand Down Expand Up @@ -66,17 +66,19 @@ public static CsvFileEventGenerator create(String routingKey, int maxRecordsPerE
    /** Generate event from input stream. The number of records in one event is defined in the input config file.
* @param inputStream
* @param firstSequenceNumber
* @param consumer
* @return next sequence number, end offset
* @throws IOException
*/
public Pair<Long, Long> generateEventsFromInputStream(CountingInputStream inputStream, long firstSequenceNumber, Consumer<PravegaWriterEvent> consumer) throws IOException {
final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader();
final CSVParser parser = CSVParser.parse(inputStream, StandardCharsets.UTF_8, format);
long nextSequenceNumber = firstSequenceNumber;
int numRecordsInEvent = 0;
List<HashMap<String,Object>> eventBatch = new ArrayList<>();
List<HashMap<String, Object>> eventBatch = new ArrayList<>();
for (CSVRecord record : parser) {
HashMap<String,Object> recordDataMap = new HashMap<String,Object>();
for (int i=0; i<record.size();i++) {
HashMap<String, Object> recordDataMap = new HashMap<String, Object>();
for (int i = 0; i < record.size(); i++) {
recordDataMap.put(parser.getHeaderNames().get(i), convertValue(record.get(i)));
}
eventBatch.add(recordDataMap);
Expand All @@ -101,10 +103,10 @@ public Object convertValue(String s) {
// TODO: convert timestamp
try {
return Long.parseLong(s);
} catch (NumberFormatException ignored) {}
} catch (NumberFormatException ignored) { }
try {
return Double.parseDouble(s);
} catch (NumberFormatException ignored) {}
} catch (NumberFormatException ignored) { }
return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public CsvFileSequenceProcessor(FileConfig config, TransactionStateDB state, Eve
}

/**
* Event generator for CSV file
* Event generator for CSV file.
* @param config configurations parameters
* @return eventGenerator
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
Expand Down Expand Up @@ -48,7 +47,7 @@
import java.util.stream.Collectors;

/**
* Generate Event from Parquet file
* Generate Event from Parquet file.
*/
public class ParquetEventGenerator implements EventGenerator {
private static final Logger LOGGER = LoggerFactory.getLogger(ParquetEventGenerator.class);
Expand Down Expand Up @@ -88,7 +87,9 @@ public static ParquetEventGenerator create(String routingKey, int maxRecordsPerE
*
* @param inputStream
* @param firstSequenceNumber
* @param consumer
* @return next sequence number, end offset
* @throws IOException
*/
public Pair<Long, Long> generateEventsFromInputStream(CountingInputStream inputStream, long firstSequenceNumber, Consumer<PravegaWriterEvent> consumer) throws IOException {
File tempFile = File.createTempFile("temp", ".parquet");
Expand All @@ -100,10 +101,10 @@ public Pair<Long, Long> generateEventsFromInputStream(CountingInputStream inputS
long timestamp1 = System.nanoTime();

Configuration conf = new Configuration();
MessageType schema = ParquetFileReader.readFooter(HadoopInputFile.fromPath(tempFilePath, conf),ParquetMetadataConverter.NO_FILTER).getFileMetaData().getSchema();
MessageType schema = ParquetFileReader.readFooter(HadoopInputFile.fromPath(tempFilePath, conf), ParquetMetadataConverter.NO_FILTER).getFileMetaData().getSchema();
Schema avroSchema;

if(!parquetSchemas.containsKey(schema)){
if (!parquetSchemas.containsKey(schema)) {
//Modifying field names in extracted schema (removing special characters)
List<Type> fields = schema.getFields().stream()
.map(field -> new PrimitiveType(field.getRepetition(),
Expand All @@ -121,8 +122,7 @@ public Pair<Long, Long> generateEventsFromInputStream(CountingInputStream inputS
}

parquetSchemas.put(schema, avroSchema);
}
else{
} else {
LOGGER.debug("Retrieving cached schema");
avroSchema = parquetSchemas.get(schema);
}
Expand All @@ -144,8 +144,7 @@ public Pair<Long, Long> generateEventsFromInputStream(CountingInputStream inputS
for (Schema.Field field : record.getSchema().getFields()) {
String key = field.name();
Object value = record.get(key);
dataMap.put(key,value);

dataMap.put(key, value);
}
eventBatch.add(dataMap);
numRecordsInEvent++;
Expand Down
Loading
Loading