Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasJakobsson committed Jun 29, 2024
1 parent 3195fb9 commit 3590a7c
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public EventStoreJournalSettings(Config config)
AutoInitialize = config.GetBoolean("auto-initialize");
MaterializerDispatcher = config.GetString("materializer-dispatcher", "akka.actor.default-dispatcher");
Tenant = config.GetString("tenant");
Parallelism = config.GetInt("parallelism", 3);
Parallelism = config.GetInt("parallelism", 6);
BufferSize = config.GetInt("buffer-size", 5000);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public EventStoreSnapshotSettings(Config config)
DefaultSerializer = config.GetString("serializer");
MaterializerDispatcher = config.GetString("materializer-dispatcher", "akka.actor.default-dispatcher");
Tenant = config.GetString("tenant");
Parallelism = config.GetInt("parallelism", 3);
Parallelism = config.GetInt("parallelism", 6);
BufferSize = config.GetInt("buffer-size", 5000);
}

Expand Down
109 changes: 88 additions & 21 deletions src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation.Stages;
using Akka.Util.Internal;
using EventStore.Client;
using JetBrains.Annotations;

Expand All @@ -25,27 +26,39 @@ public class EventStoreJournal : AsyncWriteJournal, IWithUnboundedStash
private readonly EventStoreJournalSettings _settings;
private readonly EventStoreTenantSettings _tenantSettings;
private readonly ILoggingAdapter _log;
private readonly CancellationTokenSource _pendingWriteCts;

private EventStoreClient _eventStoreClient = null!;
private IMessageAdapter _adapter = null!;
private ActorMaterializer _mat = null!;
private ISourceQueueWithComplete<WriteQueueItem<AtomicWrite>> _writeQueue = null!;
private ISourceQueueWithComplete<WriteQueueItem<JournalWrite>> _writeQueue = null!;

private readonly Dictionary<string, Task> _writeInProgress = new();

// ReSharper disable once ConvertToPrimaryConstructor
public EventStoreJournal(Config journalConfig)
{
_log = Context.GetLogger();
_pendingWriteCts = new CancellationTokenSource();

_settings = new EventStoreJournalSettings(journalConfig);
_tenantSettings = EventStoreTenantSettings.GetFrom(Context.System);
}

public override async Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
{
if (_writeInProgress.TryGetValue(persistenceId, out var wip))
{
// We don't care whether the write succeeded or failed
// We just want it to finish.
await new NoThrowAwaiter(wip);
}

var filter = EventStoreEventStreamFilter.FromEnd(_settings.GetStreamName(persistenceId, _tenantSettings), fromSequenceNr);

var lastMessage = await EventStoreSource
.FromStream(_eventStoreClient, filter)
.DeSerializeEventWith(_adapter)
.DeSerializeEventWith(_adapter, _settings.Parallelism)
.Filter(filter)
.Take(1)
.RunWith(new FirstOrDefault<ReplayCompletion<IPersistentRepresentation>>(), _mat);
Expand Down Expand Up @@ -84,21 +97,51 @@ public override async Task ReplayMessagesAsync(

await EventStoreSource
.FromStream(_eventStoreClient, filter)
.DeSerializeEventWith(_adapter)
.DeSerializeEventWith(_adapter, _settings.Parallelism)
.Filter(filter)
.Take(n: max)
.RunForeach(r => recoveryCallback(r.Data), _mat);
}

protected override async Task<IImmutableList<Exception?>> WriteMessagesAsync(
protected override Task<IImmutableList<Exception?>> WriteMessagesAsync(
IEnumerable<AtomicWrite> atomicWrites)
{
var results = await Task.WhenAll(atomicWrites
.Select(x => _writeQueue
.Write(x)
.ContinueWith(result => result.IsCompletedSuccessfully ? null : result.Exception)));

return results.Select(x => x != null ? TryUnwrapException(x) : null).ToImmutableList();
var messagesList = atomicWrites.ToImmutableList();
var persistenceId = messagesList.Head().PersistenceId;

var future = _writeQueue.Write(new JournalWrite(
persistenceId,
messagesList.Min(x => x.LowestSequenceNr),
messagesList
.SelectMany(y => (IImmutableList<IPersistentRepresentation>)y.Payload)
.OrderBy(y => y.SequenceNr)
.ToImmutableList()))
.ContinueWith(result =>
{
var exception = result.Exception != null ? TryUnwrapException(result.Exception) : null;

return (IImmutableList<Exception?>)Enumerable
.Range(0, messagesList.Count)
.Select(_ => exception)
.ToImmutableList();
},
cancellationToken: _pendingWriteCts.Token,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Default);

_writeInProgress[persistenceId] = future;
var self = Self;

// When we are done, we want to send a 'WriteFinished' so that
// Sequence Number reads won't block/await/etc.
future.ContinueWith(
continuationAction: p => self.Tell(new WriteFinished(persistenceId, p)),
cancellationToken: _pendingWriteCts.Token,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Default);

// But we still want to return the future from `AsyncWriteMessages`
return future;
}

protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr)
Expand All @@ -109,7 +152,7 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t

var lastMessage = await EventStoreSource
.FromStream(_eventStoreClient, filter)
.DeSerializeEventWith(_adapter)
.DeSerializeEventWith(_adapter, _settings.Parallelism)
.Filter(filter)
.Take(1)
.RunWith(new FirstOrDefault<ReplayCompletion<IPersistentRepresentation>>(), _mat);
Expand Down Expand Up @@ -156,6 +199,34 @@ protected override void PreStart()
BecomeStacked(Initializing);
}

protected override void PostStop()
{
base.PostStop();
_pendingWriteCts.Cancel();
_pendingWriteCts.Dispose();
}

public override void AroundPreRestart(Exception cause, object? message)
{
_log.Error(cause, $"EventStore Journal Error on {message?.GetType().ToString() ?? "null"}");
base.AroundPreRestart(cause, message);
}

protected override bool ReceivePluginInternal(object message)
{
switch (message)
{
case WriteFinished wf:
if (_writeInProgress.TryGetValue(wf.PersistenceId, out var latestPending) & (latestPending == wf.Future))
_writeInProgress.Remove(wf.PersistenceId);

return true;

default:
return false;
}
}

private bool Initializing(object message)
{
switch (message)
Expand Down Expand Up @@ -196,15 +267,11 @@ private async Task<Status> Initialize()
namePrefix: "esWriteJournal");

_writeQueue = EventStoreSink
.CreateWriteQueue<AtomicWrite>(
.CreateWriteQueue<JournalWrite>(
_eventStoreClient,
async atomicWrite =>
async journalWrite =>
{
var persistentMessages = (IImmutableList<IPersistentRepresentation>)atomicWrite.Payload;

var persistenceId = atomicWrite.PersistenceId;

var lowSequenceId = persistentMessages.Min(c => c.SequenceNr) - 2;
var lowSequenceId = journalWrite.LowestSequenceNumber - 2;

var expectedVersion = lowSequenceId < 0
? StreamRevision.None
Expand All @@ -213,16 +280,16 @@ private async Task<Status> Initialize()
var currentTime = DateTime.UtcNow.Ticks;

var events = await Source
.From(persistentMessages)
.From(journalWrite.Events)
.Select(x => x.Timestamp > 0 ? x : x.WithTimestamp(currentTime))
.SerializeWith(_adapter)
.SerializeWith(_adapter, _settings.Parallelism)
.RunAggregate(
ImmutableList<EventData>.Empty,
(events, current) => events.Add(current),
_mat);

return (
_settings.GetStreamName(persistenceId, _tenantSettings),
_settings.GetStreamName(journalWrite.PersistenceId, _tenantSettings),
events,
expectedVersion);
},
Expand Down
8 changes: 8 additions & 0 deletions src/Akka.Persistence.EventStore/Journal/JournalWrite.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using System.Collections.Immutable;

namespace Akka.Persistence.EventStore.Journal;

public record JournalWrite(
string PersistenceId,
long LowestSequenceNumber,
IImmutableList<IPersistentRepresentation> Events);
3 changes: 3 additions & 0 deletions src/Akka.Persistence.EventStore/Journal/WriteFinished.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Akka.Persistence.EventStore.Journal;

public sealed record WriteFinished(string PersistenceId, Task Future);
16 changes: 16 additions & 0 deletions src/Akka.Persistence.EventStore/NoThrowAwaiter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Runtime.CompilerServices;

namespace Akka.Persistence.EventStore;

internal readonly struct NoThrowAwaiter(Task task) : ICriticalNotifyCompletion
{
public NoThrowAwaiter GetAwaiter() => this;

public bool IsCompleted => task.IsCompleted;

public void GetResult() { }

public void OnCompleted(Action continuation) => task.GetAwaiter().OnCompleted(continuation);

public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public EventStoreSnapshotStore(Config snapshotConfig)
{
var events = await Source
.Single(snapshot)
.SerializeWith(_messageAdapter)
.SerializeWith(_messageAdapter, _settings.Parallelism)
.RunAggregate(
ImmutableList<EventData>.Empty,
(events, current) => events.Add(current),
Expand Down Expand Up @@ -126,7 +126,7 @@ await _eventStoreClient.SetStreamMetadataAsync(

return await EventStoreSource
.FromStream(_eventStoreClient, filter)
.DeSerializeSnapshotWith(_messageAdapter)
.DeSerializeSnapshotWith(_messageAdapter, _settings.Parallelism)
.Filter(filter)
.Take(1)
.RunWith(new FirstOrDefault<ReplayCompletion<SelectedSnapshot>>(), _mat);
Expand Down
71 changes: 27 additions & 44 deletions src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,44 @@ public static class EventStoreSink
public static Sink<EventStoreWrite, Task<Done>> Create(EventStoreClient client, int parallelism = 5)
{
return Flow.Create<EventStoreWrite>()
.Batch(
parallelism,
ImmutableList.Create,
(current, item) => current.Add(item))
.SelectAsync(1, async writeRequests =>
.SelectAsync(parallelism, async writeRequest =>
{
var writes = writeRequests
.GroupBy(x => x.Stream)
.Select(async requestsForStream =>
try
{
if (writeRequest.ExpectedRevision != null)
{
foreach (var writeRequest in requestsForStream)
{
try
{
if (writeRequest.Events.Any())
{
if (writeRequest.ExpectedRevision != null)
{
await client.AppendToStreamAsync(
writeRequest.Stream,
writeRequest.ExpectedRevision.Value,
writeRequest.Events,
configureOperationOptions: options => options.ThrowOnAppendFailure = true);
}
else
{
await client.AppendToStreamAsync(
writeRequest.Stream,
writeRequest.ExpectedState ?? StreamState.Any,
writeRequest.Events,
configureOperationOptions: options => options.ThrowOnAppendFailure = true);
}
}

writeRequest.Ack.TrySetResult(NotUsed.Instance);
}
catch (Exception e)
{
writeRequest.Ack.TrySetException(e);
}
}
})
.ToImmutableList();
await client.AppendToStreamAsync(
writeRequest.Stream,
writeRequest.ExpectedRevision.Value,
writeRequest.Events,
configureOperationOptions: options => options.ThrowOnAppendFailure = true);
}
else
{
await client.AppendToStreamAsync(
writeRequest.Stream,
writeRequest.ExpectedState ?? StreamState.Any,
writeRequest.Events,
configureOperationOptions: options => options.ThrowOnAppendFailure = true);
}

await Task.WhenAll(writes);
writeRequest.Ack.TrySetResult(NotUsed.Instance);
}
catch (Exception e)
{
writeRequest.Ack.TrySetException(e);
}

return NotUsed.Instance;
})
.ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Right)
.Named("EventStoreSink");
}

public static ISourceQueueWithComplete<WriteQueueItem<T>> CreateWriteQueue<T>(
internal static ISourceQueueWithComplete<WriteQueueItem<T>> CreateWriteQueue<T>(
EventStoreClient client,
Func<T, Task<(string stream, IImmutableList<EventData> events, StreamRevision? expectedRevision)>> toWriteRequest,
Func<T, Task<(string stream, IImmutableList<EventData> events, StreamRevision? expectedRevision)>>
toWriteRequest,
ActorMaterializer materializer,
int parallelism = 5,
int bufferSize = 5000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace Akka.Persistence.EventStore.Streams;

public static class SourceQueueWithCompleteExtensions
internal static class SourceQueueWithCompleteExtensions
{
public static async Task Write<T>(this ISourceQueueWithComplete<WriteQueueItem<T>> queue, T item)
internal static async Task Write<T>(this ISourceQueueWithComplete<WriteQueueItem<T>> queue, T item)
{
var promise = new TaskCompletionSource<NotUsed>(TaskCreationOptions.RunContinuationsAsynchronously);

Expand Down
2 changes: 2 additions & 0 deletions src/Akka.Persistence.EventStore/persistence.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

prefix = ""
tenant = ""
parallelism = 6
buffer-size = 5000

tagged-stream-name-pattern = "tagged-[[TAG]]"
persistence-ids-stream-name = "persistenceids"
Expand Down
2 changes: 2 additions & 0 deletions src/Akka.Persistence.EventStore/snapshot.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
adapter = "default"

tenant = ""
parallelism = 6
buffer-size = 5000

# prefix used to create stream name along side with PersistenceId for snapshot
prefix = "snapshot@"
Expand Down

0 comments on commit 3590a7c

Please sign in to comment.