Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clients): cleanup after replaceAllObjects failure #3824

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -494,37 +494,44 @@ public async Task<ReplaceAllObjectsResponse> ReplaceAllObjectsAsync<T>(string in
var rnd = new Random();
var tmpIndexName = $"{indexName}_tmp_{rnd.Next(100)}";

var copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
try
{
var copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);

var batchResponse = await ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, true, batchSize,
options, cancellationToken).ConfigureAwait(false);
var batchResponse = await ChunkedBatchAsync(tmpIndexName, objects, Action.AddObject, true, batchSize,
options, cancellationToken).ConfigureAwait(false);

await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
copyResponse = await OperationIndexAsync(indexName,
new OperationIndexParams(OperationType.Copy, tmpIndexName)
{ Scope = [ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms] }, options, cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, copyResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

var moveResponse = await OperationIndexAsync(tmpIndexName,
new OperationIndexParams(OperationType.Move, indexName), options, cancellationToken)
.ConfigureAwait(false);
var moveResponse = await OperationIndexAsync(tmpIndexName,
new OperationIndexParams(OperationType.Move, indexName), options, cancellationToken)
.ConfigureAwait(false);

await WaitForTaskAsync(tmpIndexName, moveResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);
await WaitForTaskAsync(tmpIndexName, moveResponse.TaskID, requestOptions: options, ct: cancellationToken)
.ConfigureAwait(false);

return new ReplaceAllObjectsResponse
return new ReplaceAllObjectsResponse
{
CopyOperationResponse = copyResponse,
MoveOperationResponse = moveResponse,
BatchResponses = batchResponse
};
}
finally
{
CopyOperationResponse = copyResponse,
MoveOperationResponse = moveResponse,
BatchResponses = batchResponse
};
await DeleteIndexAsync(tmpIndexName, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,46 +472,50 @@ public suspend fun SearchClient.replaceAllObjects(
): ReplaceAllObjectsResponse {
val tmpIndexName = "${indexName}_tmp_${Random.nextInt(from = 0, until = 100)}"

var copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
try {
var copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)

val batchResponses = this.chunkedBatch(
indexName = tmpIndexName,
objects = objects,
action = Action.AddObject,
waitForTask = true,
batchSize = batchSize,
requestOptions = requestOptions,
)
val batchResponses = this.chunkedBatch(
indexName = tmpIndexName,
objects = objects,
action = Action.AddObject,
waitForTask = true,
batchSize = batchSize,
requestOptions = requestOptions,
)

waitForTask(indexName = tmpIndexName, taskID = copy.taskID)
waitForTask(indexName = tmpIndexName, taskID = copy.taskID)

copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
waitForTask(indexName = tmpIndexName, taskID = copy.taskID)
copy = operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = listOf(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms),
),
requestOptions = requestOptions,
)
waitForTask(indexName = tmpIndexName, taskID = copy.taskID)

val move = operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions,
)
waitForTask(indexName = tmpIndexName, taskID = move.taskID)
val move = operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions,
)
waitForTask(indexName = tmpIndexName, taskID = move.taskID)

return ReplaceAllObjectsResponse(copy, batchResponses, move)
return ReplaceAllObjectsResponse(copy, batchResponses, move)
} finally {
deleteIndex(tmpIndexName)
}
}

/**
Expand Down Expand Up @@ -542,6 +546,13 @@ public fun securedApiKeyRemainingValidity(apiKey: String): Duration {
return validUntil - Clock.System.now()
}

/**
* Checks that an index exists.
*
* @param indexName The name of the index to check.
* @return true if the index exists, false otherwise.
* @throws AlgoliaApiException if an error occurs during the request.
*/
public suspend fun SearchClient.indexExists(indexName: String): Boolean {
try {
getSettings(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,50 +366,54 @@ package object extension {
)(implicit ec: ExecutionContext): Future[ReplaceAllObjectsResponse] = {
val tmpIndexName = s"${indexName}_tmp_${scala.util.Random.nextInt(100)}"

for {
copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
try {
for {
copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)

batchResponses <- chunkedBatch(
indexName = tmpIndexName,
objects = objects,
action = Action.AddObject,
waitForTasks = true,
batchSize = batchSize,
requestOptions = requestOptions
)
batchResponses <- chunkedBatch(
indexName = tmpIndexName,
objects = objects,
action = Action.AddObject,
waitForTasks = true,
batchSize = batchSize,
requestOptions = requestOptions
)

_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)
copy <- client.operationIndex(
indexName = indexName,
operationIndexParams = OperationIndexParams(
operation = OperationType.Copy,
destination = tmpIndexName,
scope = Some(Seq(ScopeType.Settings, ScopeType.Rules, ScopeType.Synonyms))
),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = copy.taskID, requestOptions = requestOptions)

move <- client.operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
move <- client.operationIndex(
indexName = tmpIndexName,
operationIndexParams = OperationIndexParams(operation = OperationType.Move, destination = indexName),
requestOptions = requestOptions
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions)
} yield ReplaceAllObjectsResponse(
copyOperationResponse = copy,
batchResponses = batchResponses,
moveOperationResponse = move
)
_ <- client.waitTask(indexName = tmpIndexName, taskID = move.taskID, requestOptions = requestOptions)
} yield ReplaceAllObjectsResponse(
copyOperationResponse = copy,
batchResponses = batchResponses,
moveOperationResponse = move
)
} finally {
client.deleteIndex(tmpIndexName)
}
}

/** Check if an index exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,51 +559,57 @@ public extension SearchClient {
) async throws -> ReplaceAllObjectsResponse {
let tmpIndexName = "\(indexName)_tmp_\(Int.random(in: 1_000_000 ..< 10_000_000))"

var copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)
do {
var copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)

let batchResponses = try await self.chunkedBatch(
indexName: tmpIndexName,
objects: objects,
waitForTasks: true,
batchSize: batchSize,
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: copyOperationResponse.taskID)
let batchResponses = try await self.chunkedBatch(
indexName: tmpIndexName,
objects: objects,
waitForTasks: true,
batchSize: batchSize,
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: copyOperationResponse.taskID)

copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: copyOperationResponse.taskID)

let moveOperationResponse = try await self.operationIndex(
indexName: tmpIndexName,
operationIndexParams: OperationIndexParams(
operation: .move,
destination: indexName
),
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: moveOperationResponse.taskID)
copyOperationResponse = try await operationIndex(
indexName: indexName,
operationIndexParams: OperationIndexParams(
operation: .copy,
destination: tmpIndexName,
scope: [.settings, .rules, .synonyms]
),
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: copyOperationResponse.taskID)

return ReplaceAllObjectsResponse(
copyOperationResponse: copyOperationResponse,
batchResponses: batchResponses,
moveOperationResponse: moveOperationResponse
)
let moveOperationResponse = try await self.operationIndex(
indexName: tmpIndexName,
operationIndexParams: OperationIndexParams(
operation: .move,
destination: indexName
),
requestOptions: requestOptions
)
try await self.waitForTask(indexName: tmpIndexName, taskID: moveOperationResponse.taskID)

return ReplaceAllObjectsResponse(
copyOperationResponse: copyOperationResponse,
batchResponses: batchResponses,
moveOperationResponse: moveOperationResponse
)
} catch {
_ = try? await deleteIndex(indexName: tmpIndexName)

throw error
}
}

/// Generate a secured API key
Expand Down
2 changes: 2 additions & 0 deletions scripts/cts/runCts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { printBenchmarkReport } from './testServer/benchmark.js';
import { assertChunkWrapperValid } from './testServer/chunkWrapper.js';
import { startTestServer } from './testServer/index.js';
import { assertValidReplaceAllObjects } from './testServer/replaceAllObjects.js';
import { assertValidReplaceAllObjectsFailed } from './testServer/replaceAllObjectsFailed.js';
import { assertValidTimeouts } from './testServer/timeout.js';
import { assertValidWaitForApiKey } from './testServer/waitFor.js';

Expand Down Expand Up @@ -152,6 +153,7 @@ export async function runCts(
assertValidTimeouts(languages.length);
assertChunkWrapperValid(languages.length - skip('dart') - skip('scala'));
assertValidReplaceAllObjects(languages.length - skip('dart') - skip('scala'));
assertValidReplaceAllObjectsFailed(languages.length - skip('dart') - skip('scala'));
assertValidWaitForApiKey(languages.length - skip('dart') - skip('scala'));
}
if (withBenchmarkServer) {
Expand Down
2 changes: 2 additions & 0 deletions scripts/cts/testServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { benchmarkServer } from './benchmark.js';
import { chunkWrapperServer } from './chunkWrapper.js';
import { gzipServer } from './gzip.js';
import { replaceAllObjectsServer } from './replaceAllObjects.js';
import { replaceAllObjectsServerFailed } from './replaceAllObjectsFailed.js';
import { timeoutServer } from './timeout.js';
import { timeoutServerBis } from './timeoutBis.js';
import { waitForApiKeyServer } from './waitFor.js';
Expand All @@ -23,6 +24,7 @@ export async function startTestServer(suites: Record<CTSType, boolean>): Promise
gzipServer(),
timeoutServerBis(),
replaceAllObjectsServer(),
replaceAllObjectsServerFailed(),
chunkWrapperServer(),
waitForApiKeyServer(),
apiKeyServer(),
Expand Down
Loading
Loading