Skip to content

Commit

Permalink
[Transform] Unblock after update (elastic#120144)
Browse files Browse the repository at this point in the history
If an update changes the destination index, optimistically assume the
new destination index does not have a write block and try to run the
transform.  If the new destination index has a write block, the
transform will drop the run and move back into a blocked state.

Fix elastic#120065
  • Loading branch information
prwhelan authored Jan 16, 2025
1 parent 1c13465 commit 0971f43
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ public boolean changesHeaders(TransformConfig config) {
return isNullOrEqual(headers, config.getHeaders()) == false;
}

public boolean changesDestIndex(TransformConfig config) {
var updatedIndex = dest == null ? null : dest.getIndex();
return isNullOrEqual(updatedIndex, config.getDestination().getIndex()) == false;
}

private static boolean isNullOrEqual(Object lft, Object rgt) {
return lft == null || lft.equals(rgt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,19 @@ public static TransformConfig randomTransformConfigWithoutHeaders(String id) {
}

public static TransformConfig randomTransformConfigWithoutHeaders(String id, PivotConfig pivotConfig, LatestConfig latestConfig) {
return randomTransformConfigWithoutHeaders(id, pivotConfig, latestConfig, randomDestConfig());
}

public static TransformConfig randomTransformConfigWithoutHeaders(
String id,
PivotConfig pivotConfig,
LatestConfig latestConfig,
DestConfig destConfig
) {
return new TransformConfig(
id,
randomSourceConfig(),
randomDestConfig(),
destConfig,
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
randomBoolean() ? null : randomSyncConfig(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ public void testChangesHeaders() {
assertTrue("true update changes headers", update.changesHeaders(config));
}

public void testChangesDestIndex() {
TransformConfig config = randomTransformConfig();
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null);
assertFalse("null update does not change destination index", update.changesDestIndex(config));

var newDestWithSameIndex = new DestConfig(config.getDestination().getIndex(), null, null);
update = new TransformConfigUpdate(null, newDestWithSameIndex, null, null, null, null, null, null);
assertFalse("equal update does not change destination index", update.changesDestIndex(config));

var newDestWithNewIndex = new DestConfig(config.getDestination().getIndex() + "-new", null, null);
update = new TransformConfigUpdate(null, newDestWithNewIndex, null, null, null, null, null, null);
assertTrue("true update changes destination index", update.changesDestIndex(config));
}

public void testApply() {
TransformConfig config = new TransformConfig(
"time-transform",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -50,6 +51,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -245,10 +247,8 @@ public void testDestinationIndexBlocked() throws Exception {
assertAcknowledged(adminClient().performRequest(request));

// index more docs so the checkpoint tries to run, wait until transform stops
assertBusy(() -> {
indexDoc(42, sourceIndexName);
assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId));
}, 30, TimeUnit.SECONDS);
indexDoc(42, sourceIndexName);
assertBusy(() -> { assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId)); }, 30, TimeUnit.SECONDS);

// unblock index
request = new Request("PUT", destIndexName + "/_settings");
Expand All @@ -266,6 +266,46 @@ public void testDestinationIndexBlocked() throws Exception {
deleteTransform(transformId);
}

public void testUnblockWithNewDestinationIndex() throws Exception {
var transformId = "transform-continuous-unblock-destination";
var sourceIndexName = "source-reviews";
var destIndexName = "destination-reviews-old";
var newDestIndexName = "destination-reviews-new";

// create transform & indices, wait until 1st checkpoint is finished
createReviewsIndex(newDestIndexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
createContinuousTransform(sourceIndexName, transformId, destIndexName);

// block destination index
Request request = new Request("PUT", destIndexName + "/_block/write");
assertAcknowledged(adminClient().performRequest(request));

// index more docs so the checkpoint tries to run, wait until transform stops
indexDoc(42, sourceIndexName);
assertBusy(() -> { assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId)); }, 30, TimeUnit.SECONDS);

// change destination index
var update = format("""
{
"description": "updated config",
"dest": {
"index": "%s"
}
}
""", newDestIndexName);
updateConfig(transformId, update, true, RequestOptions.DEFAULT);

assertBusy(() -> {
assertThat(
getTransformState(transformId),
in(Set.of(TransformStats.State.STARTED.value(), TransformStats.State.INDEXING.value()))
);
}, 30, TimeUnit.SECONDS);

stopTransform(transformId);
deleteTransform(transformId);
}

public void testTransformLifecycleInALoop() throws Exception {
String transformId = "lifecycle-in-a-loop";
String indexName = transformId + "-src";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ protected void createReviewsIndex(
Request req = new Request("PUT", indexName);
req.setEntity(indexMappings);
req.setOptions(RequestOptions.DEFAULT);
assertAcknowledged(adminClient().performRequest(req));
assertOKAndConsume(adminClient().performRequest(req));
}

// create index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li

boolean updateChangesSettings = update.changesSettings(originalConfig);
boolean updateChangesHeaders = update.changesHeaders(originalConfig);
if (updateChangesSettings || updateChangesHeaders) {
boolean updateChangesDestIndex = update.changesDestIndex(originalConfig);
if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex) {
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = TransformTask.getTransformTask(
request.getId(),
clusterState
Expand Down Expand Up @@ -256,6 +257,7 @@ protected void taskOperation(
) {
transformTask.applyNewSettings(request.getConfig().getSettings());
transformTask.applyNewAuthState(request.getAuthState());
transformTask.checkAndResetDestinationIndexBlock(request.getConfig());
listener.onResponse(new Response(request.getConfig()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
Expand Down Expand Up @@ -415,6 +416,16 @@ public void applyNewAuthState(AuthorizationState authState) {
}
}

public void checkAndResetDestinationIndexBlock(TransformConfig config) {
if (context.isWaitingForIndexToUnblock()) {
var currentIndex = getIndexer() == null ? null : getIndexer().getConfig().getDestination().getIndex();
var updatedIndex = config.getDestination().getIndex();
if (updatedIndex.equals(currentIndex) == false) {
context.setIsWaitingForIndexToUnblock(false);
}
}
}

@Override
protected void init(
PersistentTasksService persistentTasksService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.TransformConfigVersion;
import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState;
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
Expand All @@ -45,6 +46,8 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
import org.elasticsearch.xpack.transform.DefaultTransformExtension;
import org.elasticsearch.xpack.transform.TransformNode;
import org.elasticsearch.xpack.transform.TransformServices;
Expand Down Expand Up @@ -553,6 +556,75 @@ public void testDeriveBasicCheckpointingInfoWithNoIndexer() {
assertThat(checkpointingInfo, sameInstance(TransformCheckpointingInfo.EMPTY));
}

public void testCheckAndResetDestinationIndexBlock() {
var currentConfig = randomConfigForDestIndex("oldDestination");
var indexer = mock(ClientTransformIndexer.class);
when(indexer.getConfig()).thenReturn(currentConfig);

var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor());
transformTask.initializeIndexer(indexer);

transformTask.getContext().setIsWaitingForIndexToUnblock(true);
var updatedConfig = randomConfigForDestIndex("newDestination");

transformTask.checkAndResetDestinationIndexBlock(updatedConfig);

assertFalse(transformTask.getContext().isWaitingForIndexToUnblock());
}

public void testCheckAndResetDestinationIndexBlock_NoChangeToDest() {
var currentConfig = randomConfigForDestIndex("oldDestination");
var indexer = mock(ClientTransformIndexer.class);
when(indexer.getConfig()).thenReturn(currentConfig);

var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor());
transformTask.initializeIndexer(indexer);

transformTask.getContext().setIsWaitingForIndexToUnblock(true);
var updatedConfig = randomConfigForDestIndex("oldDestination");

transformTask.checkAndResetDestinationIndexBlock(updatedConfig);

assertTrue(transformTask.getContext().isWaitingForIndexToUnblock());
}

public void testCheckAndResetDestinationIndexBlock_NotBlocked() {
var currentConfig = randomConfigForDestIndex("oldDestination");
var indexer = mock(ClientTransformIndexer.class);
when(indexer.getConfig()).thenReturn(currentConfig);

var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor());
transformTask.initializeIndexer(indexer);

var updatedConfig = randomConfigForDestIndex("newDestination");

transformTask.checkAndResetDestinationIndexBlock(updatedConfig);

assertFalse(transformTask.getContext().isWaitingForIndexToUnblock());
}

public void testCheckAndResetDestinationIndexBlock_NullIndexer() {
var currentConfig = randomConfigForDestIndex("oldDestination");
var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor());
transformTask.getContext().setIsWaitingForIndexToUnblock(true);

var updatedConfig = randomConfigForDestIndex("oldDestination");

transformTask.checkAndResetDestinationIndexBlock(updatedConfig);

assertFalse(transformTask.getContext().isWaitingForIndexToUnblock());
}

private TransformConfig randomConfigForDestIndex(String indexName) {
var pivotOrLatest = randomBoolean();
return TransformConfigTests.randomTransformConfigWithoutHeaders(
randomAlphaOfLengthBetween(1, 10),
pivotOrLatest ? null : PivotConfigTests.randomPivotConfig(),
pivotOrLatest ? LatestConfigTests.randomLatestConfig() : null,
new DestConfig(indexName, null, null)
);
}

private TransformTask createTransformTask(TransformConfig transformConfig, MockTransformAuditor auditor) {
var threadPool = mock(ThreadPool.class);

Expand Down

0 comments on commit 0971f43

Please sign in to comment.