diff --git a/src/Akka.Persistence.EventStore.Tests/PersistentSubscriptionSpec.cs b/src/Akka.Persistence.EventStore.Tests/PersistentSubscriptionSpec.cs index 89e26c1..0bbea17 100644 --- a/src/Akka.Persistence.EventStore.Tests/PersistentSubscriptionSpec.cs +++ b/src/Akka.Persistence.EventStore.Tests/PersistentSubscriptionSpec.cs @@ -59,7 +59,7 @@ public async Task ReadJournal_PersistentSubscription_should_see_new_events() await firstMessage.Ack(); - probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); await _eventStoreClient.AppendToStreamAsync( streamName, @@ -74,7 +74,7 @@ await _eventStoreClient.AppendToStreamAsync( await secondMessage.Ack(); - probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); probe.Cancel(); } @@ -118,7 +118,7 @@ public async Task ReadJournal_PersistentSubscription_should_survive_dropped_conn await firstMessage.Ack(); - probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); await _subscriptionClient.RestartSubsystemAsync(); @@ -137,7 +137,7 @@ await _eventStoreClient.AppendToStreamAsync( await secondMessage.Ack(); - probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); probe.Cancel(); } diff --git a/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs b/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs index 737db0a..85c3fa6 100644 --- a/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs +++ b/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs @@ -31,7 +31,7 @@ public class EventStoreJournal : AsyncWriteJournal, IWithUnboundedStash private EventStoreClient _eventStoreClient = null!; private IMessageAdapter _adapter = null!; private ActorMaterializer _mat = null!; - private ISourceQueueWithComplete> _writeQueue = null!; + private EventStoreWriter _writeQueue = null!; private readonly Dictionary _writeInProgress = new(); @@ -109,25 +109,35 @@ await EventStoreSource var messagesList = atomicWrites.ToImmutableList(); var persistenceId = messagesList.Head().PersistenceId; - var future = _writeQueue.Write(new JournalWrite( - persistenceId, - messagesList.Min(x => x.LowestSequenceNr), - messagesList - .SelectMany(y => (IImmutableList)y.Payload) - .OrderBy(y => y.SequenceNr) - .ToImmutableList())) + var lowSequenceId = messagesList.Min(x => x.LowestSequenceNr) - 2; + + var expectedVersion = lowSequenceId < 0 + ? StreamRevision.None + : StreamRevision.FromInt64(lowSequenceId); + + var currentTimestamp = DateTime.Now.Ticks; + + var future = _writeQueue + .Write( + _settings.GetStreamName(persistenceId, _tenantSettings), + messagesList + .SelectMany(y => (IImmutableList)y.Payload) + .Select(y => y.Timestamp > 0 ? y : y.WithTimestamp(currentTimestamp)) + .OrderBy(y => y.SequenceNr) + .ToImmutableList(), + expectedVersion) .ContinueWith(result => - { - var exception = result.Exception != null ? TryUnwrapException(result.Exception) : null; - - return (IImmutableList)Enumerable - .Range(0, messagesList.Count) - .Select(_ => exception) - .ToImmutableList(); - }, - cancellationToken: _pendingWriteCts.Token, - continuationOptions: TaskContinuationOptions.ExecuteSynchronously, - scheduler: TaskScheduler.Default); + { + var exception = result.Exception != null ? TryUnwrapException(result.Exception) : null; + + return (IImmutableList)Enumerable + .Range(0, messagesList.Count) + .Select(_ => exception) + .ToImmutableList(); + }, + cancellationToken: _pendingWriteCts.Token, + continuationOptions: TaskContinuationOptions.ExecuteSynchronously, + scheduler: TaskScheduler.Default); _writeInProgress[persistenceId] = future; var self = Self; @@ -265,38 +275,14 @@ private async Task Initialize() .Create(Context.System) .WithDispatcher(_settings.MaterializerDispatcher), namePrefix: "esWriteJournal"); - - _writeQueue = EventStoreSink - .CreateWriteQueue( - _eventStoreClient, - async journalWrite => - { - var lowSequenceId = journalWrite.LowestSequenceNumber - 2; - - var expectedVersion = lowSequenceId < 0 - ? StreamRevision.None - : StreamRevision.FromInt64(lowSequenceId); - - var currentTime = DateTime.UtcNow.Ticks; - - var events = await Source - .From(journalWrite.Events) - .Select(x => x.Timestamp > 0 ? x : x.WithTimestamp(currentTime)) - .SerializeWith(_adapter, _settings.Parallelism) - .RunAggregate( - ImmutableList.Empty, - (events, current) => events.Add(current), - _mat); - - return ( - _settings.GetStreamName(journalWrite.PersistenceId, _tenantSettings), - events, - expectedVersion); - }, - _mat, - _settings.Parallelism, - _settings.BufferSize); + _writeQueue = EventStoreWriter.From( + _eventStoreClient, + evnt => _adapter.Adapt(evnt), + _mat, + _settings.Parallelism, + _settings.BufferSize); + if (!_settings.AutoInitialize) return Status.Success.Instance; diff --git a/src/Akka.Persistence.EventStore/Journal/JournalWrite.cs b/src/Akka.Persistence.EventStore/Journal/JournalWrite.cs deleted file mode 100644 index d7ea8e6..0000000 --- a/src/Akka.Persistence.EventStore/Journal/JournalWrite.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System.Collections.Immutable; - -namespace Akka.Persistence.EventStore.Journal; - -public record JournalWrite( - string PersistenceId, - long LowestSequenceNumber, - IImmutableList Events); \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs b/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs index 827cc5d..94249a1 100644 --- a/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs +++ b/src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs @@ -22,7 +22,7 @@ public class EventStoreSnapshotStore : SnapshotStore private readonly EventStoreSnapshotSettings _settings; private readonly EventStoreTenantSettings _tenantSettings; private readonly ActorMaterializer _mat; - private readonly ISourceQueueWithComplete> _writeQueue; + private readonly EventStoreWriter _writeQueue; public EventStoreSnapshotStore(Config snapshotConfig) { @@ -39,27 +39,12 @@ public EventStoreSnapshotStore(Config snapshotConfig) .WithDispatcher(_settings.MaterializerDispatcher), namePrefix: "esSnapshotJournal"); - _writeQueue = EventStoreSink - .CreateWriteQueue( - _eventStoreClient, - async snapshot => - { - var events = await Source - .Single(snapshot) - .SerializeWith(_messageAdapter, _settings.Parallelism) - .RunAggregate( - ImmutableList.Empty, - (events, current) => events.Add(current), - _mat); - - return ( - _settings.GetStreamName(snapshot.Metadata.PersistenceId, _tenantSettings), - events, - null); - }, - _mat, - _settings.Parallelism, - _settings.BufferSize); + _writeQueue = EventStoreWriter.From( + _eventStoreClient, + snapshot => _messageAdapter.Adapt(snapshot.Metadata, snapshot.Snapshot), + _mat, + _settings.Parallelism, + _settings.BufferSize); } protected override async Task LoadAsync( @@ -73,7 +58,9 @@ public EventStoreSnapshotStore(Config snapshotConfig) protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) { - await _writeQueue.Write(new SelectedSnapshot(metadata, snapshot)); + await _writeQueue.Write( + _settings.GetStreamName(metadata.PersistenceId, _tenantSettings), + ImmutableList.Create(new SelectedSnapshot(metadata, snapshot))); } protected override Task DeleteAsync(SnapshotMetadata metadata) diff --git a/src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs b/src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs deleted file mode 100644 index d765e08..0000000 --- a/src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System.Collections.Immutable; -using Akka.Streams; -using Akka.Streams.Dsl; -using EventStore.Client; -using JetBrains.Annotations; - -namespace Akka.Persistence.EventStore.Streams; - -public static class EventStoreSink -{ - [PublicAPI] - public static Sink> Create(EventStoreClient client, int parallelism = 5) - { - return Flow.Create() - .SelectAsync(parallelism, async writeRequest => - { - try - { - if (writeRequest.ExpectedRevision != null) - { - await client.AppendToStreamAsync( - writeRequest.Stream, - writeRequest.ExpectedRevision.Value, - writeRequest.Events, - configureOperationOptions: options => options.ThrowOnAppendFailure = true); - } - else - { - await client.AppendToStreamAsync( - writeRequest.Stream, - writeRequest.ExpectedState ?? StreamState.Any, - writeRequest.Events, - configureOperationOptions: options => options.ThrowOnAppendFailure = true); - } - - writeRequest.Ack.TrySetResult(NotUsed.Instance); - } - catch (Exception e) - { - writeRequest.Ack.TrySetException(e); - } - - return NotUsed.Instance; - }) - .ToMaterialized(Sink.Ignore(), Keep.Right) - .Named("EventStoreSink"); - } - - internal static ISourceQueueWithComplete> CreateWriteQueue( - EventStoreClient client, - Func events, StreamRevision? expectedRevision)>> - toWriteRequest, - ActorMaterializer materializer, - int parallelism = 5, - int bufferSize = 5000) - { - return Source - .Queue>(bufferSize, OverflowStrategy.DropNew) - .SelectAsync(parallelism, async x => - { - try - { - var (stream, events, expectedRevision) = await toWriteRequest(x.Item); - - return new EventStoreWrite( - stream, - events, - x.Ack, - expectedRevision); - } - catch (Exception e) - { - x.Ack.TrySetException(e); - - return EventStoreWrite.Empty; - } - }) - .ToMaterialized(Create(client, parallelism), Keep.Left) - .Run(materializer); - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Streams/EventStoreStreamSourceExtensions.cs b/src/Akka.Persistence.EventStore/Streams/EventStoreStreamSourceExtensions.cs index f6f47be..0aa05f7 100644 --- a/src/Akka.Persistence.EventStore/Streams/EventStoreStreamSourceExtensions.cs +++ b/src/Akka.Persistence.EventStore/Streams/EventStoreStreamSourceExtensions.cs @@ -9,48 +9,6 @@ namespace Akka.Persistence.EventStore.Streams; [PublicAPI] public static class EventStoreStreamSourceExtensions { - public static Source SerializeWith( - this Source source, - Func> serializer, - int parallelism = 1) - { - return source - .SelectAsync(parallelism, serializer); - } - - public static Source SerializeWith( - this Source source, - Func serializer, - int parallelism = 1) - { - return source - .SerializeWith( - x => Task.FromResult(serializer(x)), - parallelism); - } - - public static Source SerializeWith( - this Source source, - IMessageAdapter adapter, - int parallelism = 1) - { - return source - .SerializeWith( - adapter.Adapt, - parallelism); - } - - public static Source SerializeWith( - this Source source, - IMessageAdapter adapter, - int parallelism = 1) - { - return source - .SerializeWith( - msg => adapter.Adapt(msg.Metadata, msg.Snapshot), - parallelism); - } - public static Source DeSerializeWith( this Source source, Func> deserializer, diff --git a/src/Akka.Persistence.EventStore/Streams/EventStoreWrite.cs b/src/Akka.Persistence.EventStore/Streams/EventStoreWrite.cs deleted file mode 100644 index 3e37611..0000000 --- a/src/Akka.Persistence.EventStore/Streams/EventStoreWrite.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System.Collections.Immutable; -using EventStore.Client; - -namespace Akka.Persistence.EventStore.Streams; - -public record EventStoreWrite( - string Stream, - IImmutableList Events, - TaskCompletionSource Ack, - StreamRevision? ExpectedRevision = null, - StreamState? ExpectedState = null) -{ - public static readonly EventStoreWrite Empty = new("", ImmutableList.Empty, new TaskCompletionSource()); -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Streams/EventStoreWriter.cs b/src/Akka.Persistence.EventStore/Streams/EventStoreWriter.cs new file mode 100644 index 0000000..3bf5c6a --- /dev/null +++ b/src/Akka.Persistence.EventStore/Streams/EventStoreWriter.cs @@ -0,0 +1,107 @@ +using System.Collections.Immutable; +using Akka.Streams; +using Akka.Streams.Dsl; +using EventStore.Client; + +namespace Akka.Persistence.EventStore.Streams; + +internal class EventStoreWriter +{ + private readonly ISourceQueueWithComplete _writeQueue; + + private EventStoreWriter(ISourceQueueWithComplete writeQueue) + { + _writeQueue = writeQueue; + } + + public static EventStoreWriter From( + EventStoreClient client, + Func> serialize, + ActorMaterializer materializer, + int parallelism, + int bufferSize) + { + return new EventStoreWriter( + Source + .Queue(bufferSize, OverflowStrategy.DropNew) + .SelectAsync(parallelism, async x => + { + try + { + var events = await Task.WhenAll(x + .Events + .Select(serialize)); + + if (x.ExpectedRevision != null) + { + await client.AppendToStreamAsync( + x.StreamName, + x.ExpectedRevision.Value, + events, + configureOperationOptions: options => options.ThrowOnAppendFailure = true); + } + else + { + await client + .AppendToStreamAsync( + x.StreamName, + StreamState.Any, + events, + configureOperationOptions: options => options.ThrowOnAppendFailure = true); + } + + x.Ack.TrySetResult(NotUsed.Instance); + + return NotUsed.Instance; + } + catch (Exception e) + { + x.Ack.TrySetException(e); + + return NotUsed.Instance; + } + }) + .ToMaterialized(Sink.Ignore(), Keep.Left) + .Run(materializer)); + } + + public async Task Write( + string streamName, + IImmutableList events, + StreamRevision? expectedRevision = null) + { + var promise = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var result = await _writeQueue.OfferAsync(new QueueItem(streamName, events, promise, expectedRevision)); + + switch (result) + { + case QueueOfferResult.Enqueued: + break; + + case QueueOfferResult.Failure f: + promise.TrySetException(new Exception("Failed to write journal row batch", f.Cause)); + break; + + case QueueOfferResult.Dropped: + promise.TrySetException( + new Exception( + "Failed to enqueue journal row batch write, the queue buffer was full")); + break; + + case QueueOfferResult.QueueClosed: + promise.TrySetException( + new Exception( + "Failed to enqueue journal row batch write, the queue was closed.")); + break; + } + + await promise.Task; + } + + private record QueueItem( + string StreamName, + IImmutableList Events, + TaskCompletionSource Ack, + StreamRevision? ExpectedRevision = null); +} diff --git a/src/Akka.Persistence.EventStore/Streams/SourceQueueWithCompleteExtensions.cs b/src/Akka.Persistence.EventStore/Streams/SourceQueueWithCompleteExtensions.cs deleted file mode 100644 index d6715ae..0000000 --- a/src/Akka.Persistence.EventStore/Streams/SourceQueueWithCompleteExtensions.cs +++ /dev/null @@ -1,37 +0,0 @@ -using Akka.Streams; - -namespace Akka.Persistence.EventStore.Streams; - -internal static class SourceQueueWithCompleteExtensions -{ - internal static async Task Write(this ISourceQueueWithComplete> queue, T item) - { - var promise = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - - var result = await queue.OfferAsync(new WriteQueueItem(item, promise)); - - switch (result) - { - case QueueOfferResult.Enqueued: - break; - - case QueueOfferResult.Failure f: - promise.TrySetException(new Exception("Failed to write journal row batch", f.Cause)); - break; - - case QueueOfferResult.Dropped: - promise.TrySetException( - new Exception( - "Failed to enqueue journal row batch write, the queue buffer was full")); - break; - - case QueueOfferResult.QueueClosed: - promise.TrySetException( - new Exception( - "Failed to enqueue journal row batch write, the queue was closed.")); - break; - } - - await promise.Task; - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore/Streams/WriteQueueItem.cs b/src/Akka.Persistence.EventStore/Streams/WriteQueueItem.cs deleted file mode 100644 index 00fd959..0000000 --- a/src/Akka.Persistence.EventStore/Streams/WriteQueueItem.cs +++ /dev/null @@ -1,3 +0,0 @@ -namespace Akka.Persistence.EventStore.Streams; - -public record WriteQueueItem(TItem Item, TaskCompletionSource Ack); \ No newline at end of file