diff --git a/src/Raven.Server/Documents/ETL/EtlProcess.cs b/src/Raven.Server/Documents/ETL/EtlProcess.cs index 739fd276eb35..c34853331162 100644 --- a/src/Raven.Server/Documents/ETL/EtlProcess.cs +++ b/src/Raven.Server/Documents/ETL/EtlProcess.cs @@ -488,7 +488,7 @@ public bool Load(IEnumerable items, DocumentsOperationContext cont stats.RecordLoadFailure(); stats.RecordBatchStopReason($"{msg} : {e}"); - EnterFallbackMode(); + EnterFallbackMode(Statistics.LastLoadErrorTime); Statistics.RecordLoadError(e.ToString(), documentId: null, count: stats.NumberOfExtractedItems.Sum(x => x.Value)); } @@ -498,14 +498,14 @@ public bool Load(IEnumerable items, DocumentsOperationContext cont } } - private void EnterFallbackMode() + private void EnterFallbackMode(DateTime? lastErrorTime) { - if (Statistics.LastLoadErrorTime == null) + if (lastErrorTime == null) FallbackTime = TimeSpan.FromSeconds(5); else { // double the fallback time (but don't cross Etl.MaxFallbackTime) - var secondsSinceLastError = (Database.Time.GetUtcNow() - Statistics.LastLoadErrorTime.Value).TotalSeconds; + var secondsSinceLastError = (Database.Time.GetUtcNow() - lastErrorTime.Value).TotalSeconds; FallbackTime = TimeSpan.FromSeconds(Math.Min(Database.Configuration.Etl.MaxFallbackTime.AsTimeSpan.TotalSeconds, Math.Max(5, secondsSinceLastError * 2))); } @@ -837,20 +837,33 @@ public void Run() if (didWork) { - try - { - UpdateEtlProcessState(state); - Database.EtlLoader.OnBatchCompleted(ConfigurationName, TransformationName, Statistics); - } - catch (Exception e) + DateTime? lastUpdateStateErrorTime = null; + while (true) { - if (CancellationToken.IsCancellationRequested == false) + try + { + UpdateEtlProcessState(state); + break; + } + catch (Exception e) { - if (Logger.IsOperationsEnabled) + if (CancellationToken.IsCancellationRequested) + return; + + if (Logger.IsOperationsEnabled) Logger.Operations($"{Tag} Failed to update state of ETL process '{Name}'", e); + + EnterFallbackMode(lastUpdateStateErrorTime); + lastUpdateStateErrorTime = SystemTime.UtcNow; + + if (CancellationToken.WaitHandle.WaitOne(FallbackTime.Value)) + return; + + FallbackTime = null; } } + Database.EtlLoader.OnBatchCompleted(ConfigurationName, TransformationName, Statistics); continue; } try diff --git a/test/SlowTests/Server/Documents/ETL/RavenDB_20757.cs b/test/SlowTests/Server/Documents/ETL/RavenDB_20757.cs new file mode 100644 index 000000000000..f2a3b2796893 --- /dev/null +++ b/test/SlowTests/Server/Documents/ETL/RavenDB_20757.cs @@ -0,0 +1,205 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using FastTests.Server.Replication; +using Raven.Client.Documents; +using Raven.Client.Documents.Operations.ConnectionStrings; +using Raven.Client.Documents.Operations.ETL; +using Raven.Client.ServerWide; +using Raven.Server; +using Raven.Server.Config; +using Raven.Server.Config.Settings; +using Raven.Server.Utils; +using Raven.Tests.Core.Utils.Entities; +using Xunit; +using Xunit.Abstractions; + +namespace SlowTests.Server.Documents.ETL; + +public class RavenDB_20757 : ReplicationTestBase +{ + public RavenDB_20757(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task OnFailureToUpdateProcessStateEtlShouldEnterFallback() + { + var (nodes, leader) = await CreateRaftCluster(3, watcherCluster: true, leaderIndex: 0, + customSettings: new Dictionary() + { + [RavenConfiguration.GetKey(x => x.Cluster.OperationTimeout)] = "5" + }, + shouldRunInMemory: false); + + var mentor = nodes[1]; + var destNode = nodes[2]; + + using (var src = GetDocumentStore(new Options + { + Server = leader, + ReplicationFactor = 3, + RunInMemory = false + })) + using (var dest = GetDocumentStore(new Options + { + Server = destNode, + ReplicationFactor = 1, + ModifyDatabaseRecord = r => r.Topology = new DatabaseTopology + { + Members = new List{ destNode.ServerStore.NodeTag } + }, + RunInMemory = false + })) + { + AddEtl(src, dest, mentor.ServerStore.NodeTag); + + var etlDone = await WaitForEtl(mentor, src.Database); + + using (var session = src.OpenSession()) + { + session.Store(new User(), "users/1"); + session.SaveChanges(); + } + + Assert.True(etlDone.Wait(TimeSpan.FromSeconds(10))); + + var destDb = await GetDatabase(destNode, dest.Database); + + long etag; + using (var session = dest.OpenAsyncSession()) + { + var user = await session.LoadAsync("users/1"); + var cvString = session.Advanced.GetChangeVectorFor(user); + etag = ChangeVectorUtils.GetEtagById(cvString, destDb.DbBase64Id); + } + + // dispose leader + var disposeResult = await DisposeServerAndWaitForFinishOfDisposalAsync(leader); + + etlDone.Reset(); + using (var session = src.OpenSession()) + { + var user = session.Load("users/1"); + user.Name = "Jerry"; + session.SaveChanges(); + } + + Assert.True(WaitForDocument(dest, "users/1", u => u.Name == "Jerry"), "no jerry?"); + + // etl reached destination but fails to complete batch (can't update process state) + var timeout = (int)mentor.ServerStore.Configuration.Cluster.OperationTimeout.AsTimeSpan.TotalMilliseconds * 3; + Assert.False(etlDone.Wait(timeout)); + + // assert that the same modification wasn't sent more than once + using (var session = dest.OpenAsyncSession()) + { + var user = await session.LoadAsync("users/1"); + var cvString = session.Advanced.GetChangeVectorFor(user); + var newEtag = ChangeVectorUtils.GetEtagById(cvString, destDb.DbBase64Id); + + Assert.Equal(etag + 1, newEtag); + + etag = newEtag; + } + + var srcDb = await GetDatabase(mentor, src.Database); + var procState = srcDb.EtlLoader.Processes.FirstOrDefault(); + Assert.NotNull(procState); + + var stats = procState.GetPerformanceStats() + .Where(s => s.LastLoadedEtag > 0) + .OrderBy(s => s.Id) + .ToList(); + + Assert.NotEmpty(stats); + + var current = stats[0]; + for (int i = 1; i < stats.Count; i++) + { + var next = stats[i]; + Assert.True(current.LastLoadedEtag < next.LastLoadedEtag); + current = next; + } + + // revive node + GetNewServer(new ServerCreationOptions + { + CustomSettings = new Dictionary { { RavenConfiguration.GetKey(x => x.Core.ServerUrls), leader.WebUrl } }, + RunInMemory = false, + DeletePrevious = false, + DataDirectory = disposeResult.DataDirectory + }); + + // now etl should complete batch successfully + Assert.True(etlDone.Wait(TimeSpan.FromSeconds(60))); + etlDone.Reset(); + + using (var session = src.OpenSession()) + { + var user = session.Load("users/1"); + user.LastName = "Garcia"; + session.SaveChanges(); + } + + Assert.True(etlDone.Wait(TimeSpan.FromSeconds(10))); + + using (var session = dest.OpenAsyncSession()) + { + var user = await session.LoadAsync("users/1"); + Assert.Equal("Garcia", user.LastName); + + var cvString = session.Advanced.GetChangeVectorFor(user); + var newEtag = ChangeVectorUtils.GetEtagById(cvString, destDb.DbBase64Id); + + Assert.Equal(etag + 1, newEtag); + } + } + } + + private static void AddEtl(IDocumentStore src, IDocumentStore destination, string mentor) + { + const string connectionStringName = "cs"; + var connectionString = new RavenConnectionString + { + Name = connectionStringName, + Database = destination.Database, + TopologyDiscoveryUrls = destination.Urls + }; + + var result = src.Maintenance.Send(new PutConnectionStringOperation(connectionString)); + Assert.NotNull(result.RaftCommandIndex); + + src.Maintenance.Send(new AddEtlOperation(new RavenEtlConfiguration + { + Name = connectionStringName, + ConnectionStringName = connectionStringName, + Transforms = + { + new Transformation + { + Name = $"ETL : {connectionStringName}", + Collections = new List(new[] { "Users" }), + Script = null, + ApplyToAllDocuments = false, + Disabled = false + } + }, + MentorNode = mentor + })); + } + + private static async Task WaitForEtl(RavenServer server, string database) + { + var documentDatabase = await GetDatabase(server, database); + var mre = new ManualResetEventSlim(); + documentDatabase.EtlLoader.BatchCompleted += x => + { + if (x.Statistics.LoadSuccesses > 0) + mre.Set(); + }; + return mre; + } +}