diff --git a/Build/Build.cs b/Build/Build.cs index fb471be2..46113d39 100644 --- a/Build/Build.cs +++ b/Build/Build.cs @@ -135,7 +135,7 @@ protected override void OnBuildInitialized() { Log.Information("Generating NuGet packages for projects in solution"); int commitNum = 0; - string NuGetVersionCustom = "2.1.1.3"; + string NuGetVersionCustom = "2.1.1.4"; //if it's not a tagged release - append the commit number to the package version diff --git a/Build/Build.csproj b/Build/Build.csproj index b868fac1..d1f60e44 100644 --- a/Build/Build.csproj +++ b/Build/Build.csproj @@ -13,7 +13,7 @@ - + diff --git a/Examples/CleanWithCQRS/HR.LeaveManagement.API/HR.LeaveManagement.API.csproj b/Examples/CleanWithCQRS/HR.LeaveManagement.API/HR.LeaveManagement.API.csproj index f14ba05c..576de5d3 100644 --- a/Examples/CleanWithCQRS/HR.LeaveManagement.API/HR.LeaveManagement.API.csproj +++ b/Examples/CleanWithCQRS/HR.LeaveManagement.API/HR.LeaveManagement.API.csproj @@ -8,7 +8,7 @@ - + diff --git a/Examples/CleanWithCQRS/HR.LeaveManagement.Application.UnitTests/HR.LeaveManagement.Application.UnitTests.csproj b/Examples/CleanWithCQRS/HR.LeaveManagement.Application.UnitTests/HR.LeaveManagement.Application.UnitTests.csproj index af6cb8b1..c107f631 100644 --- a/Examples/CleanWithCQRS/HR.LeaveManagement.Application.UnitTests/HR.LeaveManagement.Application.UnitTests.csproj +++ b/Examples/CleanWithCQRS/HR.LeaveManagement.Application.UnitTests/HR.LeaveManagement.Application.UnitTests.csproj @@ -10,7 +10,7 @@ - + @@ -19,10 +19,10 @@ - + - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/Examples/CleanWithCQRS/HR.LeaveManagement.Identity/HR.LeaveManagement.Identity.csproj b/Examples/CleanWithCQRS/HR.LeaveManagement.Identity/HR.LeaveManagement.Identity.csproj index 37b5c5ea..58df5c19 100644 --- a/Examples/CleanWithCQRS/HR.LeaveManagement.Identity/HR.LeaveManagement.Identity.csproj +++ b/Examples/CleanWithCQRS/HR.LeaveManagement.Identity/HR.LeaveManagement.Identity.csproj @@ -11,7 +11,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Examples/CleanWithCQRS/HR.LeaveManagement.MVC/HR.LeaveManagement.MVC.csproj b/Examples/CleanWithCQRS/HR.LeaveManagement.MVC/HR.LeaveManagement.MVC.csproj index 857d4b54..68aee8f0 100644 --- a/Examples/CleanWithCQRS/HR.LeaveManagement.MVC/HR.LeaveManagement.MVC.csproj +++ b/Examples/CleanWithCQRS/HR.LeaveManagement.MVC/HR.LeaveManagement.MVC.csproj @@ -10,7 +10,7 @@ - + diff --git a/Examples/CleanWithCQRS/HR.LeaveManagement.Persistence/HR.LeaveManagement.Persistence.csproj b/Examples/CleanWithCQRS/HR.LeaveManagement.Persistence/HR.LeaveManagement.Persistence.csproj index de60c5bd..3bda07f5 100644 --- a/Examples/CleanWithCQRS/HR.LeaveManagement.Persistence/HR.LeaveManagement.Persistence.csproj +++ b/Examples/CleanWithCQRS/HR.LeaveManagement.Persistence/HR.LeaveManagement.Persistence.csproj @@ -7,7 +7,7 @@ - + diff --git a/Src/RCommon.Authorization.Web/RCommon.Authorization.Web.csproj b/Src/RCommon.Authorization.Web/RCommon.Authorization.Web.csproj index e90366cd..ad21c286 100644 --- a/Src/RCommon.Authorization.Web/RCommon.Authorization.Web.csproj +++ b/Src/RCommon.Authorization.Web/RCommon.Authorization.Web.csproj @@ -5,7 +5,7 @@ - + diff --git a/Src/RCommon.Core/EventHandling/Producers/InMemoryTransactionalEventRouter.cs b/Src/RCommon.Core/EventHandling/Producers/InMemoryTransactionalEventRouter.cs index 388c9270..ae662262 100644 --- a/Src/RCommon.Core/EventHandling/Producers/InMemoryTransactionalEventRouter.cs +++ b/Src/RCommon.Core/EventHandling/Producers/InMemoryTransactionalEventRouter.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; @@ -17,13 +18,13 @@ public class InMemoryTransactionalEventRouter : IEventRouter { private readonly IServiceProvider _serviceProvider; private readonly ILogger _logger; - private List _storedTransactionalEvents; + private ConcurrentQueue _storedTransactionalEvents; public InMemoryTransactionalEventRouter(IServiceProvider serviceProvider, ILogger logger) { _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _storedTransactionalEvents = new List(); + _storedTransactionalEvents = new ConcurrentQueue(); } public async Task RouteEventsAsync(IEnumerable transactionalEvents) @@ -39,23 +40,28 @@ public async Task RouteEventsAsync(IEnumerable transactional // Seperate Async events from Sync Events var syncEvents = transactionalEvents.Where(x => x is ISyncEvent); var asyncEvents = transactionalEvents.Where(x => x is IAsyncEvent); + var remainingEvents = transactionalEvents.Where(x => x is not IAsyncEvent && x is not ISyncEvent); var eventProducers = _serviceProvider.GetServices(); - if (syncEvents.Any() && asyncEvents.Any()) + if (syncEvents.Any()) { // Produce the Synchronized Events first _logger.LogInformation($"{this.GetGenericTypeName()} is routing {syncEvents.Count().ToString()} synchronized transactional events."); await this.ProduceSyncEvents(syncEvents, eventProducers).ConfigureAwait(false); + } + if (asyncEvents.Any()) + { // Produce the Async Events _logger.LogInformation($"{this.GetGenericTypeName()} is routing {asyncEvents.Count().ToString()} asynchronous transactional events."); await this.ProduceAsyncEvents(asyncEvents, eventProducers).ConfigureAwait(false); } - else + + if (remainingEvents.Any()) // Could be ISerializable events left over that are not marked as ISyncEvent or IAsyncEvent { // Send as synchronized by default - _logger.LogInformation($"No sync/async events found. {this.GetGenericTypeName()} is routing {syncEvents.Count().ToString()} as synchronized transactional events by default."); - await this.ProduceSyncEvents(transactionalEvents, eventProducers).ConfigureAwait(false); + _logger.LogInformation($"No sync/async events found. {this.GetGenericTypeName()} is routing {remainingEvents.Count().ToString()} serializable events as synchronized transactional events by default."); + await this.ProduceSyncEvents(remainingEvents, eventProducers).ConfigureAwait(false); } } @@ -100,16 +106,59 @@ private async Task ProduceSyncEvents(IEnumerable syncEvents, } } + /// + /// Routes all transactional events. This will loop until we have removed all the events from the concurrent queue. + /// + /// Completed Task + /// This should help us avoid race conditions e.g. a subscriber/event handler adds new events while we are processing the current list public async Task RouteEventsAsync() { - await this.RouteEventsAsync(this._storedTransactionalEvents).ConfigureAwait(false); - this._storedTransactionalEvents.Clear(); + + while (_storedTransactionalEvents.Any()) + { + var currentEvents = new List(); + _storedTransactionalEvents.ForEach(x => currentEvents.Add(x)); + await this.RouteEventsAsync(currentEvents).ConfigureAwait(false); + RemoveEvents(currentEvents); + } + } + + private void RemoveEvents(IEnumerable events) + { + foreach (var @event in events) + { + var item = @event; + + for (int i = 1; i <= 4; i++) // Try 4 times + { + if (!RemoveEvent(item)) + { + i++; + } + else + { + break; + } + + if (i == 4) + { + throw new EventProductionException($"Could not Dequeue event {item}"); + } + } + + } + } + + private bool RemoveEvent(ISerializableEvent @event) + { + bool success = _storedTransactionalEvents.TryDequeue(out @event); + return success; } public void AddTransactionalEvent(ISerializableEvent serializableEvent) { Guard.IsNotNull(serializableEvent, nameof(serializableEvent)); - _storedTransactionalEvents.Add(serializableEvent); + _storedTransactionalEvents.Enqueue(serializableEvent); } public void AddTransactionalEvents(IEnumerable serializableEvents) diff --git a/Src/RCommon.Core/RCommon.Core.csproj b/Src/RCommon.Core/RCommon.Core.csproj index be94f991..589df676 100644 --- a/Src/RCommon.Core/RCommon.Core.csproj +++ b/Src/RCommon.Core/RCommon.Core.csproj @@ -7,7 +7,7 @@ - + diff --git a/Src/RCommon.EfCore/Crud/EFCoreRepository.cs b/Src/RCommon.EfCore/Crud/EFCoreRepository.cs index d037feaf..bf610bb2 100644 --- a/Src/RCommon.EfCore/Crud/EFCoreRepository.cs +++ b/Src/RCommon.EfCore/Crud/EFCoreRepository.cs @@ -89,13 +89,21 @@ public override bool Tracking public override IEagerLoadableQueryable Include(Expression> path) { - _includableQueryable = ObjectContext.Set().Include(path); + if (_includableQueryable == null) + { + _includableQueryable = ObjectContext.Set().Include(path); + } + else + { + _includableQueryable = _includableQueryable.Include(path); + } + return this; } public override IEagerLoadableQueryable ThenInclude(Expression> path) { - // TODO: This is likely a bug. The received is incorrect. + // TODO: This is likely a bug. The receiver is incorrect. _repositoryQuery = _includableQueryable.ThenInclude(path); return this; } diff --git a/Src/RCommon.EfCore/RCommon.EFCore.csproj b/Src/RCommon.EfCore/RCommon.EFCore.csproj index 03d91367..7bc18c42 100644 --- a/Src/RCommon.EfCore/RCommon.EFCore.csproj +++ b/Src/RCommon.EfCore/RCommon.EFCore.csproj @@ -5,8 +5,8 @@ net8.0; - - + + diff --git a/Src/RCommon.MassTransit/RCommon.MassTransit.csproj b/Src/RCommon.MassTransit/RCommon.MassTransit.csproj index ad175237..8254c373 100644 --- a/Src/RCommon.MassTransit/RCommon.MassTransit.csproj +++ b/Src/RCommon.MassTransit/RCommon.MassTransit.csproj @@ -7,7 +7,7 @@ - + diff --git a/Src/RCommon.Wolverine/RCommon.Wolverine.csproj b/Src/RCommon.Wolverine/RCommon.Wolverine.csproj index e563bf68..02ddbb62 100644 --- a/Src/RCommon.Wolverine/RCommon.Wolverine.csproj +++ b/Src/RCommon.Wolverine/RCommon.Wolverine.csproj @@ -7,7 +7,7 @@ - + diff --git a/Tests/RCommon.Emailing.SendGrid.Tests/RCommon.Emailing.SendGrid.Tests.csproj b/Tests/RCommon.Emailing.SendGrid.Tests/RCommon.Emailing.SendGrid.Tests.csproj index 7210ff64..1789f21d 100644 --- a/Tests/RCommon.Emailing.SendGrid.Tests/RCommon.Emailing.SendGrid.Tests.csproj +++ b/Tests/RCommon.Emailing.SendGrid.Tests/RCommon.Emailing.SendGrid.Tests.csproj @@ -11,13 +11,13 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Tests/RCommon.Emailing.Tests/RCommon.Emailing.Tests.csproj b/Tests/RCommon.Emailing.Tests/RCommon.Emailing.Tests.csproj index 471f7f4f..02ab71ec 100644 --- a/Tests/RCommon.Emailing.Tests/RCommon.Emailing.Tests.csproj +++ b/Tests/RCommon.Emailing.Tests/RCommon.Emailing.Tests.csproj @@ -10,13 +10,13 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Tests/RCommon.Mediator.MediatR.Tests/RCommon.Mediator.MediatR.Tests.csproj b/Tests/RCommon.Mediator.MediatR.Tests/RCommon.Mediator.MediatR.Tests.csproj index 61aa919a..954546a7 100644 --- a/Tests/RCommon.Mediator.MediatR.Tests/RCommon.Mediator.MediatR.Tests.csproj +++ b/Tests/RCommon.Mediator.MediatR.Tests/RCommon.Mediator.MediatR.Tests.csproj @@ -12,13 +12,13 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/Tests/RCommon.Messaging.MassTransit.Tests/RCommon.Messaging.MassTransit.Tests.csproj b/Tests/RCommon.Messaging.MassTransit.Tests/RCommon.Messaging.MassTransit.Tests.csproj index 0636cfab..2681b25b 100644 --- a/Tests/RCommon.Messaging.MassTransit.Tests/RCommon.Messaging.MassTransit.Tests.csproj +++ b/Tests/RCommon.Messaging.MassTransit.Tests/RCommon.Messaging.MassTransit.Tests.csproj @@ -10,13 +10,13 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Tests/RCommon.Messaging.Wolverine.Tests/RCommon.Messaging.Wolverine.Tests.csproj b/Tests/RCommon.Messaging.Wolverine.Tests/RCommon.Messaging.Wolverine.Tests.csproj index c27d4070..98b933c5 100644 --- a/Tests/RCommon.Messaging.Wolverine.Tests/RCommon.Messaging.Wolverine.Tests.csproj +++ b/Tests/RCommon.Messaging.Wolverine.Tests/RCommon.Messaging.Wolverine.Tests.csproj @@ -10,13 +10,13 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/Tests/RCommon.Persistence.Dapper.Tests/RCommon.Persistence.Dapper.Tests.csproj b/Tests/RCommon.Persistence.Dapper.Tests/RCommon.Persistence.Dapper.Tests.csproj index 890c5a4d..13deda10 100644 --- a/Tests/RCommon.Persistence.Dapper.Tests/RCommon.Persistence.Dapper.Tests.csproj +++ b/Tests/RCommon.Persistence.Dapper.Tests/RCommon.Persistence.Dapper.Tests.csproj @@ -8,21 +8,21 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + diff --git a/Tests/RCommon.Persistence.EFCore.Tests/RCommon.Persistence.EFCore.Tests.csproj b/Tests/RCommon.Persistence.EFCore.Tests/RCommon.Persistence.EFCore.Tests.csproj index 6ceacd59..aceb173b 100644 --- a/Tests/RCommon.Persistence.EFCore.Tests/RCommon.Persistence.EFCore.Tests.csproj +++ b/Tests/RCommon.Persistence.EFCore.Tests/RCommon.Persistence.EFCore.Tests.csproj @@ -14,16 +14,16 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - + + + + + + + + - + diff --git a/Tests/RCommon.Persistence.Linq2Db.Tests/RCommon.Persistence.Linq2Db.Tests.csproj b/Tests/RCommon.Persistence.Linq2Db.Tests/RCommon.Persistence.Linq2Db.Tests.csproj index 247a1259..c1984b24 100644 --- a/Tests/RCommon.Persistence.Linq2Db.Tests/RCommon.Persistence.Linq2Db.Tests.csproj +++ b/Tests/RCommon.Persistence.Linq2Db.Tests/RCommon.Persistence.Linq2Db.Tests.csproj @@ -11,15 +11,15 @@ - - - - - - - + + + + + + + - + diff --git a/Tests/RCommon.Security.Tests/RCommon.Security.Tests.csproj b/Tests/RCommon.Security.Tests/RCommon.Security.Tests.csproj index ae80c45a..2bba76a1 100644 --- a/Tests/RCommon.Security.Tests/RCommon.Security.Tests.csproj +++ b/Tests/RCommon.Security.Tests/RCommon.Security.Tests.csproj @@ -9,17 +9,17 @@ - - - - - + + + + + - + - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/Tests/RCommon.TestBase.Data/RCommon.TestBase.Data.csproj b/Tests/RCommon.TestBase.Data/RCommon.TestBase.Data.csproj index 3705192f..7f30ebb6 100644 --- a/Tests/RCommon.TestBase.Data/RCommon.TestBase.Data.csproj +++ b/Tests/RCommon.TestBase.Data/RCommon.TestBase.Data.csproj @@ -12,16 +12,16 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - + + + + + + + + - + diff --git a/Tests/RCommon.TestBase/RCommon.TestBase.csproj b/Tests/RCommon.TestBase/RCommon.TestBase.csproj index d098630f..30c641d2 100644 --- a/Tests/RCommon.TestBase/RCommon.TestBase.csproj +++ b/Tests/RCommon.TestBase/RCommon.TestBase.csproj @@ -8,16 +8,16 @@ - - - - - + + + + + - + - + diff --git a/Tests/RCommon.Tests/RCommon.Tests.csproj b/Tests/RCommon.Tests/RCommon.Tests.csproj index a8534f27..1a4b91f8 100644 --- a/Tests/RCommon.Tests/RCommon.Tests.csproj +++ b/Tests/RCommon.Tests/RCommon.Tests.csproj @@ -13,8 +13,8 @@ - - + +