Skip to content

Commit

Permalink
RavenDB-20788 : DeleteBucketAsync : before starting cleanup, wait for…
Browse files Browse the repository at this point in the history
… DestinationMigrationConfirm command to be applied on all orchestrator nodes
  • Loading branch information
aviv86 committed Jul 16, 2023
1 parent 5908b0b commit 5265cb8
Showing 1 changed file with 38 additions and 2 deletions.
40 changes: 38 additions & 2 deletions src/Raven.Server/Documents/Sharding/ShardedDocumentDatabase.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Raven.Client.Http;
using Raven.Client.ServerWide;
using Raven.Client.ServerWide.Sharding;
using Raven.Server.Config;
using Raven.Server.Documents.Commands;
using Raven.Server.Documents.Indexes;
using Raven.Server.Documents.Indexes.Sharding;
using Raven.Server.Documents.Replication;
Expand All @@ -16,6 +20,7 @@
using Raven.Server.ServerWide.Context;
using Raven.Server.Utils;
using Sparrow;
using Sparrow.Json;
using Sparrow.Utils;

namespace Raven.Server.Documents.Sharding;
Expand Down Expand Up @@ -139,8 +144,11 @@ public void HandleReshardingChanges()
continue;
}

Debug.Assert(process.ConfirmationIndex.HasValue, $"invalid ShardBucketMigration for bucket '{process.Bucket}', " +
"got Status = OwnershipTransferred but no ConfirmationIndex");

// cleanup values
t = DeleteBucketAsync(process.Bucket, process.MigrationIndex, process.LastSourceChangeVector);
t = DeleteBucketAsync(process.Bucket, process.MigrationIndex, process.ConfirmationIndex.Value, process.LastSourceChangeVector);

t.ContinueWith(__ => DocumentsMigrator.ExecuteMoveDocumentsAsync());
}
Expand Down Expand Up @@ -195,14 +203,18 @@ public override void Dispose()
}
}

public async Task DeleteBucketAsync(int bucket, long migrationIndex, string uptoChangeVector)
public async Task DeleteBucketAsync(int bucket, long migrationIndex, long confirmationIndex, string uptoChangeVector)
{
if (string.IsNullOrEmpty(uptoChangeVector))
{
await ServerStore.Sharding.SourceMigrationCleanup(ShardedDatabaseName, bucket, migrationIndex);
return;
}

// before starting cleanup, wait for DestinationMigrationConfirm command
// to be applied in all orchestrator nodes
await WaitForConfirmationIndex(confirmationIndex);

while (true)
{
var cmd = new DeleteBucketCommand(this, bucket, uptoChangeVector);
Expand All @@ -226,6 +238,30 @@ public async Task DeleteBucketAsync(int bucket, long migrationIndex, string upto
}
}

private async Task WaitForConfirmationIndex(long confirmationIndex)
{
var cmd = new WaitForIndexNotificationCommand(new List<long> { confirmationIndex });
var tasks = new List<Task>(ShardingConfiguration.Orchestrator.Topology.Members.Count);
var clusterTopology = ServerStore.GetClusterTopology();

using (ServerStore.ContextPool.AllocateOperationContext(out JsonOperationContext context))
{
foreach (var nodeTag in ShardingConfiguration.Orchestrator.Topology.Members)
{
var chosenNode = new ServerNode
{
ClusterTag = nodeTag,
Database = ShardedDatabaseName,
ServerRole = ServerNode.Role.Member,
Url = clusterTopology.GetUrlFromTag(nodeTag)
};
tasks.Add(ServerStore.ClusterRequestExecutor.ExecuteAsync(chosenNode, nodeIndex: null, context, cmd, token: DatabaseShutdown));
}

await Task.WhenAll(tasks);
}
}

public static ShardedDocumentDatabase CastToShardedDocumentDatabase(DocumentDatabase database) => database as ShardedDocumentDatabase ?? throw new ArgumentException($"Database {database.Name} must be sharded!");

public class DeleteBucketCommand : DocumentMergedTransactionCommand
Expand Down

0 comments on commit 5265cb8

Please sign in to comment.