Merge remote-tracking branch 'origin/main' into dlq
Signed-off-by: Peter Nied <[email protected]>
peternied committed Aug 13, 2024
2 parents 0c15414 + 73d991a commit f661b34
Showing 14 changed files with 475 additions and 170 deletions.
2 changes: 1 addition & 1 deletion DocumentsFromSnapshotMigration/README.md
@@ -63,7 +63,7 @@ To see the default shard size, use the `--help` CLI option:
 | --lucene-dir | The absolute path to the directory where we'll put the Lucene docs |
 | --index-allowlist | The list of index names to migrate (e.g. 'logs_2024_01, logs_2024_02') |
 | --max-shard-size-bytes | The maximum shard size in bytes to allow when performing the document migration |
-| --max-initial-lease-duration | The maximum time for the first attempt to migrate a shard's documents |
+| --initial-lease-duration | The time given for the first attempt to migrate a shard's documents before quitting and requiring another process to pick it up. |
 | --otel-collector-endpoint | The endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be forwarded |
 | --target-host | The target host and port (e.g. http://localhost:9200) |
 | --target-username | The username for target cluster authentication |
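
The renamed flag is parsed into a `java.time.Duration` (see the `DurationConverter` change below), so values use ISO-8601 duration syntax such as the `PT10M` default. A minimal standalone sketch of that parsing, for readers unfamiliar with the format:

    import java.time.Duration;

    public class DurationParseSketch {
        public static void main(String[] args) {
            // ISO-8601 duration syntax, as accepted by java.time.Duration.parse
            Duration tenMinutes = Duration.parse("PT10M");
            Duration oneSecond = Duration.parse("PT1S");
            System.out.println(tenMinutes.toMillis()); // 600000
            System.out.println(oneSecond.toMillis());  // 1000
        }
    }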
RfsMigrateDocuments.java
@@ -47,11 +47,13 @@
 import com.rfs.worker.DocumentsRunner;
 import com.rfs.worker.ShardWorkPreparer;
 import lombok.extern.slf4j.Slf4j;
+import org.slf4j.MDC;

 @Slf4j
 public class RfsMigrateDocuments {
     public static final int PROCESS_TIMED_OUT = 2;
     public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
+    public static final String LOGGING_MDC_WORKER_ID = "workerId";

     public static class DurationConverter implements IStringConverter<Duration> {
         @Override
@@ -103,11 +105,11 @@ public static class Args {
+ " performing the document migration. Useful for preventing disk overflow. Default: 80 * 1024 * 1024 * 1024 (80 GB)"), required = false)
public long maxShardSizeBytes = 80 * 1024 * 1024 * 1024L;

@Parameter(names = { "--max-initial-lease-duration" }, description = ("Optional. The maximum time that the "
@Parameter(names = { "--initial-lease-duration" }, description = ("Optional. The time that the "
+ "first attempt to migrate a shard's documents should take. If a process takes longer than this "
+ "the process will terminate, allowing another process to attempt the migration, but with double the "
+ "amount of time than the last time. Default: PT10M"), required = false, converter = DurationConverter.class)
public Duration maxInitialLeaseDuration = Duration.ofMinutes(10);
public Duration initialLeaseDuration = Duration.ofMinutes(10);

@Parameter(required = false, names = {
"--otel-collector-endpoint" }, arity = 1, description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be"
@@ -179,13 +181,15 @@ public static void main(String[] args) throws Exception {
             System.exit(PROCESS_TIMED_OUT);
         }, Clock.systemUTC())) {
             ConnectionContext connectionContext = arguments.targetArgs.toConnectionContext();
+            final var workerId = UUID.randomUUID().toString();
             var workCoordinator = new OpenSearchWorkCoordinator(
                 new CoordinateWorkHttpClient(connectionContext),
                 TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
-                UUID.randomUUID().toString()
+                workerId
             );
+            MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
             TryHandlePhaseFailure.executeWithTryCatch(() -> {
-                log.info("Running RfsWorker");
+                log.info("Running RfsMigrateDocuments with workerId = " + workerId);

                 OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
                 DocumentReindexer reindexer = new DocumentReindexer(targetClient, arguments.numDocsPerBulkRequest,
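
The `MDC.put` call above stamps the worker ID onto the logging context for the lifetime of the process. How it appears in output depends on the logging pattern; a minimal sketch assuming a Logback-style `%X{workerId}` conversion (the pattern is an assumption, not part of this commit):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;

    public class MdcSketch {
        private static final Logger log = LoggerFactory.getLogger(MdcSketch.class);

        public static void main(String[] args) {
            MDC.put("workerId", "1234-abcd"); // hypothetical worker ID
            // With an assumed pattern like "%X{workerId} %msg%n", this renders as:
            //   1234-abcd starting shard migration
            log.info("starting shard migration");
        }
    }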
@@ -216,7 +220,7 @@ public static void main(String[] args) throws Exception {
                 LuceneDocumentsReader::new,
                 reindexer,
                 workCoordinator,
-                arguments.maxInitialLeaseDuration,
+                arguments.initialLeaseDuration,
                 processManager,
                 indexMetadataFactory,
                 arguments.snapshotName,
@@ -224,7 +224,7 @@ private static ProcessBuilder setupProcess(
             targetAddress,
             "--index-allowlist",
             "geonames",
-            "--max-initial-lease-duration",
+            "--initial-lease-duration",
             failHow == FailHow.NEVER ? "PT10M" : "PT1S" };

         // Kick off the doc migration process
4 changes: 2 additions & 2 deletions RFS/src/main/java/com/rfs/cms/AbstractedHttpClient.java
@@ -1,7 +1,7 @@
 package com.rfs.cms;

 import java.io.IOException;
-import java.util.Arrays;
+import java.nio.charset.StandardCharsets;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -25,7 +25,7 @@ interface AbstractHttpResponse {
     default String toDiagnosticString() {
         String payloadStr;
         try {
-            payloadStr = Arrays.toString(getPayloadBytes());
+            payloadStr = new String(getPayloadBytes(), StandardCharsets.UTF_8);
         } catch (IOException e) {
             payloadStr = "[EXCEPTION EVALUATING PAYLOAD]: " + e;
         }
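
This change swaps `Arrays.toString`, which renders the payload as a list of signed byte values, for a UTF-8 decode that yields readable text. A minimal standalone illustration of the difference:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class PayloadRenderSketch {
        public static void main(String[] args) {
            byte[] payload = "hi".getBytes(StandardCharsets.UTF_8);
            System.out.println(Arrays.toString(payload));                    // [104, 105]
            System.out.println(new String(payload, StandardCharsets.UTF_8)); // hi
        }
    }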
(Diffs for the remaining changed files are not shown here.)
