Skip to content

Commit

Permalink
Handle projection catchup between each query (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasJakobsson authored May 9, 2024
1 parent ba6a21a commit ae93a37
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 215 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2024 Akka.NET Project</Copyright>
<NoWarn>$(NoWarn);CS1591;NU1701;CA1707;</NoWarn>
<VersionPrefix>1.5.18</VersionPrefix>
<VersionPrefix>1.5.20</VersionPrefix>
<Authors>Akka.NET Team</Authors>
<PackageProjectUrl>https://github.com/akkadotnet/Akka.Persistence.EventStore</PackageProjectUrl>
<PackageReleaseNotes>Serialization changes to match 1.4 API
Expand Down
6 changes: 3 additions & 3 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<AkkaVersion>1.5.19</AkkaVersion>
<AkkaVersion>1.5.20</AkkaVersion>
<EventStoreVersion>23.2.1</EventStoreVersion>
<XunitVersion>2.7.1</XunitVersion>
<XunitVersion>2.8.0</XunitVersion>
<TestSdkVersion>17.9.0</TestSdkVersion>
</PropertyGroup>
<!-- Akka.NET Package Versions -->
Expand All @@ -26,7 +26,7 @@
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageVersion Include="xunit" Version="$(XunitVersion)" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.8">
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,11 @@ This will run the journal with its default settings. The default settings can be
#### HOCON Configuration

- `write-plugin` - Absolute path to the write journal plugin configuration entry that this query journal will connect to. If undefined (or "") it will connect to the default journal as specified by the `akka.persistence.journal.plugin` property.
- `refresh-interval` - The amount of time the plugin will wait between queries when it didn't find any events.

#### HOCON Configuration Example
```
akka.persistence.query.journal.eventstore {
write-plugin = ""
refresh-interval = 5s
}
```

Expand Down
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### 1.5.19 April 17 2024 ####
#### 1.5.20 May 9 2024 ####
This is a patch release with some minor refactoring and updates to dependencies.

#### 1.5.18 April 8 2024 ####
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using Akka.Configuration;
using Akka.Persistence.EventStore.Configuration;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;

namespace Akka.Persistence.EventStore.Hosting.Tests;
Expand Down Expand Up @@ -47,7 +46,6 @@ public void ModifiedOptionsTest()
{
AutoInitialize = false,
ConnectionString = "a",
QueryRefreshInterval = 5.Seconds(),
Serializer = "hyperion",
Adapter = "custom",
StreamPrefix = "prefix",
Expand All @@ -64,9 +62,7 @@ public void ModifiedOptionsTest()
.WithFallback(EventStorePersistence.DefaultJournalConfiguration);

var config = new EventStoreJournalSettings(journalConfig);

fullConfig.GetTimeSpan("akka.persistence.query.journal.custom.refresh-interval").Should().Be(5.Seconds());


config.ConnectionString.Should().Be("a");
config.Adapter.Should().Be("custom");
config.StreamPrefix.Should().Be("prefix");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public EventStoreJournalOptions() : this(true)
public string? TaggedStreamNamePattern { get; init; }
public string? PersistenceIdsStreamName { get; init; }
public string? PersistedEventsStreamName { get; init; }
public TimeSpan? QueryRefreshInterval { get; init; }
public TimeSpan? QueryProjectionCatchupTimeout { get; init; }
public TimeSpan? QueryNoStreamTimeout { get; init; }
public string? Tenant { get; init; }
public string? MaterializerDispatcher { get; init; }
public override string Identifier { get; set; } = identifier;
Expand Down Expand Up @@ -66,11 +65,8 @@ protected override StringBuilder Build(StringBuilder sb)

sb.AppendLine($"write-plugin = {PluginId.ToHocon()}");

if (QueryRefreshInterval != null)
sb.AppendLine($"refresh-interval = {QueryRefreshInterval.ToHocon()}");

if (QueryProjectionCatchupTimeout != null)
sb.AppendLine($"projection-catchup-timeout = {QueryProjectionCatchupTimeout.ToHocon()}");
if (QueryNoStreamTimeout != null)
sb.AppendLine($"no-stream-timeout = {QueryNoStreamTimeout.ToHocon()}");

sb.AppendLine("}");

Expand Down
6 changes: 2 additions & 4 deletions src/Akka.Persistence.EventStore.Hosting/HostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ public static AkkaConfigurationBuilder WithEventStorePersistence(
string? persistedEventsStreamName = null,
string? tenantStreamNamePattern = null,
string? materializerDispatcher = null,
TimeSpan? queryRefreshInterval = null,
TimeSpan? queryProjectionCatchupTimeout = null)
TimeSpan? queryNoStreamTimeout = null)
{
if (mode == PersistenceMode.SnapshotStore && journalBuilder is not null)
throw new Exception($"{nameof(journalBuilder)} can only be set when {nameof(mode)} is set to either {PersistenceMode.Both} or {PersistenceMode.Journal}");
Expand All @@ -44,8 +43,7 @@ public static AkkaConfigurationBuilder WithEventStorePersistence(
PersistenceIdsStreamName = persistenceIdsStreamName,
Tenant = tenant,
MaterializerDispatcher = materializerDispatcher,
QueryRefreshInterval = queryRefreshInterval,
QueryProjectionCatchupTimeout = queryProjectionCatchupTimeout
QueryNoStreamTimeout = queryNoStreamTimeout
};

var adapters = new AkkaPersistenceJournalBuilder(journalOptions.Identifier, builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class = ""Akka.Persistence.EventStore.Snapshot.EventStoreSnapshotStore, Akka.Per
akka.persistence.query.journal.eventstore {{
class = ""Akka.Persistence.EventStore.Query.EventStoreReadJournalProvider, Akka.Persistence.EventStore""
write-plugin = ""akka.persistence.journal.eventstore""
refresh-interval = 1s
projection-catchup-timeout = 1s
no-stream-timeout = 200ms
}}
akka.test.single-expect-default = 10s");

Expand Down
40 changes: 18 additions & 22 deletions src/Akka.Persistence.EventStore.Tests/PersistentSubscriptionSpec.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Persistence.EventStore.Streams;
using Akka.Streams;
using Akka.Streams.Dsl;
Expand Down Expand Up @@ -30,7 +29,7 @@ public async Task ReadJournal_PersistentSubscription_should_see_existing_events(
{
const string streamName = "a";

var (cancelable, probe) = await Setup(streamName, 2);
var probe = await Setup(streamName, 2);

probe.Request(5);

Expand All @@ -44,15 +43,15 @@ public async Task ReadJournal_PersistentSubscription_should_see_existing_events(

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));

cancelable.Cancel();
probe.Cancel();
}

[Fact]
public async Task ReadJournal_PersistentSubscription_should_see_new_events()
{
const string streamName = "b";

var (cancelable, probe) = await Setup(streamName, 1);
var probe = await Setup(streamName, 1);

probe.Request(5);

Expand All @@ -77,15 +76,15 @@ await _eventStoreClient.AppendToStreamAsync(

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));

cancelable.Cancel();
probe.Cancel();
}

[Fact]
public async Task ReadJournal_PersistentSubscription_should_see_all_150_events()
{
const string streamName = "c";

var (cancelable, probe) = await Setup(streamName, 150);
var probe = await Setup(streamName, 150);

probe.Request(150);

Expand All @@ -100,19 +99,18 @@ public async Task ReadJournal_PersistentSubscription_should_see_all_150_events()

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));

cancelable.Cancel();
probe.Cancel();
}

[Fact]
public async Task ReadJournal_PersistentSubscription_should_survive_dropped_connection_when_given_retry_settings()
{
const string streamName = "d";

var (cancelable, probe) = await Setup(
var probe = await Setup(
streamName,
1,
RestartSettings
.Create(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2), 0.2));
keepReconnecting: true);

probe.Request(5);

Expand Down Expand Up @@ -141,15 +139,15 @@ await _eventStoreClient.AppendToStreamAsync(

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));

cancelable.Cancel();
probe.Cancel();
}

[Fact]
public async Task ReadJournal_PersistentSubscription_should_fail_on_dropped_connection_when_not_given_any_retry_settings()
{
const string streamName = "e";

var (_, probe) = await Setup(streamName, 1);
var probe = await Setup(streamName, 1);

probe.Request(5);

Expand All @@ -169,35 +167,33 @@ public async Task ReadJournal_PersistentSubscription_subscription_should_be_drop
{
const string streamName = "f";

var (cancelable, probe) = await Setup(streamName, 1);
var probe = await Setup(streamName, 1);

probe.Request(5);

var firstMessage = await probe.ExpectNextAsync<PersistentSubscriptionEvent>(x => x.Event.Event.EventType == $"{streamName}-1");

await firstMessage.Ack();

probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(300));

var subscriptionBeforeCancel = await _subscriptionClient.GetInfoToStreamAsync(streamName, streamName);

subscriptionBeforeCancel.Connections.Should().HaveCount(1);

cancelable.Cancel();
probe.Cancel();

await probe.ExpectCompleteAsync();

await Task.Delay(TimeSpan.FromMilliseconds(500));
await Task.Delay(TimeSpan.FromMilliseconds(300));

var subscriptionAfterCancel = await _subscriptionClient.GetInfoToStreamAsync(streamName, streamName);

subscriptionAfterCancel.Connections.Should().HaveCount(0);
}

private async Task<(ICancelable, TestSubscriber.Probe<PersistentSubscriptionEvent>)> Setup(
private async Task<TestSubscriber.Probe<PersistentSubscriptionEvent>> Setup(
string streamName,
int numberOfEvents,
RestartSettings? restartWith = null)
bool keepReconnecting = false)
{
await _subscriptionClient.CreateToStreamAsync(
streamName,
Expand All @@ -221,8 +217,8 @@ await _eventStoreClient.AppendToStreamAsync(
_subscriptionClient,
streamName,
streamName,
restartWith: restartWith);
keepReconnecting: keepReconnecting);

return stream.ToMaterialized(this.SinkProbe<PersistentSubscriptionEvent>(), Keep.Both).Run(Sys.Materializer());
return stream.ToMaterialized(this.SinkProbe<PersistentSubscriptionEvent>(), Keep.Right).Run(Sys.Materializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
using Akka.Persistence.TCK.Query;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace Akka.Persistence.EventStore.Tests.Query;

Expand All @@ -19,6 +21,34 @@ public EventStoreCurrentEventsByTagSpec(DatabaseFixture databaseFixture, ITestOu
ReadJournal = Sys.ReadJournalFor<EventStoreReadJournal>(EventStorePersistence.QueryConfigPath);
}

[Fact]
public override void ReadJournal_query_CurrentEventsByTag_should_see_all_150_events()
{
if (ReadJournal is not ICurrentEventsByTagQuery queries)
throw IsTypeException.ForMismatchedType(nameof(ICurrentEventsByTagQuery), ReadJournal?.GetType().Name ?? "null");

var a = Sys.ActorOf(Query.TestActor.Props("a"));

foreach (var _ in Enumerable.Range(1, 150))
{
a.Tell("a green apple");
ExpectMsg("a green apple-done");
}

Thread.Sleep(TimeSpan.FromMilliseconds(300));

var greenSrc = queries.CurrentEventsByTag("green", offset: Offset.NoOffset());
var probe = greenSrc.RunWith(this.SinkProbe<EventEnvelope>(), Materializer);
probe.Request(150);
foreach (var i in Enumerable.Range(1, 150))
{
ExpectEnvelope(probe, "a", i, "a green apple", "green");
}

probe.ExpectComplete();
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
}

[Fact]
public async Task ReadJournal_query_offset_exclusivity_should_be_correct()
{
Expand All @@ -44,10 +74,31 @@ public async Task ReadJournal_query_offset_exclusivity_should_be_correct()

actor.Tell("a green banana");
ExpectMsg("a green banana-done");

await Task.Delay(TimeSpan.FromMilliseconds(300));

var round3 = await journal.CurrentEventsByTag(tag, item1Offset)
.RunWith(Sink.Seq<EventEnvelope>(), Sys.Materializer());

round3.Should().HaveCount(1);
}

private void ExpectEnvelope(
TestSubscriber.Probe<EventEnvelope> probe,
string persistenceId,
long sequenceNr,
string @event,
string tag)
{
var envelope = probe.ExpectNext<EventEnvelope>(_ => true);
envelope.PersistenceId.Should().Be(persistenceId);
envelope.SequenceNr.Should().Be(sequenceNr);
envelope.Event.Should().Be(@event);

if (SupportsTagsInEventEnvelope)
{
envelope.Tags.Should().NotBeNull();
envelope.Tags.Should().Contain(tag);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ public EventStoreReadJournalSettings(Config config)
config = config.WithFallback(EventStorePersistence.DefaultQueryConfiguration);

WritePlugin = config.GetString("write-plugin");
QueryRefreshInterval = config.GetTimeSpan("refresh-interval", TimeSpan.FromSeconds(5));
ProjectionCatchupTimeout = config.GetTimeSpan("projection-catchup-timeout", TimeSpan.FromMilliseconds(500));
NoStreamTimeout = config.GetTimeSpan("no-stream-timeout", TimeSpan.FromMilliseconds(500));
}

public string WritePlugin { get; }
public TimeSpan QueryRefreshInterval { get; }
public TimeSpan ProjectionCatchupTimeout { get; }
public TimeSpan NoStreamTimeout { get; }
}
Loading

0 comments on commit ae93a37

Please sign in to comment.