Skip to content

Commit

Permalink
Bugfix/race condition in memory transactional event router (#146)
Browse files Browse the repository at this point in the history
* Hardened Event Routing using concurrent event collections. 

* Fixed bug with includable queries.

* Dependency bump/sync.
  • Loading branch information
jasonmwebb-lv authored Jan 7, 2025
1 parent c5ee920 commit 8be9e3a
Show file tree
Hide file tree
Showing 26 changed files with 150 additions and 93 deletions.
2 changes: 1 addition & 1 deletion Build/Build.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected override void OnBuildInitialized()
{
Log.Information("Generating NuGet packages for projects in solution");
int commitNum = 0;
string NuGetVersionCustom = "2.1.1.3";
string NuGetVersionCustom = "2.1.1.4";


//if it's not a tagged release - append the commit number to the package version
Expand Down
2 changes: 1 addition & 1 deletion Build/Build.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="8.0.1" />
<PackageReference Include="Nuke.Common" Version="8.1.4" />
<PackageReference Include="Nuke.GitHub" Version="6.0.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.0.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<ItemGroup>
<PackageReference Include="AutoMapper" Version="13.0.1" />
<PackageReference Include="Bogus" Version="35.6.1" />
<PackageReference Include="FluentAssertions" Version="6.12.2" />
<PackageReference Include="FluentAssertions" Version="7.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="8.0.1" />
Expand All @@ -19,10 +19,10 @@
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.1" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="Shouldly" Version="4.2.1" />
<PackageReference Include="nunit" Version="4.2.2" />
<PackageReference Include="nunit" Version="4.3.2" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="coverlet.collector" Version="6.0.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageReference Include="Microsoft.AspNetCore.Identity.EntityFrameworkCore" Version="8.0.10" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="8.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.10" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.11" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="8.0.10">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<PackageReference Include="LocalStorage" Version="2.1.0" />
<PackageReference Include="Microsoft.AspNetCore.Mvc.NewtonsoftJson" Version="8.0.10" />
<PackageReference Include="AutoMapper" Version="13.0.1" />
<PackageReference Include="Microsoft.IdentityModel.JsonWebTokens" Version="8.2.1" />
<PackageReference Include="Microsoft.IdentityModel.JsonWebTokens" Version="8.3.0" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="9.0.0" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.10" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.11" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="8.0.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.0.0" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerGen" Version="7.2.0" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
Expand All @@ -17,13 +18,13 @@ public class InMemoryTransactionalEventRouter : IEventRouter
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<InMemoryTransactionalEventRouter> _logger;
private List<ISerializableEvent> _storedTransactionalEvents;
private ConcurrentQueue<ISerializableEvent> _storedTransactionalEvents;

public InMemoryTransactionalEventRouter(IServiceProvider serviceProvider, ILogger<InMemoryTransactionalEventRouter> logger)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_storedTransactionalEvents = new List<ISerializableEvent>();
_storedTransactionalEvents = new ConcurrentQueue<ISerializableEvent>();
}

public async Task RouteEventsAsync(IEnumerable<ISerializableEvent> transactionalEvents)
Expand All @@ -39,23 +40,28 @@ public async Task RouteEventsAsync(IEnumerable<ISerializableEvent> transactional
// Seperate Async events from Sync Events
var syncEvents = transactionalEvents.Where(x => x is ISyncEvent);
var asyncEvents = transactionalEvents.Where(x => x is IAsyncEvent);
var remainingEvents = transactionalEvents.Where(x => x is not IAsyncEvent && x is not ISyncEvent);
var eventProducers = _serviceProvider.GetServices<IEventProducer>();

if (syncEvents.Any() && asyncEvents.Any())
if (syncEvents.Any())
{
// Produce the Synchronized Events first
_logger.LogInformation($"{this.GetGenericTypeName()} is routing {syncEvents.Count().ToString()} synchronized transactional events.");
await this.ProduceSyncEvents(syncEvents, eventProducers).ConfigureAwait(false);
}

if (asyncEvents.Any())
{
// Produce the Async Events
_logger.LogInformation($"{this.GetGenericTypeName()} is routing {asyncEvents.Count().ToString()} asynchronous transactional events.");
await this.ProduceAsyncEvents(asyncEvents, eventProducers).ConfigureAwait(false);
}
else

if (remainingEvents.Any()) // Could be ISerializable events left over that are not marked as ISyncEvent or IAsyncEvent
{
// Send as synchronized by default
_logger.LogInformation($"No sync/async events found. {this.GetGenericTypeName()} is routing {syncEvents.Count().ToString()} as synchronized transactional events by default.");
await this.ProduceSyncEvents(transactionalEvents, eventProducers).ConfigureAwait(false);
_logger.LogInformation($"No sync/async events found. {this.GetGenericTypeName()} is routing {remainingEvents.Count().ToString()} serializable events as synchronized transactional events by default.");
await this.ProduceSyncEvents(remainingEvents, eventProducers).ConfigureAwait(false);
}

}
Expand Down Expand Up @@ -100,16 +106,59 @@ private async Task ProduceSyncEvents(IEnumerable<ISerializableEvent> syncEvents,
}
}

/// <summary>
/// Routes all transactional events. This will loop until we have removed all the events from the concurrent queue.
/// </summary>
/// <returns>Completed Task</returns>
/// <remarks>This should help us avoid race conditions e.g. a subscriber/event handler adds new events while we are processing the current list</remarks>
public async Task RouteEventsAsync()
{
await this.RouteEventsAsync(this._storedTransactionalEvents).ConfigureAwait(false);
this._storedTransactionalEvents.Clear();

while (_storedTransactionalEvents.Any())
{
var currentEvents = new List<ISerializableEvent>();
_storedTransactionalEvents.ForEach(x => currentEvents.Add(x));
await this.RouteEventsAsync(currentEvents).ConfigureAwait(false);
RemoveEvents(currentEvents);
}
}

private void RemoveEvents(IEnumerable<ISerializableEvent> events)
{
foreach (var @event in events)
{
var item = @event;

for (int i = 1; i <= 4; i++) // Try 4 times
{
if (!RemoveEvent(item))
{
i++;
}
else
{
break;
}

if (i == 4)
{
throw new EventProductionException($"Could not Dequeue event {item}");
}
}

}
}

private bool RemoveEvent(ISerializableEvent @event)
{
bool success = _storedTransactionalEvents.TryDequeue(out @event);
return success;
}

public void AddTransactionalEvent(ISerializableEvent serializableEvent)
{
Guard.IsNotNull(serializableEvent, nameof(serializableEvent));
_storedTransactionalEvents.Add(serializableEvent);
_storedTransactionalEvents.Enqueue(serializableEvent);
}

public void AddTransactionalEvents(IEnumerable<ISerializableEvent> serializableEvents)
Expand Down
2 changes: 1 addition & 1 deletion Src/RCommon.Core/RCommon.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<ItemGroup Condition=" '$(TargetFramework)' == 'net8.0' ">
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="Nito.AsyncEx.Context" Version="5.1.2" />
</ItemGroup>
Expand Down
12 changes: 10 additions & 2 deletions Src/RCommon.EfCore/Crud/EFCoreRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,21 @@ public override bool Tracking

public override IEagerLoadableQueryable<TEntity> Include(Expression<Func<TEntity, object>> path)
{
_includableQueryable = ObjectContext.Set<TEntity>().Include(path);
if (_includableQueryable == null)
{
_includableQueryable = ObjectContext.Set<TEntity>().Include(path);
}
else
{
_includableQueryable = _includableQueryable.Include(path);
}

return this;
}

public override IEagerLoadableQueryable<TEntity> ThenInclude<TPreviousProperty, TProperty>(Expression<Func<object, TProperty>> path)
{
// TODO: This is likely a bug. The received is incorrect.
// TODO: This is likely a bug. The receiver is incorrect.
_repositoryQuery = _includableQueryable.ThenInclude(path);
return this;
}
Expand Down
4 changes: 2 additions & 2 deletions Src/RCommon.EfCore/RCommon.EFCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
<TargetFrameworks>net8.0;</TargetFrameworks>
</PropertyGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net8.0' ">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.11" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.11" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Src/RCommon.MassTransit/RCommon.MassTransit.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MassTransit" Version="8.3.2" />
<PackageReference Include="MassTransit" Version="8.3.4" />
</ItemGroup>

<ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Src/RCommon.Wolverine/RCommon.Wolverine.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net8.0' ">
<PackageReference Include="WolverineFx" Version="3.3.0" />
<PackageReference Include="WolverineFx" Version="3.5.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
<PackageReference Include="Bogus" Version="35.6.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="Moq" Version="4.20.72" />
<PackageReference Include="MSTest.TestAdapter" Version="3.6.3" />
<PackageReference Include="MSTest.TestFramework" Version="3.6.3" />
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="MSTest.TestAdapter" Version="3.7.0" />
<PackageReference Include="MSTest.TestFramework" Version="3.7.0" />
<PackageReference Include="coverlet.collector" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit" Version="4.3.2" />
</ItemGroup>

<ItemGroup>
Expand Down
8 changes: 4 additions & 4 deletions Tests/RCommon.Emailing.Tests/RCommon.Emailing.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
<ItemGroup>
<PackageReference Include="Bogus" Version="35.6.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="MSTest.TestAdapter" Version="3.6.3" />
<PackageReference Include="MSTest.TestFramework" Version="3.6.3" />
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="MSTest.TestAdapter" Version="3.7.0" />
<PackageReference Include="MSTest.TestFramework" Version="3.7.0" />
<PackageReference Include="coverlet.collector" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit" Version="4.3.2" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
<PackageReference Include="Bogus" Version="35.6.1" />
<PackageReference Include="Microsoft.AspNetCore.TestHost" Version="8.0.10" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="MSTest.TestAdapter" Version="3.6.3" />
<PackageReference Include="MSTest.TestFramework" Version="3.6.3" />
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="MSTest.TestAdapter" Version="3.7.0" />
<PackageReference Include="MSTest.TestFramework" Version="3.7.0" />
<PackageReference Include="coverlet.collector" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit" Version="4.3.2" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit" Version="4.3.2" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageReference Include="NUnit.Analyzers" Version="4.4.0">
<PackageReference Include="NUnit.Analyzers" Version="4.5.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="coverlet.collector" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="NUnit" Version="4.3.2" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageReference Include="NUnit.Analyzers" Version="4.4.0">
<PackageReference Include="NUnit.Analyzers" Version="4.5.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="coverlet.collector" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@

<ItemGroup>
<PackageReference Include="Bogus" Version="35.6.1" />
<PackageReference Include="coverlet.msbuild" Version="6.0.2">
<PackageReference Include="coverlet.msbuild" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Dapper.FluentMap.Dommel" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="8.0.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
<PackageReference Include="MSTest.TestAdapter" Version="3.6.3" />
<PackageReference Include="MSTest.TestFramework" Version="3.6.3" />
<PackageReference Include="coverlet.collector" Version="6.0.2">
<PackageReference Include="MSTest.TestAdapter" Version="3.7.0" />
<PackageReference Include="MSTest.TestFramework" Version="3.7.0" />
<PackageReference Include="coverlet.collector" Version="6.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="NUnit" Version="4.2.2" />
<PackageReference Include="ReportGenerator" Version="5.4.1" />
<PackageReference Include="NUnit" Version="4.3.2" />
<PackageReference Include="ReportGenerator" Version="5.4.3" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 8be9e3a

Please sign in to comment.