Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linting improvements #1000

Merged
merged 5 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocument
ShardMetadata.Factory shardMetadataFactory,
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes,
RootDocumentMigrationContext rootDocumentContext) throws Exception
RootDocumentMigrationContext rootDocumentContext)
throws IOException, InterruptedException, NoWorkLeftException
{
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, leaseExpireTrigger);
confirmShardPrepIsComplete(indexMetadataFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
@Slf4j
public class ParallelDocumentMigrationsTest extends SourceTestBase {

final static List<SearchClusterContainer.ContainerVersion> SOURCE_IMAGES = List.of(
static final List<SearchClusterContainer.ContainerVersion> SOURCE_IMAGES = List.of(
SearchClusterContainer.ES_V7_10_2
);
final static List<SearchClusterContainer.ContainerVersion> TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0);
static final List<SearchClusterContainer.ContainerVersion> TARGET_IMAGES = List.of(SearchClusterContainer.OS_V2_14_0);

public static Stream<Arguments> makeDocumentMigrationArgs() {
List<Object[]> sourceImageArgs = SOURCE_IMAGES.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
@Slf4j
public class SourceTestBase {
public static final String GENERATOR_BASE_IMAGE = "migrations/elasticsearch_client_test_console:latest";
public final static int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024;
public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024;
public static final String SOURCE_SERVER_ALIAS = "source";
public final static long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;
public static final long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600;

protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) {
return new Object[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
if (l < 0) {
throw new IllegalArgumentException("Seeking to negative position: " + pos);
} else if (l > length) {
throw new EOFException("seek past EOF");
throw new EOFException();

Check warning on line 48 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/ByteArrayIndexInput.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/ByteArrayIndexInput.java#L48

Added line #L48 was not covered by tests
}
pos = (int) l;
}
Expand Down Expand Up @@ -78,15 +78,15 @@
@Override
public byte readByte() throws IOException {
if (pos >= offset + length) {
throw new EOFException("seek past EOF");
throw new EOFException();

Check warning on line 81 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/ByteArrayIndexInput.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/ByteArrayIndexInput.java#L81

Added line #L81 was not covered by tests
}
return bytes[offset + pos++];
}

@Override
public void readBytes(final byte[] b, final int offset, int len) throws IOException {
if (pos + len > this.offset + length) {
throw new EOFException("seek past EOF");
throw new EOFException();

Check warning on line 89 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/ByteArrayIndexInput.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/ByteArrayIndexInput.java#L89

Added line #L89 was not covered by tests
}
System.arraycopy(bytes, this.offset + pos, b, offset, len);
pos += len;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

@Slf4j
public class InvalidResponse extends RfsException {
private static final Pattern unknownSetting = Pattern.compile("unknown setting \\[(.+?)\\].+");
private static final Pattern UNKNOWN_SETTING = Pattern.compile("unknown setting \\[(.+?)\\].+");
private static final ObjectMapper objectMapper = new ObjectMapper();
private final HttpResponse response;
private final transient HttpResponse response;

public InvalidResponse(String message, HttpResponse response) {
super(message);
Expand All @@ -41,22 +41,18 @@ public Set<String> getIllegalArguments() {
errorBody.map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add);

// Check root cause errors
errorBody.map(node -> node.get("root_cause")).ifPresent(nodes -> {
errorBody.map(node -> node.get("root_cause")).ifPresent(nodes ->
nodes.forEach(
node -> {
Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add);
}
);
});
node -> Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add)
)
);

// Check all suppressed errors
errorBody.map(node -> node.get("suppressed")).ifPresent(nodes -> {
errorBody.map(node -> node.get("suppressed")).ifPresent(nodes ->
nodes.forEach(
node -> {
Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add);
}
);
});
node ->
Optional.of(node).map(InvalidResponse::getUnknownSetting).ifPresent(interimResults::add)
));

var onlyExpectedErrors = interimResults.stream()
.map(Entry::getKey)
Expand All @@ -82,7 +78,7 @@ private static Map.Entry<String, String> getUnknownSetting(JsonNode json) {
}
return Map.entry(typeNode, reasonNode);
}).map(entry -> {
var matcher = unknownSetting.matcher(entry.getValue().asText());
var matcher = UNKNOWN_SETTING.matcher(entry.getValue().asText());
if (!matcher.matches()) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,61 +154,65 @@
}

protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean isLive) {
Document document;
try {
Document document = reader.document(docId);
String id = null;
BytesRef sourceBytes = null;
try {
for (var field : document.getFields()) {
String fieldName = field.name();
switch (fieldName) {
case "_id": {
// ES 6+
var idBytes = field.binaryValue();
id = Uid.decodeId(idBytes.bytes);
break;
}
case "_uid": {
// ES 5
id = field.stringValue();
break;
}
case "_source": {
// All versions (?)
sourceBytes = field.binaryValue();
break;
}
}
}
if (id == null) {
log.atError().setMessage("Document with index" + docId + " does not have an id. Skipping").log();
return null; // Skip documents with missing id
}
document = reader.document(docId);
} catch (IOException e) {
log.atError().setMessage("Failed to read document at Lucene index location {}").addArgument(docId).setCause(e).log();
return null;

Check warning on line 162 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java#L160-L162

Added lines #L160 - L162 were not covered by tests
}

if (sourceBytes == null || sourceBytes.bytes.length == 0) {
log.atWarn().setMessage("Document {} doesn't have the _source field enabled").addArgument(id).log();
return null; // Skip these
String id = null;
BytesRef sourceBytes = null;
try {
for (var field : document.getFields()) {
String fieldName = field.name();
switch (fieldName) {
case "_id": {
// ES 6+
var idBytes = field.binaryValue();
id = Uid.decodeId(idBytes.bytes);
break;
}
case "_uid": {
// ES 5
id = field.stringValue();
break;
}
case "_source": {
// All versions (?)
sourceBytes = field.binaryValue();
break;
}
default:
break;
}

log.atDebug().setMessage("Reading document {}").addArgument(id).log();
} catch (Exception e) {
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
log.atError().setMessage(errorMessage.toString()).setCause(e).log();
return null; // Skip documents with invalid id
}
if (id == null) {
log.atError().setMessage("Document with index" + docId + " does not have an id. Skipping").log();
return null; // Skip documents with missing id
}

if (!isLive) {
log.atDebug().setMessage("Document {} is not live").addArgument(id).log();
return null; // Skip these
if (sourceBytes == null || sourceBytes.bytes.length == 0) {
log.atWarn().setMessage("Document {} doesn't have the _source field enabled").addArgument(id).log();
return null; // Skip these

Check warning on line 198 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java#L197-L198

Added lines #L197 - L198 were not covered by tests
}

log.atDebug().setMessage("Document {} read successfully").addArgument(id).log();
return new RfsLuceneDocument(id, sourceBytes.utf8ToString());
} catch (Exception e) {
log.atError().setMessage("Failed to read document at Lucene index location {}").addArgument(docId).setCause(e).log();
return null;
log.atDebug().setMessage("Reading document {}").addArgument(id).log();
} catch (RuntimeException e) {
StringBuilder errorMessage = new StringBuilder();
errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
log.atError().setMessage(errorMessage.toString()).setCause(e).log();
return null; // Skip documents with invalid id

Check warning on line 207 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java#L202-L207

Added lines #L202 - L207 were not covered by tests
}

if (!isLive) {
log.atDebug().setMessage("Document {} is not live").addArgument(id).log();
return null; // Skip these

Check warning on line 212 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java#L211-L212

Added lines #L211 - L212 were not covered by tests
}

log.atDebug().setMessage("Document {} read successfully").addArgument(id).log();
return new RfsLuceneDocument(id, sourceBytes.utf8ToString());
}
}
Loading