Skip to content

Commit

Permalink
Refactored eventstore writes
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasJakobsson committed Jun 29, 2024
1 parent 3590a7c commit 164d98f
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task ReadJournal_PersistentSubscription_should_see_new_events()

await firstMessage.Ack();

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));

await _eventStoreClient.AppendToStreamAsync(
streamName,
Expand All @@ -74,7 +74,7 @@ await _eventStoreClient.AppendToStreamAsync(

await secondMessage.Ack();

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));

probe.Cancel();
}
Expand Down Expand Up @@ -118,7 +118,7 @@ public async Task ReadJournal_PersistentSubscription_should_survive_dropped_conn

await firstMessage.Ack();

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));

await _subscriptionClient.RestartSubsystemAsync();

Expand All @@ -137,7 +137,7 @@ await _eventStoreClient.AppendToStreamAsync(

await secondMessage.Ack();

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(200));

probe.Cancel();
}
Expand Down
86 changes: 36 additions & 50 deletions src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class EventStoreJournal : AsyncWriteJournal, IWithUnboundedStash
private EventStoreClient _eventStoreClient = null!;
private IMessageAdapter _adapter = null!;
private ActorMaterializer _mat = null!;
private ISourceQueueWithComplete<WriteQueueItem<JournalWrite>> _writeQueue = null!;
private EventStoreWriter<IPersistentRepresentation> _writeQueue = null!;

private readonly Dictionary<string, Task> _writeInProgress = new();

Expand Down Expand Up @@ -109,25 +109,35 @@ await EventStoreSource
var messagesList = atomicWrites.ToImmutableList();
var persistenceId = messagesList.Head().PersistenceId;

var future = _writeQueue.Write(new JournalWrite(
persistenceId,
messagesList.Min(x => x.LowestSequenceNr),
messagesList
.SelectMany(y => (IImmutableList<IPersistentRepresentation>)y.Payload)
.OrderBy(y => y.SequenceNr)
.ToImmutableList()))
var lowSequenceId = messagesList.Min(x => x.LowestSequenceNr) - 2;

var expectedVersion = lowSequenceId < 0
? StreamRevision.None
: StreamRevision.FromInt64(lowSequenceId);

var currentTimestamp = DateTime.Now.Ticks;

var future = _writeQueue
.Write(
_settings.GetStreamName(persistenceId, _tenantSettings),
messagesList
.SelectMany(y => (IImmutableList<IPersistentRepresentation>)y.Payload)
.Select(y => y.Timestamp > 0 ? y : y.WithTimestamp(currentTimestamp))
.OrderBy(y => y.SequenceNr)
.ToImmutableList(),
expectedVersion)
.ContinueWith(result =>
{
var exception = result.Exception != null ? TryUnwrapException(result.Exception) : null;
return (IImmutableList<Exception?>)Enumerable
.Range(0, messagesList.Count)
.Select(_ => exception)
.ToImmutableList();
},
cancellationToken: _pendingWriteCts.Token,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Default);
{
var exception = result.Exception != null ? TryUnwrapException(result.Exception) : null;

return (IImmutableList<Exception?>)Enumerable
.Range(0, messagesList.Count)
.Select(_ => exception)
.ToImmutableList();
},
cancellationToken: _pendingWriteCts.Token,
continuationOptions: TaskContinuationOptions.ExecuteSynchronously,
scheduler: TaskScheduler.Default);

_writeInProgress[persistenceId] = future;
var self = Self;
Expand Down Expand Up @@ -265,38 +275,14 @@ private async Task<Status> Initialize()
.Create(Context.System)
.WithDispatcher(_settings.MaterializerDispatcher),
namePrefix: "esWriteJournal");

_writeQueue = EventStoreSink
.CreateWriteQueue<JournalWrite>(
_eventStoreClient,
async journalWrite =>
{
var lowSequenceId = journalWrite.LowestSequenceNumber - 2;

var expectedVersion = lowSequenceId < 0
? StreamRevision.None
: StreamRevision.FromInt64(lowSequenceId);

var currentTime = DateTime.UtcNow.Ticks;

var events = await Source
.From(journalWrite.Events)
.Select(x => x.Timestamp > 0 ? x : x.WithTimestamp(currentTime))
.SerializeWith(_adapter, _settings.Parallelism)
.RunAggregate(
ImmutableList<EventData>.Empty,
(events, current) => events.Add(current),
_mat);

return (
_settings.GetStreamName(journalWrite.PersistenceId, _tenantSettings),
events,
expectedVersion);
},
_mat,
_settings.Parallelism,
_settings.BufferSize);

_writeQueue = EventStoreWriter<IPersistentRepresentation>.From(
_eventStoreClient,
evnt => _adapter.Adapt(evnt),
_mat,
_settings.Parallelism,
_settings.BufferSize);

if (!_settings.AutoInitialize)
return Status.Success.Instance;

Expand Down
8 changes: 0 additions & 8 deletions src/Akka.Persistence.EventStore/Journal/JournalWrite.cs

This file was deleted.

33 changes: 10 additions & 23 deletions src/Akka.Persistence.EventStore/Snapshot/EventStoreSnapshotStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class EventStoreSnapshotStore : SnapshotStore
private readonly EventStoreSnapshotSettings _settings;
private readonly EventStoreTenantSettings _tenantSettings;
private readonly ActorMaterializer _mat;
private readonly ISourceQueueWithComplete<WriteQueueItem<SelectedSnapshot>> _writeQueue;
private readonly EventStoreWriter<SelectedSnapshot> _writeQueue;

public EventStoreSnapshotStore(Config snapshotConfig)
{
Expand All @@ -39,27 +39,12 @@ public EventStoreSnapshotStore(Config snapshotConfig)
.WithDispatcher(_settings.MaterializerDispatcher),
namePrefix: "esSnapshotJournal");

_writeQueue = EventStoreSink
.CreateWriteQueue<SelectedSnapshot>(
_eventStoreClient,
async snapshot =>
{
var events = await Source
.Single(snapshot)
.SerializeWith(_messageAdapter, _settings.Parallelism)
.RunAggregate(
ImmutableList<EventData>.Empty,
(events, current) => events.Add(current),
_mat);

return (
_settings.GetStreamName(snapshot.Metadata.PersistenceId, _tenantSettings),
events,
null);
},
_mat,
_settings.Parallelism,
_settings.BufferSize);
_writeQueue = EventStoreWriter<SelectedSnapshot>.From(
_eventStoreClient,
snapshot => _messageAdapter.Adapt(snapshot.Metadata, snapshot.Snapshot),
_mat,
_settings.Parallelism,
_settings.BufferSize);
}

protected override async Task<SelectedSnapshot?> LoadAsync(
Expand All @@ -73,7 +58,9 @@ public EventStoreSnapshotStore(Config snapshotConfig)

protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot)
{
await _writeQueue.Write(new SelectedSnapshot(metadata, snapshot));
await _writeQueue.Write(
_settings.GetStreamName(metadata.PersistenceId, _tenantSettings),
ImmutableList.Create(new SelectedSnapshot(metadata, snapshot)));
}

protected override Task DeleteAsync(SnapshotMetadata metadata)
Expand Down
81 changes: 0 additions & 81 deletions src/Akka.Persistence.EventStore/Streams/EventStoreSink.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,6 @@ namespace Akka.Persistence.EventStore.Streams;
[PublicAPI]
public static class EventStoreStreamSourceExtensions
{
public static Source<EventData, NotUsed> SerializeWith<TSource>(
this Source<TSource, NotUsed> source,
Func<TSource, Task<EventData>> serializer,
int parallelism = 1)
{
return source
.SelectAsync(parallelism, serializer);
}

public static Source<EventData, NotUsed> SerializeWith<TSource>(
this Source<TSource, NotUsed> source,
Func<TSource, EventData> serializer,
int parallelism = 1)
{
return source
.SerializeWith(
x => Task.FromResult(serializer(x)),
parallelism);
}

public static Source<EventData, NotUsed> SerializeWith(
this Source<IPersistentRepresentation, NotUsed> source,
IMessageAdapter adapter,
int parallelism = 1)
{
return source
.SerializeWith(
adapter.Adapt,
parallelism);
}

public static Source<EventData, NotUsed> SerializeWith(
this Source<SelectedSnapshot, NotUsed> source,
IMessageAdapter adapter,
int parallelism = 1)
{
return source
.SerializeWith(
msg => adapter.Adapt(msg.Metadata, msg.Snapshot),
parallelism);
}

public static Source<TResult, TMat> DeSerializeWith<TResult, TMat>(
this Source<ResolvedEvent, TMat> source,
Func<ResolvedEvent, Task<TResult?>> deserializer,
Expand Down
14 changes: 0 additions & 14 deletions src/Akka.Persistence.EventStore/Streams/EventStoreWrite.cs

This file was deleted.

Loading

0 comments on commit 164d98f

Please sign in to comment.