diff --git a/src/BuildingBlocks/Authentication/OpenIdConnect/Masa.BuildingBlocks.Authentication.OpenIdConnect.Domain/Masa - Backup.BuildingBlocks.Authentication.OpenIdConnect.Domain.csproj b/src/BuildingBlocks/Authentication/OpenIdConnect/Masa.BuildingBlocks.Authentication.OpenIdConnect.Domain/Masa - Backup.BuildingBlocks.Authentication.OpenIdConnect.Domain.csproj deleted file mode 100644 index 36d94ae77..000000000 --- a/src/BuildingBlocks/Authentication/OpenIdConnect/Masa.BuildingBlocks.Authentication.OpenIdConnect.Domain/Masa - Backup.BuildingBlocks.Authentication.OpenIdConnect.Domain.csproj +++ /dev/null @@ -1,14 +0,0 @@ - - - - net6.0 - enable - enable - - - - - - - - diff --git a/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain.Tests/EventTest.cs b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain.Tests/EventTest.cs index 39eff2517..4a459db75 100644 --- a/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain.Tests/EventTest.cs +++ b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain.Tests/EventTest.cs @@ -1,4 +1,4 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. namespace Masa.BuildingBlocks.Ddd.Domain.Tests; diff --git a/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityChangedEvent.cs b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityChangedEvent.cs new file mode 100644 index 000000000..b3fab230d --- /dev/null +++ b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityChangedEvent.cs @@ -0,0 +1,15 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.BuildingBlocks.Ddd.Domain.Events +{ + public abstract record class EntityChangedEvent : DomainCommand + { + public TEntity Entity { get; set; } + + public EntityChangedEvent(TEntity entity) + { + Entity = entity; + } + } +} diff --git a/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityCreatedDomainEvent.cs b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityCreatedDomainEvent.cs new file mode 100644 index 000000000..1a285403d --- /dev/null +++ b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityCreatedDomainEvent.cs @@ -0,0 +1,12 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.BuildingBlocks.Ddd.Domain.Events +{ + public record class EntityCreatedDomainEvent : EntityChangedEvent + { + public EntityCreatedDomainEvent(TEntity entity) : base(entity) + { + } + } +} diff --git a/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityDeletedDomainEvent.cs b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityDeletedDomainEvent.cs new file mode 100644 index 000000000..8bad514eb --- /dev/null +++ b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityDeletedDomainEvent.cs @@ -0,0 +1,12 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.BuildingBlocks.Ddd.Domain.Events +{ + public record class EntityDeletedDomainEvent : EntityChangedEvent + { + public EntityDeletedDomainEvent(TEntity entity) : base(entity) + { + } + } +} diff --git a/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityModifiedDomainEvent.cs b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityModifiedDomainEvent.cs new file mode 100644 index 000000000..c281f8f7c --- /dev/null +++ b/src/BuildingBlocks/Ddd/Domain/Masa.BuildingBlocks.Ddd.Domain/Events/EntityModifiedDomainEvent.cs @@ -0,0 +1,12 @@ +// Copyright (c) MASA Stack All rights reserved. +// Licensed under the MIT License. See LICENSE.txt in the project root for license information. + +namespace Masa.BuildingBlocks.Ddd.Domain.Events +{ + public record class EntityModifiedDomainEvent : EntityChangedEvent + { + public EntityModifiedDomainEvent(TEntity entity) : base(entity) + { + } + } +} diff --git a/src/BuildingBlocks/Development/Masa.BuildingBlocks.Development.DaprStarter/Masa - Backup.BuildingBlocks.Development.DaprStarters.csproj b/src/BuildingBlocks/Development/Masa.BuildingBlocks.Development.DaprStarter/Masa - Backup.BuildingBlocks.Development.DaprStarters.csproj deleted file mode 100644 index 23f339d8f..000000000 --- a/src/BuildingBlocks/Development/Masa.BuildingBlocks.Development.DaprStarter/Masa - Backup.BuildingBlocks.Development.DaprStarters.csproj +++ /dev/null @@ -1,13 +0,0 @@ - - - - net6.0 - enable - enable - - - - - - - diff --git a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/IntegrationEvent.cs b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/IntegrationEvent.cs index 2cf113ea6..4c401cce0 100644 --- a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/IntegrationEvent.cs +++ b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/IntegrationEvent.cs @@ -5,12 +5,12 @@ namespace Masa.BuildingBlocks.Dispatcher.IntegrationEvents; public abstract record IntegrationEvent : IIntegrationEvent { - [JsonInclude]public Guid EventId { private get; set; } + [JsonInclude] public Guid EventId { private get; set; } [JsonInclude] public DateTime EvenCreateTime { private get; set; } - [NotMapped] [JsonIgnore] public IUnitOfWork? UnitOfWork { get; set; } + [NotMapped][JsonIgnore] public IUnitOfWork? UnitOfWork { get; set; } public virtual string Topic { get; set; } diff --git a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs index 62cbccb11..9c2acaf61 100644 --- a/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs +++ b/src/BuildingBlocks/Dispatcher/Masa.BuildingBlocks.Dispatcher.IntegrationEvents/Logs/IIntegrationEventLogService.cs @@ -37,10 +37,16 @@ Task SaveEventAsync( Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default); + Task> BulkMarkEventAsPublishedAsync(IEnumerable eventIds, CancellationToken cancellationToken = default); + Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default); + Task> BulkMarkEventAsInProgressAsync(IEnumerable eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default); + Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default); + Task> BulkMarkEventAsFailedAsync(IEnumerable eventIds, CancellationToken cancellationToken = default); + /// /// Delete successfully published and expired data /// diff --git a/src/BuildingBlocks/Exception/Masa.BuildingBlocks.Exceptions.Tests/UserFriendlyExceptionTest.cs b/src/BuildingBlocks/Exception/Masa.BuildingBlocks.Exceptions.Tests/UserFriendlyExceptionTest.cs index a8b8795e6..3539e551d 100644 --- a/src/BuildingBlocks/Exception/Masa.BuildingBlocks.Exceptions.Tests/UserFriendlyExceptionTest.cs +++ b/src/BuildingBlocks/Exception/Masa.BuildingBlocks.Exceptions.Tests/UserFriendlyExceptionTest.cs @@ -1,6 +1,8 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. +using Microsoft.Extensions.Logging; + namespace Masa.BuildingBlocks.Exceptions.Tests; [TestClass] diff --git a/src/BuildingBlocks/Exception/Masa.BuildingBlocks.Exceptions/MasaException.cs b/src/BuildingBlocks/Exception/Masa.BuildingBlocks.Exceptions/MasaException.cs index c33796779..b3bf383d5 100644 --- a/src/BuildingBlocks/Exception/Masa.BuildingBlocks.Exceptions/MasaException.cs +++ b/src/BuildingBlocks/Exception/Masa.BuildingBlocks.Exceptions/MasaException.cs @@ -1,4 +1,4 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. // ReSharper disable once CheckNamespace @@ -91,16 +91,16 @@ public MasaException(string errorCode, LogLevel? logLevel, params object[] param { } - public MasaException(string message, Exception? innerException, string errorCode, LogLevel? logLevel = null, params object[] parameters) - : base(message, innerException) + public MasaException(Exception? innerException, string errorCode, LogLevel? logLevel = null, params object[] parameters) + : base(null, innerException) { _errorCode = errorCode; _parameters = parameters; _logLevel = logLevel; } - public MasaException(Exception? innerException, string errorCode, LogLevel? logLevel = null, params object[] parameters) - : base(null, innerException) + public MasaException(string message, Exception? innerException, string errorCode, LogLevel? logLevel = null, params object[] parameters) + : base(message, innerException) { _errorCode = errorCode; _parameters = parameters; diff --git a/src/Contrib/Authentication/Identity/Masa.Contrib.Authentication.Identity.Core/DefaultUserContext.cs b/src/Contrib/Authentication/Identity/Masa.Contrib.Authentication.Identity.Core/DefaultUserContext.cs index 4af3515fe..90016be98 100644 --- a/src/Contrib/Authentication/Identity/Masa.Contrib.Authentication.Identity.Core/DefaultUserContext.cs +++ b/src/Contrib/Authentication/Identity/Masa.Contrib.Authentication.Identity.Core/DefaultUserContext.cs @@ -1,6 +1,8 @@ // Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. +using System.Collections; + namespace Masa.Contrib.Authentication.Identity; internal class DefaultUserContext : UserContext @@ -44,26 +46,55 @@ public DefaultUserContext( if (claimType == null) continue; - string? claimValue = null; - if (property.PropertyType != typeof(string) && typeof(System.Collections.IEnumerable).IsAssignableFrom(property.PropertyType)) - { - var claimsValues = ClaimsPrincipal?.Claims.Where(claim => claim.Type == claimType) - .Select(claim => claim.Value); - if (claimsValues?.Any() == true) - claimValue = JsonSerializer.Serialize(claimsValues); - } - else - { - claimValue = ClaimsPrincipal?.FindClaimValue(claimType); - } - + string? claimValue = ClaimsPrincipal?.FindClaimValue(claimType); if (claimValue != null) { + object? claimTypeValue = null; + + try + { + claimTypeValue = TypeConvertProvider.ConvertTo(claimValue, property.PropertyType); + } + catch + { + claimTypeValue = this.ParseNonJson(property); + } + modelRelation.Setters[property] - .Invoke(userModel, new[] { TypeConvertProvider.ConvertTo(claimValue, property.PropertyType) }); + .Invoke(userModel, new[] { claimTypeValue }); } } return userModel; } + + private object? ParseNonJson(PropertyInfo property) + { + var claimValues = new List(); + var claimType = _optionsMonitor.CurrentValue.GetClaimType(property.Name); + if (claimType == null) + return null; + + if (property.PropertyType != typeof(string) && typeof(IEnumerable).IsAssignableFrom(property.PropertyType)) + { + var claimsValues = ClaimsPrincipal?.Claims.Where(claim => claim.Type == claimType) + .Select(claim => claim.Value).ToList(); + + claimsValues?.ForEach(item => + { + try + { + var claimsValue = JsonSerializer.Deserialize>(item); + if (claimsValue?.Any() == true) + claimValues.AddRange(claimsValue); + } + catch + { + claimValues.Add(item); + } + }); + } + + return TypeConvertProvider.ConvertTo(JsonSerializer.Serialize(claimValues), property.PropertyType); + } } diff --git a/src/Contrib/Authentication/Identity/Tests/Masa.Contrib.Authentication.Identity.BlazorServer.Tests/IdentityTest.cs b/src/Contrib/Authentication/Identity/Tests/Masa.Contrib.Authentication.Identity.BlazorServer.Tests/IdentityTest.cs index c9c941b1d..1ef869d9e 100644 --- a/src/Contrib/Authentication/Identity/Tests/Masa.Contrib.Authentication.Identity.BlazorServer.Tests/IdentityTest.cs +++ b/src/Contrib/Authentication/Identity/Tests/Masa.Contrib.Authentication.Identity.BlazorServer.Tests/IdentityTest.cs @@ -1,6 +1,8 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. +using Newtonsoft.Json; + namespace Masa.Contrib.Authentication.Identity.BlazorServer.Tests; [TestClass] @@ -62,6 +64,93 @@ public void TestMasaIdentity2() Assert.AreEqual(1, userRoles[0]); } + [TestMethod] + public void TestMasaIdentity3() + { + var services = new ServiceCollection(); + var claimsPrincipal = new ClaimsPrincipal(new List() + { + new(new List() + { + new("sub", "1"), + new(ClaimType.DEFAULT_USER_NAME, "Jim"), + new(ClaimType.DEFAULT_USER_ROLE, "1")//"[\"1\"]" + }) + }); + Mock authenticationStateProvider = new(); + authenticationStateProvider + .Setup(provider => provider.GetAuthenticationStateAsync()) + .ReturnsAsync(new AuthenticationState(claimsPrincipal)); + + services.AddScoped(_ => authenticationStateProvider.Object); + services.AddMasaIdentity(option => + { + option.UserId = "sub"; + }); + + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + + var serviceProvider = services.BuildServiceProvider(); + var userContext = serviceProvider.GetService(); + Assert.IsNotNull(userContext); + Assert.AreEqual("1", userContext.UserId); + Assert.AreEqual("Jim", userContext.UserName); + + var userRoles = userContext.GetUserRoles().ToList(); + Assert.AreEqual(1, userRoles.Count); + Assert.AreEqual(1, userRoles[0]); + } + + [TestMethod] + public void TestMasaIdentity4() + { + var roles = new List() + { + "admin", "admin2", "admin3","admin4" + }; + var services = new ServiceCollection(); + var claimsPrincipal = new ClaimsPrincipal(new List() + { + new(new List() + { + new("sub", "1"), + new(ClaimType.DEFAULT_USER_NAME, "Jim"), + new(ClaimType.DEFAULT_USER_ROLE, JsonConvert.SerializeObject(roles))//"[\"1\"]" + }) + }); + Mock authenticationStateProvider = new(); + authenticationStateProvider + .Setup(provider => provider.GetAuthenticationStateAsync()) + .ReturnsAsync(new AuthenticationState(claimsPrincipal)); + + services.AddScoped(_ => authenticationStateProvider.Object); + services.AddMasaIdentity(option => + { + option.UserId = "sub"; + }); + + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + Assert.IsTrue(services.Any(ServiceLifetime.Scoped)); + + var serviceProvider = services.BuildServiceProvider(); + var userContext = serviceProvider.GetService(); + Assert.IsNotNull(userContext); + Assert.AreEqual("1", userContext.UserId); + Assert.AreEqual("Jim", userContext.UserName); + + var userRoles = userContext.GetUserRoles().ToList(); + Assert.AreEqual(4, userRoles.Count); + } + [TestMethod] public void TestIdentityByYaml() { diff --git a/src/Contrib/Data/Orm/EFCore/Masa.Contrib.Data.EFCore/DefaultMasaDbContext.cs b/src/Contrib/Data/Orm/EFCore/Masa.Contrib.Data.EFCore/DefaultMasaDbContext.cs index 64df82c4a..c8d8a066b 100644 --- a/src/Contrib/Data/Orm/EFCore/Masa.Contrib.Data.EFCore/DefaultMasaDbContext.cs +++ b/src/Contrib/Data/Orm/EFCore/Masa.Contrib.Data.EFCore/DefaultMasaDbContext.cs @@ -47,6 +47,20 @@ public IConcurrencyStampProvider? ConcurrencyStampProvider } } + protected virtual Dictionary ChangeEventTypes + { + get + { + var eventTypes = new Dictionary + { + {EntityState.Added,typeof(EntityCreatedDomainEvent<>)}, + {EntityState.Modified,typeof(EntityModifiedDomainEvent<>)}, + {EntityState.Deleted,typeof(EntityDeletedDomainEvent<>)} + }; + return eventTypes; + } + } + private IMultiEnvironmentContext? EnvironmentContext => Options?.ServiceProvider?.GetService(); protected IMultiTenantContext? TenantContext => Options?.ServiceProvider?.GetService(); @@ -219,6 +233,7 @@ protected virtual void OnBeforeSaveChanges() { ChangeTracker.UpdateRowVersion(ConcurrencyStampProvider); OnBeforeSaveChangesByFilters(); + PublishEntityChangedEventAsync(ChangeTracker).ConfigureAwait(false).GetAwaiter().GetResult(); DomainEventEnqueueAsync(ChangeTracker).ConfigureAwait(false).GetAwaiter().GetResult(); } @@ -226,6 +241,7 @@ protected virtual async Task OnBeforeSaveChangesAsync() { ChangeTracker.UpdateRowVersion(ConcurrencyStampProvider); OnBeforeSaveChangesByFilters(); + await PublishEntityChangedEventAsync(ChangeTracker); await DomainEventEnqueueAsync(ChangeTracker); } @@ -265,6 +281,39 @@ protected virtual async Task DomainEventEnqueueAsync(ChangeTracker changeTracker await DomainEventBus.EnqueueAsync(domainEvent); } + protected virtual Task PublishEntityChangedEventAsync(ChangeTracker changeTracker) + { + if (DomainEventBus == null) + return Task.CompletedTask; + + var domainEntities = changeTracker.Entries() + .Where(entrie => entrie.State == EntityState.Deleted || + entrie.State == EntityState.Modified || + entrie.State == EntityState.Added).ToList(); + + if (!domainEntities.Any()) + return Task.CompletedTask; + + domainEntities.ForEach(item => + { + var entityType = item.Entity.GetType(); + var eventType = item.State switch + { + EntityState.Added => ChangeEventTypes[EntityState.Added].MakeGenericType(entityType), + EntityState.Modified => ChangeEventTypes[EntityState.Modified].MakeGenericType(entityType), + EntityState.Deleted => ChangeEventTypes[EntityState.Deleted].MakeGenericType(entityType), + _ => null, + }; + + if (eventType is not null) + { + item.Entity.AddDomainEvent((IDomainEvent)Activator.CreateInstance(eventType, item.Entity)!); + } + }); + + return Task.CompletedTask; + } + /// /// Automatic soft delete. /// diff --git a/src/Contrib/Ddd/Domain/Masa.Contrib.Ddd.Domain/DomainEventBus.cs b/src/Contrib/Ddd/Domain/Masa.Contrib.Ddd.Domain/DomainEventBus.cs index a235391cf..3bea10859 100644 --- a/src/Contrib/Ddd/Domain/Masa.Contrib.Ddd.Domain/DomainEventBus.cs +++ b/src/Contrib/Ddd/Domain/Masa.Contrib.Ddd.Domain/DomainEventBus.cs @@ -51,7 +51,8 @@ bool IsAssignableFromDomainQuery(Type? type) { return IsAssignableFromDomainQuery(type.BaseType); } - return type.GetInterfaces().Any(type => type.GetGenericTypeDefinition() == typeof(IDomainQuery<>)); + + return type.GetInterfaces().Where(item => item.IsGenericType == true).Any(type => type.GetGenericTypeDefinition() == typeof(IDomainQuery<>)); } } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr/Publisher.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr/Publisher.cs index a9839ec31..6dcc290a8 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr/Publisher.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.Dapr/Publisher.cs @@ -67,4 +67,58 @@ public async Task PublishAsync( @event); } } + + public async Task BulkPublishAsync( + string topicName, List<(T @event, IntegrationEventExpand? eventMessageExpand)> @events, + CancellationToken stoppingToken = default) + { + _logger?.LogDebug("-----BulkPublishEvent Integration event publishing is in progress from {AppId} with DaprAppId as '{DaprAppId}'", _appId, + _daprAppId); + + if (!@events.Any()) + return; + + MasaArgumentException.ThrowIfNullOrWhiteSpace(_daprAppId); + + var waitMasaCloudEvents = new List>(); + var waitEvents = new List(); + + @events.ForEach(item => + { + if (item.eventMessageExpand is { Isolation.Count: > 0 }) + { + var eventMessage = new IntegrationEventMessage(item.@event, item.eventMessageExpand); + var masaCloudEvent = new MasaCloudEvent(eventMessage) + { + Source = new Uri(_daprAppId, UriKind.RelativeOrAbsolute) + }; + + waitMasaCloudEvents.Add(masaCloudEvent); + } + else + { + waitEvents.Add(item.@event); + } + }); + + if (waitMasaCloudEvents.Any()) + { + await DaprClient.BulkPublishEventAsync(_pubSubName, topicName, waitMasaCloudEvents, cancellationToken: stoppingToken); + _logger?.LogDebug( + "-----BulkPublishEvent MasaCloudEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}", + _appId, + _daprAppId, + waitMasaCloudEvents); + } + + if (waitEvents.Any()) + { + await DaprClient.BulkPublishEventAsync(_pubSubName, topicName, waitEvents, cancellationToken: stoppingToken); + _logger?.LogDebug( + "-----BulkPublishEvent Publishing integration event from {AppId} succeeded with DaprAppId is {DaprAppId} and Event is {Event}", + _appId, + _daprAppId, + @events); + } + } } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs index 7caff061d..f7773f5fd 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore/IntegrationEventLogService.cs @@ -106,6 +106,28 @@ public Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellati }, cancellationToken); } + public async Task> BulkMarkEventAsPublishedAsync(IEnumerable eventIds, CancellationToken cancellationToken = default) + { + var failedEventIds = new List(); + + await BulkUpdateEventStatus(eventIds, IntegrationEventStates.Published, eventLogs => + { + eventLogs.ForEach(eventLog => + { + if (eventLog.State != IntegrationEventStates.InProgress) + { + _logger?.LogWarning( + "Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}", + IntegrationEventStates.Published, eventLog.State, eventLog.Id); + failedEventIds.Add(eventLog.EventId); + } + }); + + }, cancellationToken); + + return failedEventIds; + } + public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, CancellationToken cancellationToken = default) { return UpdateEventStatus(eventId, IntegrationEventStates.InProgress, eventLog => @@ -132,6 +154,37 @@ public Task MarkEventAsInProgressAsync(Guid eventId, int minimumRetryInterval, C }, cancellationToken); } + public async Task> BulkMarkEventAsInProgressAsync(IEnumerable eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default) + { + var failedEventIds = new List(); + + await BulkUpdateEventStatus(eventIds, IntegrationEventStates.InProgress, eventLogs => + { + eventLogs.ForEach(eventLog => + { + if (eventLog.State is IntegrationEventStates.InProgress or IntegrationEventStates.PublishedFailed && + (eventLog.GetCurrentTime() - eventLog.ModificationTime).TotalSeconds < minimumRetryInterval) + { + _logger?.LogInformation( + "Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}, Multitasking execution error, waiting for the next retry", + IntegrationEventStates.InProgress, eventLog.State, eventLog.Id); + failedEventIds.Add(eventLog.EventId); + } + if (eventLog.State != IntegrationEventStates.NotPublished && + eventLog.State != IntegrationEventStates.InProgress && + eventLog.State != IntegrationEventStates.PublishedFailed) + { + _logger?.LogWarning( + "Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}", + IntegrationEventStates.InProgress, eventLog.State, eventLog.Id); + failedEventIds.Add(eventLog.EventId); + } + }); + }, cancellationToken); + + return failedEventIds; + } + public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default) { return UpdateEventStatus(eventId, IntegrationEventStates.PublishedFailed, eventLog => @@ -147,6 +200,27 @@ public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationT }, cancellationToken); } + public async Task> BulkMarkEventAsFailedAsync(IEnumerable eventIds, CancellationToken cancellationToken = default) + { + var failedEventIds = new List(); + + await BulkUpdateEventStatus(eventIds, IntegrationEventStates.PublishedFailed, eventLogs => + { + eventLogs.ForEach(eventLog => + { + if (eventLog.State != IntegrationEventStates.InProgress) + { + _logger?.LogWarning( + "Failed to modify the state of the local message table to {OptState}, the current State is {State}, Id: {Id}", + IntegrationEventStates.PublishedFailed, eventLog.State, eventLog.Id); + failedEventIds.Add(eventLog.EventId); + } + }); + }, cancellationToken); + + return failedEventIds; + } + public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount, CancellationToken token = default) { var eventLogs = _eventLogContext.EventLogs.Where(e => e.ModificationTime < expiresAt && e.State == IntegrationEventStates.Published) @@ -164,6 +238,51 @@ public async Task DeleteExpiresAsync(DateTime expiresAt, int batchCount, Cancell } } + private async Task BulkUpdateEventStatus(IEnumerable eventIds, + IntegrationEventStates status, + Action>? action = null, + CancellationToken cancellationToken = default) + { + var eventLogEntrys = + await _eventLogContext.EventLogs.Where(e => eventIds.Contains(e.EventId)).ToListAsync(); + if (eventLogEntrys == null || !eventLogEntrys.Any()) + throw new ArgumentException( + $"The local message record does not exist, please confirm whether the local message record has been deleted or other reasons cause the local message record to not be inserted successfully In EventId: {eventIds}", + nameof(eventIds)); + + action?.Invoke(eventLogEntrys); + + var updateEventLogEntry = new List(); + foreach (var eventLogEntry in eventLogEntrys) + { + if (eventLogEntry.State == status) + { + continue; + } + + eventLogEntry.State = status; + eventLogEntry.ModificationTime = eventLogEntry.GetCurrentTime(); + + if (status == IntegrationEventStates.InProgress) + eventLogEntry.TimesSent++; + + updateEventLogEntry.Add(eventLogEntry); + } + + _eventLogContext.EventLogs.UpdateRange(updateEventLogEntry); + + try + { + await _eventLogContext.DbContext.SaveChangesAsync(cancellationToken); + } + catch (DbUpdateConcurrencyException ex) + { + throw new UserFriendlyException($"Concurrency conflict, update exception. {ex.Message}"); + } + + updateEventLogEntry.ForEach(CheckAndDetached); + } + private async Task UpdateEventStatus(Guid eventId, IntegrationEventStates status, Action? action = null, diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IPublisher.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IPublisher.cs index 7c6c35bb5..5c3792c38 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IPublisher.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/IPublisher.cs @@ -1,4 +1,4 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. namespace Masa.Contrib.Dispatcher.IntegrationEvents; @@ -10,4 +10,8 @@ Task PublishAsync( T @event, IntegrationEventExpand? eventMessageExpand, CancellationToken stoppingToken = default); + + Task BulkPublishAsync( + string topicName, List<(T @event, IntegrationEventExpand? eventMessageExpand)> @events, + CancellationToken stoppingToken = default); } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Internal/LocalQueueProcessor.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Internal/LocalQueueProcessor.cs index e0fcb6d4d..c6572605c 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Internal/LocalQueueProcessor.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Internal/LocalQueueProcessor.cs @@ -22,9 +22,15 @@ public static void SetLogger(IServiceCollection services) public void AddJobs(IntegrationEventLogItem items) => _retryEventLogs.TryAdd(items.EventId, items); + public void BulkAddJobs(List items) + => items.ForEach(item => _retryEventLogs.TryAdd(item.EventId, item)); + public void RemoveJobs(Guid eventId) => _retryEventLogs.TryRemove(eventId, out _); + public void BulkRemoveJobs(IEnumerable eventIds) + => eventIds.ToList().ForEach(eventId => _retryEventLogs.TryRemove(eventId, out _)); + public void RetryJobs(Guid eventId) { if (_retryEventLogs.TryGetValue(eventId, out IntegrationEventLogItem? item)) @@ -33,9 +39,34 @@ public void RetryJobs(Guid eventId) } } + public void BulkRetryJobs(IEnumerable eventIds) + { + foreach (var eventId in eventIds) + { + if (_retryEventLogs.TryGetValue(eventId, out IntegrationEventLogItem? item)) + { + item.Retry(); + } + } + } + public bool IsExist(Guid eventId) => _retryEventLogs.ContainsKey(eventId); + public List IsExist(IEnumerable eventIds) + { + var notEventIds = new List(); + foreach (var eventId in eventIds) + { + if (_retryEventLogs.ContainsKey(eventId)) + { + notEventIds.Add(eventId); + } + } + + return notEventIds; + } + public void Delete(int maxRetryTimes) { var eventLogItems = _retryEventLogs.Values.Where(log => log.RetryCount >= maxRetryTimes - 1).ToList(); diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Options/IntegrationEventOptions.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Options/IntegrationEventOptions.cs index ccc93ea68..fbe5bcc63 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Options/IntegrationEventOptions.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Options/IntegrationEventOptions.cs @@ -10,6 +10,11 @@ public class IntegrationEventOptions : IIntegrationEventOptions public Assembly[] Assemblies { get; } + /// + /// Send in batches according to Topic + /// + public bool BatchesGroupSendOrRetry { get; set; } = false; + private int _localRetryTimes = 3; /// diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs index fdecae742..18914bff4 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByDataProcessor.cs @@ -24,6 +24,12 @@ public RetryByDataProcessor( protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken) { + if (_options.Value.BatchesGroupSendOrRetry) + { + await this.BulkExecuteAsync(serviceProvider, stoppingToken); + return; + } + var unitOfWork = serviceProvider.GetService(); if (unitOfWork != null) unitOfWork.UseTransaction = false; @@ -36,7 +42,7 @@ await eventLogService.RetrieveEventLogsFailedToPublishAsync( _options.Value.MinimumRetryInterval, stoppingToken); - if(!retrieveEventLogs.Any()) + if (!retrieveEventLogs.Any()) return; var publisher = serviceProvider.GetRequiredService(); @@ -54,7 +60,7 @@ await eventLogService.RetrieveEventLogsFailedToPublishAsync( eventLog, eventLog.Topic); - await publisher.PublishAsync(eventLog.Topic, eventLog.Event, eventLog.EventExpand, stoppingToken); + await publisher.PublishAsync(eventLog.Topic, eventLog.Event, eventLog.EventExpand, stoppingToken); LocalQueueProcessor.Default.RemoveJobs(eventLog.EventId); @@ -73,4 +79,83 @@ await eventLogService.RetrieveEventLogsFailedToPublishAsync( } } } + + protected async Task BulkExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken) + { + var unitOfWork = serviceProvider.GetService(); + if (unitOfWork != null) + unitOfWork.UseTransaction = false; + var eventLogService = serviceProvider.GetRequiredService(); + + var retrieveEventLogs = + await eventLogService.RetrieveEventLogsFailedToPublishAsync( + _options.Value.RetryBatchSize, + _options.Value.MaxRetryTimes, + _options.Value.MinimumRetryInterval, + stoppingToken); + + if (!retrieveEventLogs.Any()) + return; + + var publisher = serviceProvider.GetRequiredService(); + var retrieveEventLogsGroupByTopic = retrieveEventLogs.GroupBy(eventLog => eventLog.Topic) + .Select(eventLog => new + { + TopicName = eventLog.Key, + Events = eventLog.Select(log => new { log.Event, log.EventExpand, log.EventId }).ToList(), + }).ToList(); + var allEventIds = retrieveEventLogsGroupByTopic. + SelectMany(eventLog => eventLog.Events.Select(item => item.EventId)); + var removeEventIds = LocalQueueProcessor.Default.IsExist(allEventIds); + + foreach (var eventLog in retrieveEventLogsGroupByTopic) + { + eventLog.Events.RemoveAll(item => removeEventIds.Contains(item.EventId)); + + var sourceEventIds = eventLog.Events.Select(item => item.EventId); + var sourceEvents = eventLog.Events; + + try + { + if (!sourceEventIds.Any()) + continue; // The local queue is retrying, no need to retry + + var failedEventIds = await eventLogService.BulkMarkEventAsInProgressAsync(sourceEventIds, + _options.Value.MinimumRetryInterval, stoppingToken); + if (failedEventIds.Any()) + { + sourceEvents = sourceEvents.Where(item => !failedEventIds.Contains(item.EventId)).ToList(); + _logger?.LogDebug("Error Publishing integration event {Event} to {TopicName} failedEventIds {failedEventIds}", + eventLog, eventLog.TopicName, failedEventIds); + } + var eventIds = sourceEvents.Select(item => item.EventId); + var events = sourceEvents.Select(item => (item.Event, item.EventExpand)).ToList(); + + _logger?.LogDebug("Publishing integration event {Event} to {TopicName}", + eventLog, + eventLog.TopicName); + + await publisher.BulkPublishAsync(eventLog.TopicName, events, stoppingToken); + + LocalQueueProcessor.Default.BulkRemoveJobs(eventIds); + + await eventLogService.BulkMarkEventAsPublishedAsync(eventIds, stoppingToken); + + if (failedEventIds.Any()) + await eventLogService.BulkMarkEventAsFailedAsync(failedEventIds, stoppingToken); + } + catch (UserFriendlyException) + { + //Update state due to multitasking contention, no processing required + } + catch (Exception ex) + { + _logger?.LogError(ex, + "Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})", + sourceEventIds, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog); + await eventLogService.BulkMarkEventAsFailedAsync(sourceEventIds, stoppingToken); + } + } + } + } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByLocalQueueProcessor.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByLocalQueueProcessor.cs index d934cfb5c..4b3ccbd28 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByLocalQueueProcessor.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/RetryByLocalQueueProcessor.cs @@ -24,6 +24,12 @@ public RetryByLocalQueueProcessor( protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken) { + if (_options.Value.BatchesGroupSendOrRetry) + { + await this.BulkExecuteAsync(serviceProvider, stoppingToken); + return; + } + var unitOfWork = serviceProvider.GetService(); if (unitOfWork != null) unitOfWork.UseTransaction = false; @@ -34,7 +40,7 @@ protected override async Task ExecuteAsync(IServiceProvider serviceProvider, Can LocalQueueProcessor.Default.RetrieveEventLogsFailedToPublishAsync(_options.Value.LocalRetryTimes, _options.Value.RetryBatchSize); - if(!retrieveEventLogs.Any()) + if (!retrieveEventLogs.Any()) return; var publisher = serviceProvider.GetRequiredService(); @@ -52,7 +58,7 @@ protected override async Task ExecuteAsync(IServiceProvider serviceProvider, Can eventLog, eventLog.Topic); - await publisher.PublishAsync(eventLog.Topic, eventLog.Event, eventLog.EventExpand, stoppingToken); + await publisher.PublishAsync(eventLog.Topic, eventLog.Event, eventLog.EventExpand, stoppingToken); await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, stoppingToken); @@ -72,4 +78,69 @@ protected override async Task ExecuteAsync(IServiceProvider serviceProvider, Can } } } + + protected async Task BulkExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken) + { + var unitOfWork = serviceProvider.GetService(); + if (unitOfWork != null) + unitOfWork.UseTransaction = false; + + var eventLogService = serviceProvider.GetRequiredService(); + + var retrieveEventLogs = + LocalQueueProcessor.Default.RetrieveEventLogsFailedToPublishAsync(_options.Value.LocalRetryTimes, + _options.Value.RetryBatchSize); + + if (!retrieveEventLogs.Any()) + return; + + var publisher = serviceProvider.GetRequiredService(); + var retrieveEventLogsGroupByTopic = retrieveEventLogs.GroupBy(eventLog => eventLog.Topic) + .Select(eventLog => new + { + TopicName = eventLog.Key, + Events = eventLog.Select(log => new { log.Event, log.EventExpand, log.EventId }).ToList(), + }).ToList(); + + foreach (var eventLog in retrieveEventLogsGroupByTopic) + { + var eventIds = eventLog.Events.Select(item => item.EventId); + var events = eventLog.Events.Select(item => (item.Event, item.EventExpand)).ToList(); + + try + { + LocalQueueProcessor.Default.BulkRemoveJobs(eventIds); + + var failedEventIds = await eventLogService.BulkMarkEventAsInProgressAsync(eventIds, + _options.Value.MinimumRetryInterval, stoppingToken); + if (failedEventIds.Any()) + { + _logger?.LogDebug("Error Publishing integration event {Event} to {TopicName} failedEventIds {failedEventIds}", + eventLog, eventLog.TopicName, failedEventIds); + } + + _logger?.LogDebug("Publishing integration event {Event} to {TopicName}", + eventLog, + eventLog.TopicName); + + await publisher.BulkPublishAsync(eventLog.TopicName, events, stoppingToken); + + await eventLogService.BulkMarkEventAsPublishedAsync(eventIds, stoppingToken); + + LocalQueueProcessor.Default.BulkRemoveJobs(eventIds); + } + catch (UserFriendlyException) + { + //Update state due to multitasking contention + LocalQueueProcessor.Default.BulkRemoveJobs(eventIds); + } + catch (Exception ex) + { + _logger?.LogError(ex, + "Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})", + eventIds, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog); + await eventLogService.BulkMarkEventAsFailedAsync(eventIds, stoppingToken); + } + } + } } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/SendByDataProcessor.cs b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/SendByDataProcessor.cs index fc45942a6..415cfd6e3 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/SendByDataProcessor.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Masa.Contrib.Dispatcher.IntegrationEvents/Processor/SendByDataProcessor.cs @@ -24,6 +24,12 @@ public SendByDataProcessor( protected override async Task ExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken) { + if (_options.Value.BatchesGroupSendOrRetry) + { + await this.BulkExecuteAsync(serviceProvider, stoppingToken); + return; + } + var unitOfWork = serviceProvider.GetService(); if (unitOfWork != null) unitOfWork.UseTransaction = false; @@ -35,7 +41,7 @@ await eventLogService.RetrieveEventLogsPendingToPublishAsync( _options.Value.BatchSize, stoppingToken); - if(!retrieveEventLogs.Any()) + if (!retrieveEventLogs.Any()) return; var publisher = serviceProvider.GetRequiredService(); @@ -65,7 +71,79 @@ await eventLogService.RetrieveEventLogsPendingToPublishAsync( eventLog.EventId, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog); await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, stoppingToken); - LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Topic, eventLog.Event, eventLog.EventExpand)); + LocalQueueProcessor.Default.AddJobs(new IntegrationEventLogItem(eventLog.EventId, eventLog.Topic, eventLog.Event, + eventLog.EventExpand)); + } + } + } + + protected async Task BulkExecuteAsync(IServiceProvider serviceProvider, CancellationToken stoppingToken) + { + var unitOfWork = serviceProvider.GetService(); + if (unitOfWork != null) + unitOfWork.UseTransaction = false; + + var eventLogService = serviceProvider.GetRequiredService(); + + var retrieveEventLogs = + await eventLogService.RetrieveEventLogsPendingToPublishAsync( + _options.Value.BatchSize, + stoppingToken); + + if (!retrieveEventLogs.Any()) + return; + + var publisher = serviceProvider.GetRequiredService(); + var retrieveEventLogsGroupByTopic = retrieveEventLogs.GroupBy(eventLog => eventLog.Topic) + .Select(eventLog => new + { + TopicName = eventLog.Key, + Events = eventLog.Select(log => new { log.Event, log.EventExpand, log.EventId }).ToList(), + }).ToList(); + + foreach (var eventLog in retrieveEventLogsGroupByTopic) + { + var sourceEventIds = eventLog.Events.Select(item => item.EventId); + var sourceEvents = eventLog.Events; + + try + { + var failedEventIds = await eventLogService.BulkMarkEventAsInProgressAsync(sourceEventIds, + _options.Value.MinimumRetryInterval, stoppingToken); + if (failedEventIds.Any()) + { + sourceEvents = sourceEvents.Where(item => !failedEventIds.Contains(item.EventId)).ToList(); + _logger?.LogDebug("Error Publishing integration event {Event} to {TopicName} failedEventIds {failedEventIds}", + eventLog, eventLog.TopicName, failedEventIds); + } + var eventIds = sourceEvents.Select(item => item.EventId); + var events = sourceEvents.Select(item => (item.Event, item.EventExpand)).ToList(); + + _logger?.LogDebug("Publishing integration event {Event} to {TopicName}", + eventLog, + eventLog.TopicName); + + await publisher.BulkPublishAsync(eventLog.TopicName, events, stoppingToken); + await eventLogService.BulkMarkEventAsPublishedAsync(eventIds, stoppingToken); + + if (failedEventIds.Any()) + await eventLogService.BulkMarkEventAsFailedAsync(failedEventIds, stoppingToken); + } + catch (UserFriendlyException) + { + //Update state due to multitasking contention, no processing required + } + catch (Exception ex) + { + _logger?.LogError(ex, + "Error Publishing integration event: {IntegrationEventId} from {AppId} - ({IntegrationEvent})", + sourceEventIds, _masaAppConfigureOptions?.CurrentValue.AppId ?? string.Empty, eventLog); + await eventLogService.BulkMarkEventAsFailedAsync(sourceEventIds, stoppingToken); + + var integrationEventLogItem = eventLog.Events.Select(item => + new IntegrationEventLogItem(item.EventId, eventLog.TopicName, item.Event, item.EventExpand)).ToList(); + + LocalQueueProcessor.Default.BulkAddJobs(integrationEventLogItem); } } } diff --git a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs index 9d354eb8d..7731f2965 100644 --- a/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs +++ b/src/Contrib/Dispatcher/IntegrationEvents/Tests/Masa.Contrib.Dispatcher.IntegrationEvents.Tests/Infrastructure/CustomIntegrationEventLogService.cs @@ -1,4 +1,4 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. namespace Masa.Contrib.Dispatcher.IntegrationEvents.Tests.Infrastructure; @@ -47,4 +47,19 @@ public Task SaveEventAsync( { return Task.CompletedTask; } + + public async Task> BulkMarkEventAsPublishedAsync(IEnumerable eventIds, CancellationToken cancellationToken = default) + { + return new List(); + } + + public async Task> BulkMarkEventAsInProgressAsync(IEnumerable eventIds, int minimumRetryInterval, CancellationToken cancellationToken = default) + { + return new List(); + } + + public async Task> BulkMarkEventAsFailedAsync(IEnumerable eventIds, CancellationToken cancellationToken = default) + { + return new List(); + } } diff --git a/src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Dispatch/LocalEventBusWrapper.cs b/src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Dispatch/LocalEventBusWrapper.cs index 25bdd6a58..d24b2b053 100644 --- a/src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Dispatch/LocalEventBusWrapper.cs +++ b/src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Dispatch/LocalEventBusWrapper.cs @@ -13,6 +13,8 @@ internal class LocalEventBusWrapper : ILocalEventBusWrapper private readonly IUnitOfWork? _unitOfWork; + private static string _entityChangeEventTypeName = typeof(EntityChangedEvent<>).Name; + #pragma warning disable S5332 private const string LOAD_EVENT_HELP_LINK = "https://docs.masastack.com/framework/building-blocks/dispatcher/faq#section-8fdb7a0b51854e8b4ef6"; @@ -36,8 +38,11 @@ public async Task PublishAsync( var eventType = @event.GetType(); if (!_options.UnitOfWorkRelation.ContainsKey(eventType)) { + if (eventType?.BaseType?.Name == _entityChangeEventTypeName) + return; + throw new NotSupportedException( - $"Getting \"{eventType.Name}\" relationship chain failed, see {LOAD_EVENT_HELP_LINK} for details. "); + $"Getting \"{eventType?.Name}\" relationship chain failed, see {LOAD_EVENT_HELP_LINK} for details. "); } if (_options.UnitOfWorkRelation[eventType]) diff --git a/src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Options/DispatcherOptions.cs b/src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Options/DispatcherOptions.cs index 79d340d1f..bd5f4e9c1 100644 --- a/src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Options/DispatcherOptions.cs +++ b/src/Contrib/Dispatcher/Masa.Contrib.Dispatcher.Events/Internal/Options/DispatcherOptions.cs @@ -22,10 +22,27 @@ public DispatcherOptions(IServiceCollection services, Assembly[] assemblies) : this(services) { Assemblies = assemblies; + var allEventTypes = assemblies .SelectMany(assembly => assembly.GetTypes()) .Where(type => type.IsClass && typeof(IEvent).IsAssignableFrom(type)) .ToList(); + + allEventTypes.AddRange(GetGenericEventType(assemblies)); + UnitOfWorkRelation = allEventTypes.ToDictionary(type => type, IsSupportUnitOfWork); } + + private List GetGenericEventType(Assembly[] assemblies) + { + var methods = assemblies + .SelectMany(assembly => assembly.GetTypes().SelectMany(method => method.GetMethods())) + .Where(method => method.GetCustomAttributes(typeof(EventHandlerAttribute), false).Length > 0); + + var allEventTypes = methods.SelectMany(method => method.GetParameters().Where(type => type.ParameterType.IsGenericType == true && + type.ParameterType.GetGenericTypeDefinition()?.BaseType?.Name == typeof(EntityChangedEvent<>).Name)) + .Select(type => type.ParameterType).ToList(); + + return allEventTypes; + } } diff --git a/test/Masa.Framework.IntegrationTests.EventBus/Application/Events/RegisterUserEvent.cs b/test/Masa.Framework.IntegrationTests.EventBus/Application/Events/RegisterUserEvent.cs index 0bca53d28..febb72e6a 100644 --- a/test/Masa.Framework.IntegrationTests.EventBus/Application/Events/RegisterUserEvent.cs +++ b/test/Masa.Framework.IntegrationTests.EventBus/Application/Events/RegisterUserEvent.cs @@ -1,6 +1,8 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. +using Masa.BuildingBlocks.Ddd.Domain.Events; + namespace Masa.Framework.IntegrationTests.EventBus.Application.Events; public record RegisterUserEvent : Event @@ -9,3 +11,16 @@ public record RegisterUserEvent : Event public int Age { get; set; } } +public record RegisterUserDomainEvent : DomainCommand +{ + public string Name { get; set; } + + public int Age { get; set; } +} + +public record RegisterUserIntegrationDomainEvent : IntegrationDomainEvent +{ + public string Name { get; set; } + + public int Age { get; set; } +} diff --git a/test/Masa.Framework.IntegrationTests.EventBus/Application/UserHandler.cs b/test/Masa.Framework.IntegrationTests.EventBus/Application/UserHandler.cs index 5dd13e8c0..882d595b4 100644 --- a/test/Masa.Framework.IntegrationTests.EventBus/Application/UserHandler.cs +++ b/test/Masa.Framework.IntegrationTests.EventBus/Application/UserHandler.cs @@ -1,6 +1,8 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. +using Masa.BuildingBlocks.Ddd.Domain.Events; + namespace Masa.Framework.IntegrationTests.EventBus.Application; public class UserHandler @@ -48,7 +50,7 @@ await _repository.AddAsync(new User() public async Task UserExistAsync(UserAgeQuery query) { var checkUserQuery = new CheckUserQuery(); //Check whether the second verification can enter normally - await Assert.ThrowsExceptionAsync(async () => await _eventBus.PublishAsync(checkUserQuery),"Name is required on CheckUserQuery"); + await Assert.ThrowsExceptionAsync(async () => await _eventBus.PublishAsync(checkUserQuery), "Name is required on CheckUserQuery"); if (!checkUserQuery.Result) return; @@ -62,4 +64,14 @@ public async Task UserExistAsync(CheckUserQuery query) var user = await _repository.FindAsync(u => u.Name == query.Name); query.Result = user != null; } + + [EventHandler] + public async Task UserEntityCreatedEventAsync(EntityCreatedDomainEvent command) + { + var userEntity = command.Entity; + if (userEntity is null) + { + throw new Exception($"User 【{nameof(UserEntityCreatedEventAsync)}】 already exists"); + } + } } diff --git a/test/Masa.Framework.IntegrationTests.EventBus/Domain/Aggregate/User.cs b/test/Masa.Framework.IntegrationTests.EventBus/Domain/Aggregate/User.cs index 736ecd83f..21239e764 100644 --- a/test/Masa.Framework.IntegrationTests.EventBus/Domain/Aggregate/User.cs +++ b/test/Masa.Framework.IntegrationTests.EventBus/Domain/Aggregate/User.cs @@ -1,4 +1,4 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. namespace Masa.Framework.IntegrationTests.EventBus.Domain.Aggregate; @@ -13,4 +13,9 @@ public User() { Id = Guid.NewGuid(); } + + public void RegisterUserIntegrationDomainEvent() { + + base.AddDomainEvent(new RegisterUserIntegrationDomainEvent()); + } } diff --git a/test/Masa.Framework.IntegrationTests.EventBus/Infrastructure/Extensions/DefaultPublisher.cs b/test/Masa.Framework.IntegrationTests.EventBus/Infrastructure/Extensions/DefaultPublisher.cs index 10a12c2a5..c56c62e72 100644 --- a/test/Masa.Framework.IntegrationTests.EventBus/Infrastructure/Extensions/DefaultPublisher.cs +++ b/test/Masa.Framework.IntegrationTests.EventBus/Infrastructure/Extensions/DefaultPublisher.cs @@ -1,10 +1,16 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. + namespace Masa.Framework.IntegrationTests.EventBus.Infrastructure.Extensions; public class DefaultPublisher : IPublisher { + public Task BulkPublishAsync(string topicName, List<(T @event, IntegrationEventExpand? eventMessageExpand)> events, CancellationToken stoppingToken = default) + { + return Task.CompletedTask; + } + public Task PublishAsync(string topicName, T @event, IntegrationEventExpand? eventMessageExpand, CancellationToken stoppingToken = default) { return Task.CompletedTask; diff --git a/test/Masa.Framework.IntegrationTests.EventBus/TestDispatcher.cs b/test/Masa.Framework.IntegrationTests.EventBus/TestDispatcher.cs index fe05b2229..5033ffc43 100644 --- a/test/Masa.Framework.IntegrationTests.EventBus/TestDispatcher.cs +++ b/test/Masa.Framework.IntegrationTests.EventBus/TestDispatcher.cs @@ -1,6 +1,11 @@ -// Copyright (c) MASA Stack All rights reserved. +// Copyright (c) MASA Stack All rights reserved. // Licensed under the MIT License. See LICENSE.txt in the project root for license information. +using Masa.BuildingBlocks.Ddd.Domain.Events; +using Masa.Framework.IntegrationTests.EventBus.Application; +using System.Linq; +using System.Reflection; + namespace Masa.Framework.IntegrationTests.EventBus; [TestClass] @@ -93,7 +98,7 @@ public async Task TestEventBusOnConcurrencyAsync() }; var tasks = new ConcurrentBag(); - var testCount = 1000L; + var testCount = 100; Parallel.For(1L, testCount + 1, i => { tasks.Add(AddUserAsync(serviceProvider, @event)); @@ -112,4 +117,30 @@ private async Task AddUserAsync(IServiceProvider serviceProvider, RegisterUserEv var eventBus = scope.ServiceProvider.GetRequiredService(); await eventBus.PublishAsync(@event); } + + [TestMethod] + public async Task TestEntityCreatedEventAsync() + { + var serviceProvider = ServiceProvider; + var unitOfWork = serviceProvider.GetRequiredService(); + var eventBus = serviceProvider.GetRequiredService(); + var domainEventBus = serviceProvider.GetRequiredService(); + var dbContext = serviceProvider.GetRequiredService(); + var users = new List(); + var testCount = 100; + for (int i = 0; i < 100; i++) + { + users.Add(new User + { + Age = 18, + Name = i.ToString() + }); + } + await dbContext.Set().AddRangeAsync(users); + await dbContext.SaveChangesAsync(); + await unitOfWork.SaveChangesAsync(); + await unitOfWork.CommitAsync(); + var count = dbContext.Set().Count(); + Assert.IsTrue(count == testCount); + } }