From 3590a7c677bd6f6e7fa7b3f4bc394fdb074aafaf Mon Sep 17 00:00:00 2001 From: Mattias Jakobsson Date: Sat, 29 Jun 2024 20:34:37 +0200 Subject: [PATCH] wip --- .../EventStoreJournalSettings.cs | 2 +- .../EventStoreSnapshotSettings.cs | 2 +- .../Journal/EventStoreJournal.cs | 109 ++++++++++++++---- .../Journal/JournalWrite.cs | 8 ++ .../Journal/WriteFinished.cs | 3 + .../NoThrowAwaiter.cs | 16 +++ .../Snapshot/EventStoreSnapshotStore.cs | 4 +- .../Streams/EventStoreSink.cs | 71 +++++------- .../SourceQueueWithCompleteExtensions.cs | 4 +- .../persistence.conf | 2 + src/Akka.Persistence.EventStore/snapshot.conf | 2 + 11 files changed, 152 insertions(+), 71 deletions(-) create mode 100644 src/Akka.Persistence.EventStore/Journal/JournalWrite.cs create mode 100644 src/Akka.Persistence.EventStore/Journal/WriteFinished.cs create mode 100644 src/Akka.Persistence.EventStore/NoThrowAwaiter.cs diff --git a/src/Akka.Persistence.EventStore/Configuration/EventStoreJournalSettings.cs b/src/Akka.Persistence.EventStore/Configuration/EventStoreJournalSettings.cs index abf9824..e20810c 100644 --- a/src/Akka.Persistence.EventStore/Configuration/EventStoreJournalSettings.cs +++ b/src/Akka.Persistence.EventStore/Configuration/EventStoreJournalSettings.cs @@ -25,7 +25,7 @@ public EventStoreJournalSettings(Config config) AutoInitialize = config.GetBoolean("auto-initialize"); MaterializerDispatcher = config.GetString("materializer-dispatcher", "akka.actor.default-dispatcher"); Tenant = config.GetString("tenant"); - Parallelism = config.GetInt("parallelism", 3); + Parallelism = config.GetInt("parallelism", 6); BufferSize = config.GetInt("buffer-size", 5000); } diff --git a/src/Akka.Persistence.EventStore/Configuration/EventStoreSnapshotSettings.cs b/src/Akka.Persistence.EventStore/Configuration/EventStoreSnapshotSettings.cs index ab1cbd7..35e976b 100644 --- a/src/Akka.Persistence.EventStore/Configuration/EventStoreSnapshotSettings.cs +++ b/src/Akka.Persistence.EventStore/Configuration/EventStoreSnapshotSettings.cs @@ -21,7 +21,7 @@ public EventStoreSnapshotSettings(Config config) DefaultSerializer = config.GetString("serializer"); MaterializerDispatcher = config.GetString("materializer-dispatcher", "akka.actor.default-dispatcher"); Tenant = config.GetString("tenant"); - Parallelism = config.GetInt("parallelism", 3); + Parallelism = config.GetInt("parallelism", 6); BufferSize = config.GetInt("buffer-size", 5000); } diff --git a/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs b/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs index 1370f8b..737db0a 100644 --- a/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs +++ b/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs @@ -12,6 +12,7 @@ using Akka.Streams; using Akka.Streams.Dsl; using Akka.Streams.Implementation.Stages; +using Akka.Util.Internal; using EventStore.Client; using JetBrains.Annotations; @@ -25,27 +26,39 @@ public class EventStoreJournal : AsyncWriteJournal, IWithUnboundedStash private readonly EventStoreJournalSettings _settings; private readonly EventStoreTenantSettings _tenantSettings; private readonly ILoggingAdapter _log; + private readonly CancellationTokenSource _pendingWriteCts; private EventStoreClient _eventStoreClient = null!; private IMessageAdapter _adapter = null!; private ActorMaterializer _mat = null!; - private ISourceQueueWithComplete> _writeQueue = null!; + private ISourceQueueWithComplete> _writeQueue = null!; + + private readonly Dictionary _writeInProgress = new(); // ReSharper disable once ConvertToPrimaryConstructor public EventStoreJournal(Config journalConfig) { _log = Context.GetLogger(); + _pendingWriteCts = new CancellationTokenSource(); + _settings = new EventStoreJournalSettings(journalConfig); _tenantSettings = EventStoreTenantSettings.GetFrom(Context.System); } public override async Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { + if (_writeInProgress.TryGetValue(persistenceId, out var wip)) + { + // We don't care whether the write succeeded or failed + // We just want it to finish. + await new NoThrowAwaiter(wip); + } + var filter = EventStoreEventStreamFilter.FromEnd(_settings.GetStreamName(persistenceId, _tenantSettings), fromSequenceNr); var lastMessage = await EventStoreSource .FromStream(_eventStoreClient, filter) - .DeSerializeEventWith(_adapter) + .DeSerializeEventWith(_adapter, _settings.Parallelism) .Filter(filter) .Take(1) .RunWith(new FirstOrDefault>(), _mat); @@ -84,21 +97,51 @@ public override async Task ReplayMessagesAsync( await EventStoreSource .FromStream(_eventStoreClient, filter) - .DeSerializeEventWith(_adapter) + .DeSerializeEventWith(_adapter, _settings.Parallelism) .Filter(filter) .Take(n: max) .RunForeach(r => recoveryCallback(r.Data), _mat); } - protected override async Task> WriteMessagesAsync( + protected override Task> WriteMessagesAsync( IEnumerable atomicWrites) { - var results = await Task.WhenAll(atomicWrites - .Select(x => _writeQueue - .Write(x) - .ContinueWith(result => result.IsCompletedSuccessfully ? null : result.Exception))); - - return results.Select(x => x != null ? TryUnwrapException(x) : null).ToImmutableList(); + var messagesList = atomicWrites.ToImmutableList(); + var persistenceId = messagesList.Head().PersistenceId; + + var future = _writeQueue.Write(new JournalWrite( + persistenceId, + messagesList.Min(x => x.LowestSequenceNr), + messagesList + .SelectMany(y => (IImmutableList)y.Payload) + .OrderBy(y => y.SequenceNr) + .ToImmutableList())) + .ContinueWith(result => + { + var exception = result.Exception != null ? TryUnwrapException(result.Exception) : null; + + return (IImmutableList)Enumerable + .Range(0, messagesList.Count) + .Select(_ => exception) + .ToImmutableList(); + }, + cancellationToken: _pendingWriteCts.Token, + continuationOptions: TaskContinuationOptions.ExecuteSynchronously, + scheduler: TaskScheduler.Default); + + _writeInProgress[persistenceId] = future; + var self = Self; + + // When we are done, we want to send a 'WriteFinished' so that + // Sequence Number reads won't block/await/etc. + future.ContinueWith( + continuationAction: p => self.Tell(new WriteFinished(persistenceId, p)), + cancellationToken: _pendingWriteCts.Token, + continuationOptions: TaskContinuationOptions.ExecuteSynchronously, + scheduler: TaskScheduler.Default); + + // But we still want to return the future from `AsyncWriteMessages` + return future; } protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) @@ -109,7 +152,7 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t var lastMessage = await EventStoreSource .FromStream(_eventStoreClient, filter) - .DeSerializeEventWith(_adapter) + .DeSerializeEventWith(_adapter, _settings.Parallelism) .Filter(filter) .Take(1) .RunWith(new FirstOrDefault>(), _mat); @@ -156,6 +199,34 @@ protected override void PreStart() BecomeStacked(Initializing); } + protected override void PostStop() + { + base.PostStop(); + _pendingWriteCts.Cancel(); + _pendingWriteCts.Dispose(); + } + + public override void AroundPreRestart(Exception cause, object? message) + { + _log.Error(cause, $"EventStore Journal Error on {message?.GetType().ToString() ?? "null"}"); + base.AroundPreRestart(cause, message); + } + + protected override bool ReceivePluginInternal(object message) + { + switch (message) + { + case WriteFinished wf: + if (_writeInProgress.TryGetValue(wf.PersistenceId, out var latestPending) & (latestPending == wf.Future)) + _writeInProgress.Remove(wf.PersistenceId); + + return true; + + default: + return false; + } + } + private bool Initializing(object message) { switch (message) @@ -196,15 +267,11 @@ private async Task Initialize() namePrefix: "esWriteJournal"); _writeQueue = EventStoreSink - .CreateWriteQueue( + .CreateWriteQueue( _eventStoreClient, - async atomicWrite => + async journalWrite => { - var persistentMessages = (IImmutableList)atomicWrite.Payload; - - var persistenceId = atomicWrite.PersistenceId; - - var lowSequenceId = persistentMessages.Min(c => c.SequenceNr) - 2; + var lowSequenceId = journalWrite.LowestSequenceNumber - 2; var expectedVersion = lowSequenceId < 0 ? StreamRevision.None @@ -213,16 +280,16 @@ private async Task Initialize() var currentTime = DateTime.UtcNow.Ticks; var events = await Source - .From(persistentMessages) + .From(journalWrite.Events) .Select(x => x.Timestamp > 0 ? x : x.WithTimestamp(currentTime)) - .SerializeWith(_adapter) + .SerializeWith(_adapter, _settings.Parallelism) .RunAggregate( ImmutableList.Empty, (events, current) => events.Add(current), _mat); return ( - _settings.GetStreamName(persistenceId, _tenantSettings), + _settings.GetStreamName(journalWrite.PersistenceId, _tenantSettings), events, expectedVersion); }, diff --git a/src/Akka.Persistence.EventStore/Journal/JournalWrite.cs b/src/Akka.Persistence.EventStore/Journal/JournalWrite.cs new file mode 100644 index 0000000..d7ea8e6 --- /dev/null +++ b/src/Akka.Persistence.EventStore/Journal/JournalWrite.cs @@ -0,0 +1,8 @@ +using System.Collections.Immutable; + +namespace Akka.Persistence.EventStore.Journal; + +public record JournalWrite( + string PersistenceId, + long LowestSequenceNumber, + IImmutableList Events); \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Journal/WriteFinished.cs b/src/Akka.Persistence.EventStore/Journal/WriteFinished.cs new file mode 100644 index 0000000..2fc7f39 --- /dev/null +++ b/src/Akka.Persistence.EventStore/Journal/WriteFinished.cs @@ -0,0 +1,3 @@ +namespace Akka.Persistence.EventStore.Journal; + +public sealed record WriteFinished(string PersistenceId, Task Future); \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/NoThrowAwaiter.cs b/src/Akka.Persistence.EventStore/NoThrowAwaiter.cs new file mode 100644 index 0000000..9d5c880 --- /dev/null +++ b/src/Akka.Persistence.EventStore/NoThrowAwaiter.cs @@ -0,0 +1,16 @@ +using System.Runtime.CompilerServices; + +namespace Akka.Persistence.EventStore; + +internal readonly struct NoThrowAwaiter(Task task) : ICriticalNotifyCompletion +{ + public NoThrowAwaiter GetAwaiter() => this; + + public bool IsCompleted => task.IsCompleted; + + public void GetResult() { } + + public void OnCompleted(Action continuation) => task.GetAwaiter().OnCompleted(continuation); + + public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation); +} diff --git a/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs b/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs index 0145e1e..827cc5d 100644 --- a/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs +++ b/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs @@ -46,7 +46,7 @@ public EventStoreSnapshotStore(Config snapshotConfig) { var events = await Source .Single(snapshot) - .SerializeWith(_messageAdapter) + .SerializeWith(_messageAdapter, _settings.Parallelism) .RunAggregate( ImmutableList.Empty, (events, current) => events.Add(current), @@ -126,7 +126,7 @@ await _eventStoreClient.SetStreamMetadataAsync( return await EventStoreSource .FromStream(_eventStoreClient, filter) - .DeSerializeSnapshotWith(_messageAdapter) + .DeSerializeSnapshotWith(_messageAdapter, _settings.Parallelism) .Filter(filter) .Take(1) .RunWith(new FirstOrDefault>(), _mat); diff --git a/src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs b/src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs index bad4dbd..d765e08 100644 --- a/src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs +++ b/src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs @@ -12,51 +12,33 @@ public static class EventStoreSink public static Sink> Create(EventStoreClient client, int parallelism = 5) { return Flow.Create() - .Batch( - parallelism, - ImmutableList.Create, - (current, item) => current.Add(item)) - .SelectAsync(1, async writeRequests => + .SelectAsync(parallelism, async writeRequest => { - var writes = writeRequests - .GroupBy(x => x.Stream) - .Select(async requestsForStream => + try + { + if (writeRequest.ExpectedRevision != null) { - foreach (var writeRequest in requestsForStream) - { - try - { - if (writeRequest.Events.Any()) - { - if (writeRequest.ExpectedRevision != null) - { - await client.AppendToStreamAsync( - writeRequest.Stream, - writeRequest.ExpectedRevision.Value, - writeRequest.Events, - configureOperationOptions: options => options.ThrowOnAppendFailure = true); - } - else - { - await client.AppendToStreamAsync( - writeRequest.Stream, - writeRequest.ExpectedState ?? StreamState.Any, - writeRequest.Events, - configureOperationOptions: options => options.ThrowOnAppendFailure = true); - } - } - - writeRequest.Ack.TrySetResult(NotUsed.Instance); - } - catch (Exception e) - { - writeRequest.Ack.TrySetException(e); - } - } - }) - .ToImmutableList(); + await client.AppendToStreamAsync( + writeRequest.Stream, + writeRequest.ExpectedRevision.Value, + writeRequest.Events, + configureOperationOptions: options => options.ThrowOnAppendFailure = true); + } + else + { + await client.AppendToStreamAsync( + writeRequest.Stream, + writeRequest.ExpectedState ?? StreamState.Any, + writeRequest.Events, + configureOperationOptions: options => options.ThrowOnAppendFailure = true); + } - await Task.WhenAll(writes); + writeRequest.Ack.TrySetResult(NotUsed.Instance); + } + catch (Exception e) + { + writeRequest.Ack.TrySetException(e); + } return NotUsed.Instance; }) @@ -64,9 +46,10 @@ await client.AppendToStreamAsync( .Named("EventStoreSink"); } - public static ISourceQueueWithComplete> CreateWriteQueue( + internal static ISourceQueueWithComplete> CreateWriteQueue( EventStoreClient client, - Func events, StreamRevision? expectedRevision)>> toWriteRequest, + Func events, StreamRevision? expectedRevision)>> + toWriteRequest, ActorMaterializer materializer, int parallelism = 5, int bufferSize = 5000) diff --git a/src/Akka.Persistence.EventStore/Streams/SourceQueueWithCompleteExtensions.cs b/src/Akka.Persistence.EventStore/Streams/SourceQueueWithCompleteExtensions.cs index b71d051..d6715ae 100644 --- a/src/Akka.Persistence.EventStore/Streams/SourceQueueWithCompleteExtensions.cs +++ b/src/Akka.Persistence.EventStore/Streams/SourceQueueWithCompleteExtensions.cs @@ -2,9 +2,9 @@ namespace Akka.Persistence.EventStore.Streams; -public static class SourceQueueWithCompleteExtensions +internal static class SourceQueueWithCompleteExtensions { - public static async Task Write(this ISourceQueueWithComplete> queue, T item) + internal static async Task Write(this ISourceQueueWithComplete> queue, T item) { var promise = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/src/Akka.Persistence.EventStore/persistence.conf b/src/Akka.Persistence.EventStore/persistence.conf index d354268..d7b783a 100644 --- a/src/Akka.Persistence.EventStore/persistence.conf +++ b/src/Akka.Persistence.EventStore/persistence.conf @@ -18,6 +18,8 @@ prefix = "" tenant = "" + parallelism = 6 + buffer-size = 5000 tagged-stream-name-pattern = "tagged-[[TAG]]" persistence-ids-stream-name = "persistenceids" diff --git a/src/Akka.Persistence.EventStore/snapshot.conf b/src/Akka.Persistence.EventStore/snapshot.conf index bef348a..2054c73 100644 --- a/src/Akka.Persistence.EventStore/snapshot.conf +++ b/src/Akka.Persistence.EventStore/snapshot.conf @@ -14,6 +14,8 @@ adapter = "default" tenant = "" + parallelism = 6 + buffer-size = 5000 # prefix used to create stream name along side with PersistenceId for snapshot prefix = "snapshot@"