From 0971f43e648b64f872b31177ee185849a6f772a7 Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Thu, 16 Jan 2025 15:31:15 -0500 Subject: [PATCH] [Transform] Unblock after update (#120144) If an update changes the destination index, optimistically assume the new destination index does not have a write block and try to run the transform. If the new destination index has a write block, the transform will drop the run and move back into a blocked state. Fix #120065 --- .../transforms/TransformConfigUpdate.java | 5 ++ .../transforms/TransformConfigTests.java | 11 ++- .../TransformConfigUpdateTests.java | 14 ++++ .../transform/integration/TransformIT.java | 48 +++++++++++-- .../integration/TransformRestTestCase.java | 2 +- .../TransportUpdateTransformAction.java | 4 +- .../transform/transforms/TransformTask.java | 11 +++ .../transforms/TransformTaskTests.java | 72 +++++++++++++++++++ 8 files changed, 160 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java index 502d403cf979f..42baa1f769b23 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java @@ -239,6 +239,11 @@ public boolean changesHeaders(TransformConfig config) { return isNullOrEqual(headers, config.getHeaders()) == false; } + public boolean changesDestIndex(TransformConfig config) { + var updatedIndex = dest == null ? null : dest.getIndex(); + return isNullOrEqual(updatedIndex, config.getDestination().getIndex()) == false; + } + private static boolean isNullOrEqual(Object lft, Object rgt) { return lft == null || lft.equals(rgt); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java index 180c9ca1f674d..e6b9e7f75a87d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigTests.java @@ -78,10 +78,19 @@ public static TransformConfig randomTransformConfigWithoutHeaders(String id) { } public static TransformConfig randomTransformConfigWithoutHeaders(String id, PivotConfig pivotConfig, LatestConfig latestConfig) { + return randomTransformConfigWithoutHeaders(id, pivotConfig, latestConfig, randomDestConfig()); + } + + public static TransformConfig randomTransformConfigWithoutHeaders( + String id, + PivotConfig pivotConfig, + LatestConfig latestConfig, + DestConfig destConfig + ) { return new TransformConfig( id, randomSourceConfig(), - randomDestConfig(), + destConfig, randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)), randomBoolean() ? null : randomSyncConfig(), null, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index 061078260725b..62020f9992122 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -126,6 +126,20 @@ public void testChangesHeaders() { assertTrue("true update changes headers", update.changesHeaders(config)); } + public void testChangesDestIndex() { + TransformConfig config = randomTransformConfig(); + TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null); + assertFalse("null update does not change destination index", update.changesDestIndex(config)); + + var newDestWithSameIndex = new DestConfig(config.getDestination().getIndex(), null, null); + update = new TransformConfigUpdate(null, newDestWithSameIndex, null, null, null, null, null, null); + assertFalse("equal update does not change destination index", update.changesDestIndex(config)); + + var newDestWithNewIndex = new DestConfig(config.getDestination().getIndex() + "-new", null, null); + update = new TransformConfigUpdate(null, newDestWithNewIndex, null, null, null, null, null, null); + assertTrue("true update changes destination index", update.changesDestIndex(config)); + } + public void testApply() { TransformConfig config = new TransformConfig( "time-transform", diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index 84519166eddb6..dc8dfe377a844 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -39,6 +39,7 @@ import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -50,6 +51,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; @@ -245,10 +247,8 @@ public void testDestinationIndexBlocked() throws Exception { assertAcknowledged(adminClient().performRequest(request)); // index more docs so the checkpoint tries to run, wait until transform stops - assertBusy(() -> { - indexDoc(42, sourceIndexName); - assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId)); - }, 30, TimeUnit.SECONDS); + indexDoc(42, sourceIndexName); + assertBusy(() -> { assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId)); }, 30, TimeUnit.SECONDS); // unblock index request = new Request("PUT", destIndexName + "/_settings"); @@ -266,6 +266,46 @@ public void testDestinationIndexBlocked() throws Exception { deleteTransform(transformId); } + public void testUnblockWithNewDestinationIndex() throws Exception { + var transformId = "transform-continuous-unblock-destination"; + var sourceIndexName = "source-reviews"; + var destIndexName = "destination-reviews-old"; + var newDestIndexName = "destination-reviews-new"; + + // create transform & indices, wait until 1st checkpoint is finished + createReviewsIndex(newDestIndexName, 100, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow); + createContinuousTransform(sourceIndexName, transformId, destIndexName); + + // block destination index + Request request = new Request("PUT", destIndexName + "/_block/write"); + assertAcknowledged(adminClient().performRequest(request)); + + // index more docs so the checkpoint tries to run, wait until transform stops + indexDoc(42, sourceIndexName); + assertBusy(() -> { assertEquals(TransformStats.State.WAITING.value(), getTransformState(transformId)); }, 30, TimeUnit.SECONDS); + + // change destination index + var update = format(""" + { + "description": "updated config", + "dest": { + "index": "%s" + } + } + """, newDestIndexName); + updateConfig(transformId, update, true, RequestOptions.DEFAULT); + + assertBusy(() -> { + assertThat( + getTransformState(transformId), + in(Set.of(TransformStats.State.STARTED.value(), TransformStats.State.INDEXING.value())) + ); + }, 30, TimeUnit.SECONDS); + + stopTransform(transformId); + deleteTransform(transformId); + } + public void testTransformLifecycleInALoop() throws Exception { String transformId = "lifecycle-in-a-loop"; String indexName = transformId + "-src"; diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 4b7e478dbb61d..3ee46e0ff087f 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -437,7 +437,7 @@ protected void createReviewsIndex( Request req = new Request("PUT", indexName); req.setEntity(indexMappings); req.setOptions(RequestOptions.DEFAULT); - assertAcknowledged(adminClient().performRequest(req)); + assertOKAndConsume(adminClient().performRequest(req)); } // create index diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index ecd93ae1ae721..4d5dacde6efcb 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -171,7 +171,8 @@ protected void doExecute(Task task, Request request, ActionListener li boolean updateChangesSettings = update.changesSettings(originalConfig); boolean updateChangesHeaders = update.changesHeaders(originalConfig); - if (updateChangesSettings || updateChangesHeaders) { + boolean updateChangesDestIndex = update.changesDestIndex(originalConfig); + if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex) { PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( request.getId(), clusterState @@ -256,6 +257,7 @@ protected void taskOperation( ) { transformTask.applyNewSettings(request.getConfig().getSettings()); transformTask.applyNewAuthState(request.getAuthState()); + transformTask.checkAndResetDestinationIndexBlock(request.getConfig()); listener.onResponse(new Response(request.getConfig())); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 7c9a22aa9fbfe..e530a3db83045 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -35,6 +35,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo.TransformCheckpointingInfoBuilder; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformState; @@ -415,6 +416,16 @@ public void applyNewAuthState(AuthorizationState authState) { } } + public void checkAndResetDestinationIndexBlock(TransformConfig config) { + if (context.isWaitingForIndexToUnblock()) { + var currentIndex = getIndexer() == null ? null : getIndexer().getConfig().getDestination().getIndex(); + var updatedIndex = config.getDestination().getIndex(); + if (updatedIndex.equals(currentIndex) == false) { + context.setIsWaitingForIndexToUnblock(false); + } + } + } + @Override protected void init( PersistentTasksService persistentTasksService, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 67ce09c74e98c..e381659b1e01c 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState; +import org.elasticsearch.xpack.core.transform.transforms.DestConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; @@ -45,6 +46,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.core.transform.transforms.latest.LatestConfigTests; +import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests; import org.elasticsearch.xpack.transform.DefaultTransformExtension; import org.elasticsearch.xpack.transform.TransformNode; import org.elasticsearch.xpack.transform.TransformServices; @@ -553,6 +556,75 @@ public void testDeriveBasicCheckpointingInfoWithNoIndexer() { assertThat(checkpointingInfo, sameInstance(TransformCheckpointingInfo.EMPTY)); } + public void testCheckAndResetDestinationIndexBlock() { + var currentConfig = randomConfigForDestIndex("oldDestination"); + var indexer = mock(ClientTransformIndexer.class); + when(indexer.getConfig()).thenReturn(currentConfig); + + var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor()); + transformTask.initializeIndexer(indexer); + + transformTask.getContext().setIsWaitingForIndexToUnblock(true); + var updatedConfig = randomConfigForDestIndex("newDestination"); + + transformTask.checkAndResetDestinationIndexBlock(updatedConfig); + + assertFalse(transformTask.getContext().isWaitingForIndexToUnblock()); + } + + public void testCheckAndResetDestinationIndexBlock_NoChangeToDest() { + var currentConfig = randomConfigForDestIndex("oldDestination"); + var indexer = mock(ClientTransformIndexer.class); + when(indexer.getConfig()).thenReturn(currentConfig); + + var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor()); + transformTask.initializeIndexer(indexer); + + transformTask.getContext().setIsWaitingForIndexToUnblock(true); + var updatedConfig = randomConfigForDestIndex("oldDestination"); + + transformTask.checkAndResetDestinationIndexBlock(updatedConfig); + + assertTrue(transformTask.getContext().isWaitingForIndexToUnblock()); + } + + public void testCheckAndResetDestinationIndexBlock_NotBlocked() { + var currentConfig = randomConfigForDestIndex("oldDestination"); + var indexer = mock(ClientTransformIndexer.class); + when(indexer.getConfig()).thenReturn(currentConfig); + + var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor()); + transformTask.initializeIndexer(indexer); + + var updatedConfig = randomConfigForDestIndex("newDestination"); + + transformTask.checkAndResetDestinationIndexBlock(updatedConfig); + + assertFalse(transformTask.getContext().isWaitingForIndexToUnblock()); + } + + public void testCheckAndResetDestinationIndexBlock_NullIndexer() { + var currentConfig = randomConfigForDestIndex("oldDestination"); + var transformTask = createTransformTask(currentConfig, MockTransformAuditor.createMockAuditor()); + transformTask.getContext().setIsWaitingForIndexToUnblock(true); + + var updatedConfig = randomConfigForDestIndex("oldDestination"); + + transformTask.checkAndResetDestinationIndexBlock(updatedConfig); + + assertFalse(transformTask.getContext().isWaitingForIndexToUnblock()); + } + + private TransformConfig randomConfigForDestIndex(String indexName) { + var pivotOrLatest = randomBoolean(); + return TransformConfigTests.randomTransformConfigWithoutHeaders( + randomAlphaOfLengthBetween(1, 10), + pivotOrLatest ? null : PivotConfigTests.randomPivotConfig(), + pivotOrLatest ? LatestConfigTests.randomLatestConfig() : null, + new DestConfig(indexName, null, null) + ); + } + private TransformTask createTransformTask(TransformConfig transformConfig, MockTransformAuditor auditor) { var threadPool = mock(ThreadPool.class);