Skip to content

Commit

Permalink
RavenDB-20757 : on failure to update process state ETL should enter f…
Browse files Browse the repository at this point in the history
…allback mode, instead of keep sending the same batch
  • Loading branch information
aviv86 committed Oct 16, 2023
1 parent a18ff79 commit bc0c454
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 12 deletions.
37 changes: 25 additions & 12 deletions src/Raven.Server/Documents/ETL/EtlProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ public bool Load(IEnumerable<TTransformed> items, DocumentsOperationContext cont
stats.RecordLoadFailure();
stats.RecordBatchStopReason($"{msg} : {e}");

EnterFallbackMode();
EnterFallbackMode(Statistics.LastLoadErrorTime);

Statistics.RecordLoadError(e.ToString(), documentId: null, count: stats.NumberOfExtractedItems.Sum(x => x.Value));
}
Expand All @@ -498,14 +498,14 @@ public bool Load(IEnumerable<TTransformed> items, DocumentsOperationContext cont
}
}

private void EnterFallbackMode()
private void EnterFallbackMode(DateTime? lastErrorTime)
{
if (Statistics.LastLoadErrorTime == null)
if (lastErrorTime == null)
FallbackTime = TimeSpan.FromSeconds(5);
else
{
// double the fallback time (but don't cross Etl.MaxFallbackTime)
var secondsSinceLastError = (Database.Time.GetUtcNow() - Statistics.LastLoadErrorTime.Value).TotalSeconds;
var secondsSinceLastError = (Database.Time.GetUtcNow() - lastErrorTime.Value).TotalSeconds;

FallbackTime = TimeSpan.FromSeconds(Math.Min(Database.Configuration.Etl.MaxFallbackTime.AsTimeSpan.TotalSeconds, Math.Max(5, secondsSinceLastError * 2)));
}
Expand Down Expand Up @@ -837,20 +837,33 @@ public void Run()

if (didWork)
{
try
{
UpdateEtlProcessState(state);
Database.EtlLoader.OnBatchCompleted(ConfigurationName, TransformationName, Statistics);
}
catch (Exception e)
DateTime? lastUpdateStateErrorTime = null;
while (true)
{
if (CancellationToken.IsCancellationRequested == false)
try
{
UpdateEtlProcessState(state);
break;
}
catch (Exception e)
{
if (Logger.IsOperationsEnabled)
if (CancellationToken.IsCancellationRequested)
return;

if (Logger.IsOperationsEnabled)
Logger.Operations($"{Tag} Failed to update state of ETL process '{Name}'", e);

EnterFallbackMode(lastUpdateStateErrorTime);
lastUpdateStateErrorTime = SystemTime.UtcNow;

if (CancellationToken.WaitHandle.WaitOne(FallbackTime.Value))
return;

FallbackTime = null;
}
}

Database.EtlLoader.OnBatchCompleted(ConfigurationName, TransformationName, Statistics);
continue;
}
try
Expand Down
205 changes: 205 additions & 0 deletions test/SlowTests/Server/Documents/ETL/RavenDB_20757.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FastTests.Server.Replication;
using Raven.Client.Documents;
using Raven.Client.Documents.Operations.ConnectionStrings;
using Raven.Client.Documents.Operations.ETL;
using Raven.Client.ServerWide;
using Raven.Server;
using Raven.Server.Config;
using Raven.Server.Config.Settings;
using Raven.Server.Utils;
using Raven.Tests.Core.Utils.Entities;
using Xunit;
using Xunit.Abstractions;

namespace SlowTests.Server.Documents.ETL;

public class RavenDB_20757 : ReplicationTestBase
{
public RavenDB_20757(ITestOutputHelper output) : base(output)
{
}

[Fact]
public async Task OnFailureToUpdateProcessStateEtlShouldEnterFallback()
{
var (nodes, leader) = await CreateRaftCluster(3, watcherCluster: true, leaderIndex: 0,
customSettings: new Dictionary<string, string>()
{
[RavenConfiguration.GetKey(x => x.Cluster.OperationTimeout)] = "5"
},
shouldRunInMemory: false);

var mentor = nodes[1];
var destNode = nodes[2];

using (var src = GetDocumentStore(new Options
{
Server = leader,
ReplicationFactor = 3,
RunInMemory = false
}))
using (var dest = GetDocumentStore(new Options
{
Server = destNode,
ReplicationFactor = 1,
ModifyDatabaseRecord = r => r.Topology = new DatabaseTopology
{
Members = new List<string>{ destNode.ServerStore.NodeTag }
},
RunInMemory = false
}))
{
AddEtl(src, dest, mentor.ServerStore.NodeTag);

var etlDone = await WaitForEtl(mentor, src.Database);

using (var session = src.OpenSession())
{
session.Store(new User(), "users/1");
session.SaveChanges();
}

Assert.True(etlDone.Wait(TimeSpan.FromSeconds(10)));

var destDb = await GetDatabase(destNode, dest.Database);

long etag;
using (var session = dest.OpenAsyncSession())
{
var user = await session.LoadAsync<User>("users/1");
var cvString = session.Advanced.GetChangeVectorFor(user);
etag = ChangeVectorUtils.GetEtagById(cvString, destDb.DbBase64Id);
}

// dispose leader
var disposeResult = await DisposeServerAndWaitForFinishOfDisposalAsync(leader);

etlDone.Reset();
using (var session = src.OpenSession())
{
var user = session.Load<User>("users/1");
user.Name = "Jerry";
session.SaveChanges();
}

Assert.True(WaitForDocument<User>(dest, "users/1", u => u.Name == "Jerry"), "no jerry?");

// etl reached destination but fails to complete batch (can't update process state)
var timeout = (int)mentor.ServerStore.Configuration.Cluster.OperationTimeout.AsTimeSpan.TotalMilliseconds * 3;
Assert.False(etlDone.Wait(timeout));

// assert that the same modification wasn't sent more than once
using (var session = dest.OpenAsyncSession())
{
var user = await session.LoadAsync<User>("users/1");
var cvString = session.Advanced.GetChangeVectorFor(user);
var newEtag = ChangeVectorUtils.GetEtagById(cvString, destDb.DbBase64Id);

Assert.Equal(etag + 1, newEtag);

etag = newEtag;
}

var srcDb = await GetDatabase(mentor, src.Database);
var procState = srcDb.EtlLoader.Processes.FirstOrDefault();
Assert.NotNull(procState);

var stats = procState.GetPerformanceStats()
.Where(s => s.LastLoadedEtag > 0)
.OrderBy(s => s.Id)
.ToList();

Assert.NotEmpty(stats);

var current = stats[0];
for (int i = 1; i < stats.Count; i++)
{
var next = stats[i];
Assert.True(current.LastLoadedEtag < next.LastLoadedEtag);
current = next;
}

// revive node
GetNewServer(new ServerCreationOptions
{
CustomSettings = new Dictionary<string, string> { { RavenConfiguration.GetKey(x => x.Core.ServerUrls), leader.WebUrl } },
RunInMemory = false,
DeletePrevious = false,
DataDirectory = disposeResult.DataDirectory
});

// now etl should complete batch successfully
Assert.True(etlDone.Wait(TimeSpan.FromSeconds(60)));
etlDone.Reset();

using (var session = src.OpenSession())
{
var user = session.Load<User>("users/1");
user.LastName = "Garcia";
session.SaveChanges();
}

Assert.True(etlDone.Wait(TimeSpan.FromSeconds(10)));

using (var session = dest.OpenAsyncSession())
{
var user = await session.LoadAsync<User>("users/1");
Assert.Equal("Garcia", user.LastName);

var cvString = session.Advanced.GetChangeVectorFor(user);
var newEtag = ChangeVectorUtils.GetEtagById(cvString, destDb.DbBase64Id);

Assert.Equal(etag + 1, newEtag);
}
}
}

private static void AddEtl(IDocumentStore src, IDocumentStore destination, string mentor)
{
const string connectionStringName = "cs";
var connectionString = new RavenConnectionString
{
Name = connectionStringName,
Database = destination.Database,
TopologyDiscoveryUrls = destination.Urls
};

var result = src.Maintenance.Send(new PutConnectionStringOperation<RavenConnectionString>(connectionString));
Assert.NotNull(result.RaftCommandIndex);

src.Maintenance.Send(new AddEtlOperation<RavenConnectionString>(new RavenEtlConfiguration
{
Name = connectionStringName,
ConnectionStringName = connectionStringName,
Transforms =
{
new Transformation
{
Name = $"ETL : {connectionStringName}",
Collections = new List<string>(new[] { "Users" }),
Script = null,
ApplyToAllDocuments = false,
Disabled = false
}
},
MentorNode = mentor
}));
}

private static async Task<ManualResetEventSlim> WaitForEtl(RavenServer server, string database)
{
var documentDatabase = await GetDatabase(server, database);
var mre = new ManualResetEventSlim();
documentDatabase.EtlLoader.BatchCompleted += x =>
{
if (x.Statistics.LoadSuccesses > 0)
mre.Set();
};
return mre;
}
}

0 comments on commit bc0c454

Please sign in to comment.