From 12ecc3c1b746cf7b9be6fac783982f5a83aa0c6d Mon Sep 17 00:00:00 2001 From: aviv Date: Sun, 14 Jan 2024 18:13:35 +0200 Subject: [PATCH] RavenDB-17793 : throw on attempt to move bucket from prefixed range if destination shard is not a part of shards list in the prefix setting --- .../Sharding/StartBucketMigrationCommand.cs | 24 + test/SlowTests/Sharding/PrefixedSharding.cs | 745 +++++++++++++----- .../RavenTestBase.ReshardingTestBase.cs | 16 +- 3 files changed, 594 insertions(+), 191 deletions(-) diff --git a/src/Raven.Server/ServerWide/Commands/Sharding/StartBucketMigrationCommand.cs b/src/Raven.Server/ServerWide/Commands/Sharding/StartBucketMigrationCommand.cs index cb482928e560..08922b1a2392 100644 --- a/src/Raven.Server/ServerWide/Commands/Sharding/StartBucketMigrationCommand.cs +++ b/src/Raven.Server/ServerWide/Commands/Sharding/StartBucketMigrationCommand.cs @@ -55,6 +55,30 @@ public override void UpdateDatabaseRecord(DatabaseRecord record, long etag) } } + + if (Bucket >= ShardHelper.NumberOfBuckets) + { + // prefixed bucket range + var prefixed = record.Sharding.Prefixed; + List shards = null; + for (int i = 0; i < prefixed.Count; i++) + { + var bucketRangeStart = prefixed[i].BucketRangeStart; + int nextBucketRangeStart = i == prefixed.Count - 1 + ? int.MaxValue + : prefixed[i + 1].BucketRangeStart; + + if (Bucket < bucketRangeStart || Bucket >= nextBucketRangeStart) + continue; + + shards = prefixed[i].Shards; + break; + } + + if (shards == null || shards.Contains(DestinationShard) == false) + throw new RachisApplyException($"Destination shard {DestinationShard} doesn't exists"); + } + if (record.Sharding.Shards.ContainsKey(DestinationShard) == false) throw new RachisApplyException($"Destination shard {DestinationShard} doesn't exists"); diff --git a/test/SlowTests/Sharding/PrefixedSharding.cs b/test/SlowTests/Sharding/PrefixedSharding.cs index 7cdd5775fd64..6d572b9fde05 100644 --- a/test/SlowTests/Sharding/PrefixedSharding.cs +++ b/test/SlowTests/Sharding/PrefixedSharding.cs @@ -4,7 +4,6 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; -using FastTests; using Raven.Client.Documents; using Raven.Client.Documents.Operations.Backups; using Raven.Client.Exceptions; @@ -12,6 +11,7 @@ using Raven.Client.ServerWide.Sharding; using Raven.Server.Documents; using Raven.Server.Documents.Sharding; +using Raven.Server.Rachis; using Raven.Server.ServerWide.Context; using Raven.Server.Utils; using Raven.Tests.Core.Utils.Entities; @@ -26,7 +26,7 @@ namespace SlowTests.Sharding; -public class PrefixedSharding : RavenTestBase +public class PrefixedSharding : ClusterTestBase { public PrefixedSharding(ITestOutputHelper output) : base(output) { @@ -43,24 +43,24 @@ public async Task CanShardByDocumentsPrefix() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { // range for 'eu/' is : // shard 0 : [1M, 2M] - Prefix = "eu/", - Shards = new List { 0 } + Prefix = "eu/", + Shards = [0] }, new PrefixedShardingSetting { // range for 'asia/' is : // shard 1 : [2M, 2.5M] // shard 2 : [2.5M, 3M] - Prefix = "asia/", - Shards = new List { 1, 2 } + Prefix = "asia/", + Shards = [1, 2] } - }; + ]; } }); @@ -103,7 +103,7 @@ public async Task CanShardByDocumentsPrefix() } } - var rand = new System.Random(2022_04_19); + var rand = new Random(2022_04_19); var prefixes = new[] { "us/", "eu/", "asia/", null }; int d = 0; @@ -157,7 +157,7 @@ public async Task ShouldThrowOnAttemptToAddPrefixThatDoesntEndWithSlashOrComma() var task = store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new PrefixedShardingSetting { Prefix = "asia", - Shards = new List { 1, 2 } + Shards = [1, 2] })); var e = await Assert.ThrowsAsync(async () => await task); @@ -174,14 +174,14 @@ public async Task ShouldNotAllowToAddPrefixIfWeHaveDocsStartingWith() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "eu/", - Shards = new List { 0 } + Prefix = "eu/", + Shards = [0] } - }; + ]; } }); @@ -199,7 +199,7 @@ public async Task ShouldNotAllowToAddPrefixIfWeHaveDocsStartingWith() var task = store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new PrefixedShardingSetting { Prefix = "asia/", - Shards = new List { 1, 2 } + Shards = [1, 2] })); var e = await Assert.ThrowsAsync(async () => await task); @@ -216,19 +216,19 @@ public async Task ShouldNotAllowToDeletePrefixIfWeHaveDocsStartingWith() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "asia/", - Shards = new List { 0 } + Prefix = "asia/", + Shards = [0] }, new PrefixedShardingSetting { - Prefix = "eu/", - Shards = new List { 1, 2 } + Prefix = "eu/", + Shards = [1, 2] } - }; + ]; } }); @@ -258,14 +258,14 @@ public async Task CanAddPrefixIfNoDocsStartingWith() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "eu/", - Shards = new List { 0 } + Prefix = "eu/", + Shards = [0] } - }; + ]; } }); @@ -287,7 +287,7 @@ public async Task CanAddPrefixIfNoDocsStartingWith() await store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new PrefixedShardingSetting { Prefix = "asia/", - Shards = new List { 1, 2 } + Shards = [1, 2] })); shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); @@ -305,14 +305,14 @@ public async Task CanDeletePrefixIfNoDocsStartingWith() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "eu/", - Shards = new List { 0, 1 } + Prefix = "eu/", + Shards = [0, 1] } - }; + ]; } }); @@ -347,19 +347,19 @@ public async Task CanDeleteOnePrefixThenAddAnotherIfNoDocsStartingWith() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "eu/", - Shards = new List { 0, 1 } + Prefix = "eu/", + Shards = [0, 1] }, new PrefixedShardingSetting { - Prefix = "us/", - Shards = new List { 1, 2 } + Prefix = "us/", + Shards = [1, 2] } - }; + ]; } }); @@ -389,7 +389,7 @@ public async Task CanDeleteOnePrefixThenAddAnotherIfNoDocsStartingWith() await store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new PrefixedShardingSetting { Prefix = "africa/", - Shards = new List { 0, 2 } + Shards = [0, 2] })); shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); @@ -410,24 +410,24 @@ public async Task BackupAndRestoreShardedDatabase_ShouldPreservePrefixedSettings ModifyDatabaseRecord = databaseRecord => { databaseRecord.Sharding ??= new ShardingConfiguration(); - databaseRecord.Sharding.Prefixed = new List - { + databaseRecord.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "users/", - Shards = new List { 0 } + Prefix = "users/", + Shards = [0] }, new PrefixedShardingSetting { - Prefix = "orders/", - Shards = new List { 1 } + Prefix = "orders/", + Shards = [1] }, new PrefixedShardingSetting { - Prefix = "employees/", - Shards = new List { 2 } + Prefix = "employees/", + Shards = [2] } - }; + ]; } }); @@ -600,93 +600,6 @@ public async Task BackupAndRestoreShardedDatabase_ShouldPreservePrefixedSettings } } - [RavenFact(RavenTestCategory.Sharding)] - public async Task CanMoveOneBucketFromPrefixedRange() - { - using var store = Sharding.GetDocumentStore(new Options - { - ModifyDatabaseRecord = record => - { - record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { - new PrefixedShardingSetting - { - // bucket range for 'users/' is : - // shard 0 : [1M, 1.5M] - // shard 1 : [1.5M, 2M] - Prefix = "users/", - Shards = new List { 0, 1 } - } - }; - } - }); - - const string id = "users/1"; - using (var session = store.OpenAsyncSession()) - { - var user = new User - { - Name = "Original shard" - }; - await session.StoreAsync(user, id); - await session.SaveChangesAsync(); - } - - var shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); - - Assert.Equal(5, shardingConfiguration.BucketRanges.Count); - Assert.Equal(ShardHelper.NumberOfBuckets, shardingConfiguration.Prefixed[0].BucketRangeStart); - - var bucket = await Sharding.GetBucketAsync(store, id); - - var originalLocation = ShardHelper.GetShardNumberFor(shardingConfiguration, bucket); - Assert.Contains(originalLocation, shardingConfiguration.Prefixed[0].Shards); - var newLocation = shardingConfiguration.Prefixed[0].Shards.Single(s => s != originalLocation); - - await Sharding.Resharding.MoveShardForId(store, id); - - shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); - Assert.Equal(7, shardingConfiguration.BucketRanges.Count); - - Assert.Equal(bucket, shardingConfiguration.BucketRanges[^2].BucketRangeStart); - Assert.Equal(newLocation, shardingConfiguration.BucketRanges[^2].ShardNumber); - - Assert.Equal(bucket + 1, shardingConfiguration.BucketRanges[^1].BucketRangeStart); - Assert.Equal(originalLocation, shardingConfiguration.BucketRanges[^1].ShardNumber); - - using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, originalLocation))) - { - var user = await session.LoadAsync(id); - Assert.Null(user); - } - using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, newLocation))) - { - var user = await session.LoadAsync(id); - Assert.Equal("Original shard", user.Name); - } - - // the document will be written to the new location - using (var session = store.OpenAsyncSession()) - { - var user = await session.LoadAsync(id); - user.Name = "New shard"; - await session.SaveChangesAsync(); - } - - using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, originalLocation))) - { - var user = await session.LoadAsync(id); - Assert.Null(user); - } - - using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, newLocation))) - { - var user = await session.LoadAsync(id); - Assert.Equal("New shard", user.Name); - } - } - [RavenFact(RavenTestCategory.Sharding)] public async Task CanGetBucketStats_Prefixed() { @@ -695,19 +608,19 @@ public async Task CanGetBucketStats_Prefixed() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { - new PrefixedShardingSetting() - { - Prefix = "Users/", - Shards = new List { 0 } - }, - new PrefixedShardingSetting() - { - Prefix = "Orders/", - Shards = new List { 1 } - } - }; + record.Sharding.Prefixed = + [ + new PrefixedShardingSetting + { + Prefix = "Users/", + Shards = [0] + }, + new PrefixedShardingSetting + { + Prefix = "Orders/", + Shards = [1] + } + ]; } })) { @@ -781,24 +694,25 @@ public void RavenDb_19737() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { // range for 'eu/' is : // shard 0 : [1M, 2M] - Prefix = "eu/", - Shards = new List { 0 } + Prefix = "eu/", + Shards = [0] }, + new PrefixedShardingSetting { // range for 'asia/' is : // shard 1 : [2M, 2.5M] // shard 2 : [2.5M, 3M] - Prefix = "asia/", - Shards = new List { 1, 2 } + Prefix = "asia/", + Shards = [1, 2] } - }; + ]; } }); @@ -832,19 +746,19 @@ public async Task CanUpdatePrefixSetting() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "eu/", - Shards = new List { 0, 1 } + Prefix = "eu/", + Shards = [0, 1] }, new PrefixedShardingSetting { - Prefix = "us/", - Shards = new List { 1, 2 } + Prefix = "us/", + Shards = [1, 2] } - }; + ]; } }); @@ -901,7 +815,7 @@ await store.Maintenance.SendAsync( new UpdatePrefixedShardingSettingOperation(new PrefixedShardingSetting { Prefix = "eu/", - Shards = new List { 0, 2 } + Shards = [0, 2] }))); // can delete shard #1 from 'us/' prefix setting (no docs starting with) @@ -909,7 +823,7 @@ await store.Maintenance.SendAsync( new UpdatePrefixedShardingSettingOperation(new PrefixedShardingSetting { Prefix = "us/", - Shards = new List { 2 } + Shards = [2] })); shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); @@ -938,24 +852,24 @@ public async Task CanHandlePrefixOfPrefix() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "users/", - Shards = new List { 0 } + Prefix = "users/", + Shards = [0] }, new PrefixedShardingSetting { - Prefix = "users/us/utah/", - Shards = new List { 1 } + Prefix = "users/us/utah/", + Shards = [1] }, new PrefixedShardingSetting { - Prefix = "users/us/", - Shards = new List { 2 } + Prefix = "users/us/", + Shards = [2] } - }; + ]; } }); @@ -1010,7 +924,7 @@ public async Task CanHandlePrefixOfPrefix() await store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new PrefixedShardingSetting { Prefix = "users/us/arizona/", - Shards = new List { 1 } + Shards = [1] })); shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); @@ -1089,29 +1003,32 @@ public async Task AfterAddingNewPrefixMatchingDocsShouldNotGoToWrongShard() ModifyDatabaseRecord = record => { record.Sharding ??= new ShardingConfiguration(); - record.Sharding.Prefixed = new List - { + record.Sharding.Prefixed = + [ new PrefixedShardingSetting { - Prefix = "users/", - Shards = new List { 0 } + Prefix = "users/", + Shards = [0] }, + new PrefixedShardingSetting { - Prefix = "users/eu/", - Shards = new List { 0 } + Prefix = "users/eu/", + Shards = [0] }, + new PrefixedShardingSetting { - Prefix = "users/asia/", - Shards = new List { 2 } + Prefix = "users/asia/", + Shards = [2] }, + new PrefixedShardingSetting { - Prefix = "users/africa/", - Shards = new List { 2 } + Prefix = "users/africa/", + Shards = [2] } - }; + ]; } }); @@ -1138,7 +1055,7 @@ public async Task AfterAddingNewPrefixMatchingDocsShouldNotGoToWrongShard() await store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new PrefixedShardingSetting { Prefix = "users/us/", - Shards = new List { 1 } + Shards = [1] })); // should all go to shard #1 @@ -1173,6 +1090,454 @@ await store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new Pr } + [RavenFact(RavenTestCategory.Sharding)] + public async Task PrefixesOperationsShouldBeCaseInsensitive() + { + using var store = Sharding.GetDocumentStore(new Options + { + ModifyDatabaseRecord = record => + { + record.Sharding ??= new ShardingConfiguration(); + record.Sharding.Prefixed = + [ + new PrefixedShardingSetting + { + Prefix = "Users/", + Shards = [0, 1] + }, + new PrefixedShardingSetting + { + Prefix = "Companies/", + Shards = [0, 1, 2] + } + ]; + } + }); + + + await store.Maintenance.SendAsync(new UpdatePrefixedShardingSettingOperation(new PrefixedShardingSetting + { + Prefix = "users/", + Shards = [1, 2] + })); + + var shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); + + Assert.Equal(2, shardingConfiguration.Prefixed.Count); + Assert.Equal("Users/", shardingConfiguration.Prefixed[0].Prefix); + Assert.Equal(new[] { 1, 2 }, shardingConfiguration.Prefixed[0].Shards); + + await store.Maintenance.SendAsync(new DeletePrefixedShardingSettingOperation("COMPANIES/")); + + shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); + + Assert.Equal(1, shardingConfiguration.Prefixed.Count); + + await store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new PrefixedShardingSetting + { + Prefix = "products/", + Shards = new List { 2 } + })); + + using (var session = store.OpenAsyncSession()) + { + for (int i = 0; i < 10; i++) + { + var id = $"Products/{i}"; + await session.StoreAsync(new Item(), id); + } + + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 0))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("PRODUCTS/")).ToList(); + Assert.Equal(0, docs.Count); + } + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 1))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("PRODUCTS/")).ToList(); + Assert.Equal(0, docs.Count); + } + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 2))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("PRODUCTS/")).ToList(); + Assert.Equal(10, docs.Count); + } + + var task = store.Maintenance.SendAsync(new UpdatePrefixedShardingSettingOperation(new PrefixedShardingSetting() + { + Prefix = "Products/", + Shards = new List() { 1 } + })); + await Assert.ThrowsAsync(async () => await task); + } + + [RavenFact(RavenTestCategory.Sharding)] + public async Task UpdatePrefixesInCluster() + { + var cluster = await CreateRaftCluster(3, watcherCluster: true); + var options = Sharding.GetOptionsForCluster(cluster.Leader, shards: 3, shardReplicationFactor: 1, orchestratorReplicationFactor: 3); + options.ModifyDatabaseRecord += record => + { + record.Sharding ??= new ShardingConfiguration(); + record.Sharding.Prefixed = + [ + new PrefixedShardingSetting + { + Prefix = "users/", + Shards = [0] + }, + new PrefixedShardingSetting + { + Prefix = "users/us/utah/", + Shards = [1] + }, + new PrefixedShardingSetting + { + Prefix = "users/us/", + Shards = [2] + } + ]; + }; + using var store = GetDocumentStore(options); + + using (var session = store.OpenAsyncSession()) + { + for (int i = 0; i < 10; i++) + { + await session.StoreAsync(new Item(), "users/eu/sweden/" + i); + await session.StoreAsync(new Item(), "users/us/utah/" + i); + await session.StoreAsync(new Item(), "users/us/california/" + i); + } + + await session.SaveChangesAsync(); + } + + //var stores = Cluster.GetDocumentStores() + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 0))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("users/")).ToList(); + Assert.Equal(10, docs.Count); + + foreach (var doc in docs) + { + Assert.StartsWith("users/eu/sweden/", doc.Id); + } + } + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 1))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("users/")).ToList(); + Assert.Equal(10, docs.Count); + + foreach (var doc in docs) + { + Assert.StartsWith("users/us/utah/", doc.Id); + } + } + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 2))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("users/")).ToList(); + Assert.Equal(10, docs.Count); + + foreach (var doc in docs) + { + Assert.StartsWith("users/us/california/", doc.Id); + } + } + + // add 'users/us/arizona/' prefix setting + await store.Maintenance.SendAsync(new AddPrefixedShardingSettingOperation(new PrefixedShardingSetting + { + Prefix = "users/us/arizona/", + Shards = new List { 1 } + })); + + using (var session = store.OpenAsyncSession()) + { + for (int i = 0; i < 10; i++) + { + string id = "users/us/arizona/" + i; + await session.StoreAsync(new Item(), id); + } + + await session.SaveChangesAsync(); + } + + var stores = GetDocumentStores(cluster.Nodes, store.Database, disableTopologyUpdates: true); + foreach (var s in stores) + { + using (var session = s.OpenAsyncSession()) + { + var docs = (await session.Advanced.LoadStartingWithAsync("users/us/arizona/")).ToList(); + Assert.Equal(10, docs.Count); + } + } + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 0))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("users/us/arizona/")).ToList(); + Assert.Equal(0, docs.Count); + } + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 1))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("users/us/arizona/")).ToList(); + Assert.Equal(10, docs.Count); + } + + using (var session = store.OpenAsyncSession(database: ShardHelper.ToShardName(store.Database, 2))) + { + var docs = (await session.Advanced.LoadStartingWithAsync("users/us/arizona/")).ToList(); + Assert.Equal(0, docs.Count); + } + } + + [RavenFact(RavenTestCategory.Sharding)] + public async Task CanMoveOneBucketFromPrefixedRange() + { + using var store = Sharding.GetDocumentStore(new Options + { + ModifyDatabaseRecord = record => + { + record.Sharding ??= new ShardingConfiguration(); + record.Sharding.Prefixed = + [ + new PrefixedShardingSetting + { + // bucket range for 'users/' is : + // shard 0 : [1M, 1.5M] + // shard 1 : [1.5M, 2M] + Prefix = "users/", + Shards = [0, 1] + } + ]; + } + }); + + const string id = "users/1"; + using (var session = store.OpenAsyncSession()) + { + var user = new User + { + Name = "Original shard" + }; + await session.StoreAsync(user, id); + await session.SaveChangesAsync(); + } + + var shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); + + Assert.Equal(5, shardingConfiguration.BucketRanges.Count); + Assert.Equal(ShardHelper.NumberOfBuckets, shardingConfiguration.Prefixed[0].BucketRangeStart); + + var bucket = await Sharding.GetBucketAsync(store, id); + + var originalLocation = ShardHelper.GetShardNumberFor(shardingConfiguration, bucket); + Assert.Contains(originalLocation, shardingConfiguration.Prefixed[0].Shards); + var newLocation = shardingConfiguration.Prefixed[0].Shards.Single(s => s != originalLocation); + + await Sharding.Resharding.MoveShardForId(store, id); + + shardingConfiguration = await Sharding.GetShardingConfigurationAsync(store); + Assert.Equal(7, shardingConfiguration.BucketRanges.Count); + + Assert.Equal(bucket, shardingConfiguration.BucketRanges[^2].BucketRangeStart); + Assert.Equal(newLocation, shardingConfiguration.BucketRanges[^2].ShardNumber); + + Assert.Equal(bucket + 1, shardingConfiguration.BucketRanges[^1].BucketRangeStart); + Assert.Equal(originalLocation, shardingConfiguration.BucketRanges[^1].ShardNumber); + + using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, originalLocation))) + { + var user = await session.LoadAsync(id); + Assert.Null(user); + } + using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, newLocation))) + { + var user = await session.LoadAsync(id); + Assert.Equal("Original shard", user.Name); + } + + // the document will be written to the new location + using (var session = store.OpenAsyncSession()) + { + var user = await session.LoadAsync(id); + user.Name = "New shard"; + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, originalLocation))) + { + var user = await session.LoadAsync(id); + Assert.Null(user); + } + + using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, newLocation))) + { + var user = await session.LoadAsync(id); + Assert.Equal("New shard", user.Name); + } + } + + [RavenFact(RavenTestCategory.Sharding)] + public async Task CanMoveOneBucketFromPrefixedRangeToNewShard() + { + var (_, leader) = await CreateRaftCluster(3, watcherCluster: true); + var options = Sharding.GetOptionsForCluster(leader, shards: 2, shardReplicationFactor: 2, orchestratorReplicationFactor: 2); + options.ModifyDatabaseRecord += record => + { + record.Sharding.Prefixed = + [ + new PrefixedShardingSetting + { + Prefix = "foo/", + Shards = [0] + } + ]; + + }; + + using (var store = GetDocumentStore(options)) + { + var record = await store.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(store.Database)); + var shardTopology = record.Sharding.Shards[0]; + Assert.Equal(2, shardTopology.Members.Count); + Assert.Equal(0, shardTopology.Promotables.Count); + Assert.Equal(2, shardTopology.ReplicationFactor); + + //create new shard + var res = store.Maintenance.Server.Send(new AddDatabaseShardOperation(store.Database)); + var newShardNumber = res.ShardNumber; + Assert.Equal(2, newShardNumber); + Assert.Equal(2, res.ShardTopology.ReplicationFactor); + Assert.Equal(2, res.ShardTopology.AllNodes.Count()); + await Cluster.WaitForRaftIndexToBeAppliedInClusterAsync(res.RaftCommandIndex); + + await AssertWaitForValueAsync(async () => + { + record = await store.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(store.Database)); + return record.Sharding.Shards.Count; + }, 3); + + await AssertWaitForValueAsync(async () => + { + record = await store.Maintenance.Server.SendAsync(new GetDatabaseRecordOperation(store.Database)); + record.Sharding.Shards.TryGetValue(newShardNumber, out shardTopology); + return shardTopology?.Members?.Count; + }, 2); + + var nodesContainingNewShard = shardTopology.Members; + + foreach (var node in nodesContainingNewShard) + { + var serverWithNewShard = Servers.Single(x => x.ServerStore.NodeTag == node); + Assert.True(serverWithNewShard.ServerStore.DatabasesLandlord.DatabasesCache.TryGetValue(ShardHelper.ToShardName(store.Database, newShardNumber), out _)); + } + + var id = "foo/bar"; + var bucket = await Sharding.GetBucketAsync(store, id); + var originalDocShard = await Sharding.GetShardNumberForAsync(store, id); + Assert.Equal(0, originalDocShard); + + using (var session = store.OpenAsyncSession()) + { + session.Advanced.WaitForReplicationAfterSaveChanges(replicas: 1); + + var user = new User + { + Name = "Original shard" + }; + await session.StoreAsync(user, id); + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, originalDocShard))) + { + var user = await session.LoadAsync(id); + Assert.NotNull(user); + } + + // first we need to add the new shard to the prefix setting + await store.Maintenance.SendAsync(new UpdatePrefixedShardingSettingOperation(new PrefixedShardingSetting + { + Prefix = "foo/", + Shards = [0, newShardNumber] + })); + + // move bucket + await Sharding.Resharding.MoveShardForId(store, id, newShardNumber); + + var exists = WaitForDocument(store, id, predicate: null, database: ShardHelper.ToShardName(store.Database, newShardNumber)); + Assert.True(exists); + + using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, newShardNumber))) + { + var user = await session.LoadAsync(id); + Assert.NotNull(user); + } + + // check bucket ranges + var sharding = await Sharding.GetShardingConfigurationAsync(store); + Assert.Equal(5, sharding.BucketRanges.Count); + Assert.Equal(ShardHelper.NumberOfBuckets ,sharding.BucketRanges[2].BucketRangeStart); + Assert.Equal(bucket, sharding.BucketRanges[3].BucketRangeStart); + Assert.Equal(bucket + 1, sharding.BucketRanges[4].BucketRangeStart); + + // the document will be written to the new location + using (var session = store.OpenAsyncSession()) + { + var user = await session.LoadAsync(id); + user.Name = "New shard"; + await session.SaveChangesAsync(); + } + + using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, originalDocShard))) + { + var user = await session.LoadAsync(id); + Assert.Null(user); + } + using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, newShardNumber))) + { + var user = await session.LoadAsync(id); + Assert.Equal("New shard", user.Name); + } + } + } + + [RavenFact(RavenTestCategory.Sharding)] + public async Task ShouldThrowOnAttemptToMovePrefixedBucketToShardNotInPrefixSetting() + { + using var store = Sharding.GetDocumentStore(new Options + { + ModifyDatabaseRecord = record => + { + record.Sharding ??= new ShardingConfiguration(); + record.Sharding.Prefixed = + [ + new PrefixedShardingSetting + { + Prefix = "users/", + Shards = [0, 1] + } + ]; + } + }); + + var shardingConfig = await Sharding.GetShardingConfigurationAsync(store); + var bucket = Sharding.GetBucket(shardingConfig, "users/1"); + + // shard #2 is not a part of Prefixed['users/'].Shards + await Assert.ThrowsAsync(async ()=> + await Server.ServerStore.Sharding.StartBucketMigration(store.Database, bucket, toShard : 2)); + } + private class Item { #pragma warning disable CS0649 diff --git a/test/Tests.Infrastructure/RavenTestBase.ReshardingTestBase.cs b/test/Tests.Infrastructure/RavenTestBase.ReshardingTestBase.cs index dd4f4aff0730..90d839e1a17b 100644 --- a/test/Tests.Infrastructure/RavenTestBase.ReshardingTestBase.cs +++ b/test/Tests.Infrastructure/RavenTestBase.ReshardingTestBase.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Raven.Client.Documents; @@ -40,7 +41,20 @@ public async Task StartMovingShardForId(IDocumentStore store, string id, in } var shardNumber = ShardHelper.GetShardNumberFor(record.Sharding, bucket); - var moveToShard = ShardingTestBase.GetNextSortedShardNumber(shards: prefixed != null ? prefixed.Shards : record.Sharding.Shards.Keys, shardNumber); + var shards = prefixed != null ? prefixed.Shards : record.Sharding.Shards.Keys.ToList(); + + int moveToShard; + if (toShard.HasValue) + { + if (shards.Contains(toShard.Value) == false) + throw new InvalidOperationException($"Cannot move bucket '{bucket}' from shard {shardNumber} to shard {toShard}. " + + $"Sharding topology does not contain shard {toShard}"); + moveToShard = toShard.Value; + } + else + { + moveToShard = ShardingTestBase.GetNextSortedShardNumber(shards, shardNumber); + } using (var session = store.OpenAsyncSession(ShardHelper.ToShardName(store.Database, shardNumber))) {