Skip to content

Commit

Permalink
RavenDB-21159 : flaky test adjustments :
Browse files Browse the repository at this point in the history
- use a singe store with orchestrator on all 3 nodes
- wait for etl on all shard nodes (instead of a single node) and use the same MRE instance for all shard replicas (for each shard, only one of its nodes will be responsible for the etl)
- dispose a node instead of removing from cluster
- choose an actual responsible node to remove instead of a random node
- after disposing node, wait for it to be removed from orchestrators members in db record
  • Loading branch information
aviv86 committed Aug 24, 2023
1 parent 9a635e8 commit c2a1f68
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 51 deletions.
119 changes: 69 additions & 50 deletions test/SlowTests/Sharding/ETL/ShardedEtlFailoverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Raven.Client.Exceptions.Cluster;
using Raven.Client.ServerWide;
using Raven.Client.ServerWide.Operations;
using Raven.Client.ServerWide.Sharding;
using Raven.Server;
using Raven.Server.Documents.ETL;
using Raven.Server.Documents.Sharding;
Expand Down Expand Up @@ -658,29 +659,42 @@ public async Task ShouldSendCounterChangeMadeInCluster()
[RavenFact(RavenTestCategory.Etl | RavenTestCategory.Cluster | RavenTestCategory.Sharding)]
public async Task OlapTaskShouldBeHighlyAvailable()
{
var nodes = await CreateRaftCluster(3, watcherCluster: true);
var leader = nodes.Leader;
var dbName = GetDatabaseName();
var srcNodes = await ShardingCluster.CreateShardedDatabaseInCluster(dbName, replicationFactor: 2, nodes, shards: 3);
var cluster = await CreateRaftCluster(3, watcherCluster: true);
var leader = cluster.Leader;
var followers = cluster.Nodes.Where(n => n != leader).Select(n => n.ServerStore.NodeTag).ToList();

var stores = srcNodes.Servers.Select(s => new DocumentStore
var options = new Options
{
Database = dbName,
Urls = new[] { s.WebUrl },
Conventions = new DocumentConventions
DatabaseMode = RavenDatabaseMode.Sharded,
ModifyDatabaseRecord = r =>
{
DisableTopologyUpdates = true
}
}.Initialize())
.ToList();
r.Sharding = new ShardingConfiguration
{
Shards = new Dictionary<int, DatabaseTopology>(3),
Orchestrator = new OrchestratorConfiguration
{
Topology = new OrchestratorTopology
{
ReplicationFactor = 3
}
}
};
var server = srcNodes.Servers.First(s => s != leader);
var store = stores.Single(s => s.Urls[0] == server.WebUrl);
for (int shardNumber = 0; shardNumber < 3; shardNumber++)
{
r.Sharding.Shards[shardNumber] = new DatabaseTopology
{
ReplicationFactor = 2,
Members = followers
};
}
},
Server = leader
};

Assert.Equal(store.Database, dbName);
using var store = GetDocumentStore(options);

var baseline = new DateTime(2020, 1, 1);

using (var session = store.OpenAsyncSession())
{
for (int i = 0; i < 31; i++)
Expand All @@ -697,6 +711,8 @@ await session.StoreAsync(new Query.Order
await session.SaveChangesAsync();
}

var etlsDone = Sharding.Etl.WaitForEtlOnAllShardsInCluster(store.Database, (n, s) => s.LoadSuccesses > 0);

var script = @"
var orderDate = new Date(this.OrderedAt);
var year = orderDate.getFullYear();
Expand Down Expand Up @@ -732,32 +748,56 @@ await session.StoreAsync(new Query.Order
}
};

var result = store.Maintenance.Send(new PutConnectionStringOperation<OlapConnectionString>(new OlapConnectionString

Etl.AddEtl(store, configuration, new OlapConnectionString
{
Name = connectionStringName,
LocalSettings = new LocalSettings
{
FolderPath = path
}
}));
Assert.NotNull(result.RaftCommandIndex);

store.Maintenance.Send(new AddEtlOperation<OlapConnectionString>(configuration));
var databases = srcNodes.Servers.Single(s => s.ServerStore.NodeTag == server.ServerStore.NodeTag)
.ServerStore.DatabasesLandlord.TryGetOrCreateShardedResourcesStore(dbName);
});

var etlsDone = WaitForEtlOnAllShards(server, dbName, (n, s) => s.LoadSuccesses > 0);
var timeout = TimeSpan.FromMinutes(2);
var waitHandles = etlsDone.Select(mre => mre.WaitHandle).ToArray();
WaitHandle.WaitAll(waitHandles, TimeSpan.FromMinutes(2));
Assert.True(WaitHandle.WaitAll(waitHandles, timeout), await Etl.GetEtlDebugInfo(store.Database, timeout, RavenDatabaseMode.Sharded));

var files = Directory.GetFiles(path, "*.*", SearchOption.AllDirectories);
var count = files.Length;
Assert.True(count > 0);

await ActionWithLeader(l => l.ServerStore.RemoveFromClusterAsync(server.ServerStore.NodeTag), nodes.Nodes);
string serverToRemove = null;
foreach (var node in cluster.Nodes)
{
// select one of the responsible nodes as the node to remove
if (serverToRemove != null)
break;

var store2 = stores.First(s => s != store);
using (var session = store2.OpenAsyncSession())
foreach (var task in node.ServerStore.DatabasesLandlord.TryGetOrCreateShardedResourcesStore(store.Database))
{
var databaseInstance = await task;
if (databaseInstance.EtlLoader.Processes.Length > 0)
{
serverToRemove = node.ServerStore.NodeTag;
break;
}
}
}

Assert.NotNull(serverToRemove);
Assert.Contains(serverToRemove, followers);
await DisposeAndRemoveServer(cluster.Nodes.Single(n => n.ServerStore.NodeTag == serverToRemove));

await WaitForValueAsync(async () =>
{
var shardingConfig = await Sharding.GetShardingConfigurationAsync(store);
return shardingConfig.Orchestrator.Topology.Members.Contains(serverToRemove);
}, expectedVal: false);


etlsDone = Sharding.Etl.WaitForEtlOnAllShardsInCluster(store.Database, (n, s) => s.LoadSuccesses > 0);

using (var session = store.OpenAsyncSession())
{
for (int i = 0; i < 28; i++)
{
Expand All @@ -773,32 +813,11 @@ await session.StoreAsync(new Query.Order
await session.SaveChangesAsync();
}

etlsDone = WaitForEtlOnAllShards(leader, dbName, (n, s) => s.LoadSuccesses > 0);
waitHandles = etlsDone.Select(mre => mre.WaitHandle).ToArray();
WaitHandle.WaitAll(waitHandles, TimeSpan.FromMinutes(1));
Assert.True(WaitHandle.WaitAll(waitHandles, timeout), await Etl.GetEtlDebugInfo(store.Database, timeout, RavenDatabaseMode.Sharded));

files = Directory.GetFiles(path, "*.*", SearchOption.AllDirectories);
Assert.True(files.Length > count);
}

private static IEnumerable<ManualResetEventSlim> WaitForEtlOnAllShards(RavenServer server, string database, Func<string, EtlProcessStatistics, bool> predicate)
{
var dbs = server.ServerStore.DatabasesLandlord.TryGetOrCreateShardedResourcesStore(database).ToList();
var list = new List<ManualResetEventSlim>(dbs.Count);
foreach (var task in dbs)
{
var mre = new ManualResetEventSlim();
list.Add(mre);

var db = task.Result;
db.EtlLoader.BatchCompleted += x =>
{
if (predicate($"{x.ConfigurationName}/{x.TransformationName}", x.Statistics))
mre.Set();
};
}

return list;
}
}
}
6 changes: 5 additions & 1 deletion test/Tests.Infrastructure/RavenTestBase.Etl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,8 @@ public async Task<string> GetEtlDebugInfo(string database, TimeSpan timeout, Rav
foreach (var documentDatabase in databases)
{
var performanceStats = GetEtlPerformanceStatsForDatabase(documentDatabase);
if (performanceStats == null)
continue;
sb.AppendLine($"database '{documentDatabase.Name}' stats : {performanceStats}");
}

Expand All @@ -327,7 +329,9 @@ public async Task<string> GetEtlDebugInfo(string database, TimeSpan timeout, Rav

public string GetEtlPerformanceStatsForDatabase(DocumentDatabase database)
{
var process = database.EtlLoader.Processes.First();
var process = database.EtlLoader.Processes.FirstOrDefault();
if (process == null)
return null;
var stats = process.GetPerformanceStats();
return string.Join(Environment.NewLine, stats.Select(JsonConvert.SerializeObject));
}
Expand Down
26 changes: 26 additions & 0 deletions test/Tests.Infrastructure/RavenTestBase.ShardedEtlTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,32 @@ public IEnumerable<ManualResetEventSlim> WaitForEtlOnAllShards(IDocumentStore st
return list;
}

public IEnumerable<ManualResetEventSlim> WaitForEtlOnAllShardsInCluster(string database, Func<string, EtlProcessStatistics, bool> predicate)
{
var mresPerShard = new Dictionary<string, ManualResetEventSlim>();
foreach (var server in _parent.Servers)
{
var dbs = server.ServerStore.DatabasesLandlord.TryGetOrCreateShardedResourcesStore(database);
foreach (var task in dbs)
{
var db = task.Result;

if (mresPerShard.TryGetValue(db.Name, out var mre) == false)
{
mresPerShard[db.Name] = mre = new ManualResetEventSlim();
}

db.EtlLoader.BatchCompleted += x =>
{
if (predicate($"{x.ConfigurationName}/{x.TransformationName}", x.Statistics))
mre.Set();
};
}
}

return mresPerShard.Values;
}

public ManualResetEventSlim WaitForEtl(IDocumentStore store, Func<string, EtlProcessStatistics, bool> predicate)
{
return AsyncHelpers.RunSync(() => WaitForEtlAsync(store, predicate, count: 1));
Expand Down

0 comments on commit c2a1f68

Please sign in to comment.