From ed644dc2d3a7dbe8ab1a0d99ead100ec9cffec5c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 1 Apr 2024 14:46:42 +0000 Subject: [PATCH 1/2] Bump Akka.Persistence.TCK, Akka, Akka.Persistence.Query and Akka.Persistence Bumps [Akka.Persistence.TCK](https://github.com/akkadotnet/akka.net), [Akka](https://github.com/akkadotnet/akka.net), [Akka.Persistence.Query](https://github.com/akkadotnet/akka.net) and [Akka.Persistence](https://github.com/akkadotnet/akka.net). These dependencies needed to be updated together. Updates `Akka.Persistence.TCK` from 1.4.0-beta1 to 1.5.18 - [Release notes](https://github.com/akkadotnet/akka.net/releases) - [Changelog](https://github.com/akkadotnet/akka.net/blob/dev/RELEASE_NOTES.md) - [Commits](https://github.com/akkadotnet/akka.net/compare/v1.4.0-beta1...1.5.18) Updates `Akka` from 1.4.0-beta1 to 1.5.18 - [Release notes](https://github.com/akkadotnet/akka.net/releases) - [Changelog](https://github.com/akkadotnet/akka.net/blob/dev/RELEASE_NOTES.md) - [Commits](https://github.com/akkadotnet/akka.net/compare/v1.4.0-beta1...1.5.18) Updates `Akka.Persistence.Query` from 1.4.0-beta1 to 1.5.18 - [Release notes](https://github.com/akkadotnet/akka.net/releases) - [Changelog](https://github.com/akkadotnet/akka.net/blob/dev/RELEASE_NOTES.md) - [Commits](https://github.com/akkadotnet/akka.net/compare/v1.4.0-beta1...1.5.18) Updates `Akka.Persistence` from 1.4.0-beta1 to 1.5.18 - [Release notes](https://github.com/akkadotnet/akka.net/releases) - [Changelog](https://github.com/akkadotnet/akka.net/blob/dev/RELEASE_NOTES.md) - [Commits](https://github.com/akkadotnet/akka.net/compare/v1.4.0-beta1...1.5.18) --- updated-dependencies: - dependency-name: Akka.Persistence.TCK dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: Akka dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: Akka.Persistence.Query dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: Akka.Persistence dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- Directory.Packages.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Directory.Packages.props b/Directory.Packages.props index 828a027..2b1b5fa 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,7 +1,7 @@ true - 1.4.0-beta1 + 1.5.18 1.2.2 2.4.1 15.9.0 From 3be91603ab0663909328d9415ec12e88405b87bb Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Tue, 2 Apr 2024 03:29:57 +0700 Subject: [PATCH 2/2] Replace deprecated `Match()` API with switch statements --- .../Publishers/AllPersistenceIdsPublisher.cs | 74 ++++++++++--------- .../EventsByPersistenceIdPublisher.cs | 32 +++++--- .../Publishers/EventsByTagPublisher.cs | 32 +++++--- .../Journal/EventStoreJournal.cs | 51 ++++++++----- 4 files changed, 114 insertions(+), 75 deletions(-) diff --git a/src/Akka.Persistence.EventStore.Query/Publishers/AllPersistenceIdsPublisher.cs b/src/Akka.Persistence.EventStore.Query/Publishers/AllPersistenceIdsPublisher.cs index 3c94612..79c032b 100644 --- a/src/Akka.Persistence.EventStore.Query/Publishers/AllPersistenceIdsPublisher.cs +++ b/src/Akka.Persistence.EventStore.Query/Publishers/AllPersistenceIdsPublisher.cs @@ -25,45 +25,51 @@ public AllPersistenceIdsPublisher(bool isLive, string writeJournalPluginId) protected override bool Receive(object message) { - return message - .Match() - .With(_ => - { - _journalRef.Tell(SubscribeAllPersistenceIds.Instance); - Become(Active); - }) - .With(_ => { Context.Stop(Self); }) - .WasHandled; + switch (message) + { + case Request: + _journalRef.Tell(SubscribeAllPersistenceIds.Instance); + Become(Active); + return true; + case Cancel: + Context.Stop(Self); + return true; + default: + return false; + } } private bool Active(object message) { - return message - .Match() - .With(_ => - { - _isCaughtUp = true; - _buffer.DeliverBuffer(TotalDemand); - if (!_isLive && _buffer.IsEmpty) - OnCompleteThenStop(); - }) - .With(OnErrorThenStop) - .With(@event => - { - _buffer.Add(@event.PersistenceId); - _buffer.DeliverBuffer(TotalDemand); + switch (message) + { + case CaughtUp: + _isCaughtUp = true; + _buffer.DeliverBuffer(TotalDemand); + if (!_isLive && _buffer.IsEmpty) + OnCompleteThenStop(); + return true; + case SubscriptionDroppedException ex: + OnErrorThenStop(ex); + return true; + case IPersistentRepresentation @event: + _buffer.Add(@event.PersistenceId); + _buffer.DeliverBuffer(TotalDemand); - if (_isCaughtUp && !_isLive && _buffer.IsEmpty) - OnCompleteThenStop(); - }) - .With(_ => - { - _buffer.DeliverBuffer(TotalDemand); - if (_isCaughtUp && !_isLive && _buffer.IsEmpty) - OnCompleteThenStop(); - }) - .With(_ => Context.Stop(Self)) - .WasHandled; + if (_isCaughtUp && !_isLive && _buffer.IsEmpty) + OnCompleteThenStop(); + return true; + case Request: + _buffer.DeliverBuffer(TotalDemand); + if (_isCaughtUp && !_isLive && _buffer.IsEmpty) + OnCompleteThenStop(); + return true; + case Cancel: + Context.Stop(Self); + return true; + default: + return false; + } } } } \ No newline at end of file diff --git a/src/Akka.Persistence.EventStore.Query/Publishers/EventsByPersistenceIdPublisher.cs b/src/Akka.Persistence.EventStore.Query/Publishers/EventsByPersistenceIdPublisher.cs index 90773df..3d18f28 100644 --- a/src/Akka.Persistence.EventStore.Query/Publishers/EventsByPersistenceIdPublisher.cs +++ b/src/Akka.Persistence.EventStore.Query/Publishers/EventsByPersistenceIdPublisher.cs @@ -43,17 +43,27 @@ public static Props Props(string persistenceId, long fromSequenceNr, long toSequ protected override bool Receive(object message) { - return message.Match() - .With(OnSubscriptionDropped) - .With(_ => - { - _isCaughtUp = true; - MaybeReply(); - }) - .With(OnReplayedMessage) - .With(OnRequest) - .With(OnCancel) - .WasHandled; + switch (message) + { + case SubscriptionDroppedException ex: + OnSubscriptionDropped(ex); + return true; + case CaughtUp: + _isCaughtUp = true; + MaybeReply(); + return true; + case ReplayedMessage msg: + OnReplayedMessage(msg); + return true; + case Request req: + OnRequest(req); + return true; + case Cancel c: + OnCancel(c); + return true; + default: + return false; + } } private void OnSubscriptionDropped(SubscriptionDroppedException cause) diff --git a/src/Akka.Persistence.EventStore.Query/Publishers/EventsByTagPublisher.cs b/src/Akka.Persistence.EventStore.Query/Publishers/EventsByTagPublisher.cs index 78fbeec..d9c2810 100644 --- a/src/Akka.Persistence.EventStore.Query/Publishers/EventsByTagPublisher.cs +++ b/src/Akka.Persistence.EventStore.Query/Publishers/EventsByTagPublisher.cs @@ -51,17 +51,27 @@ public static Props Props(string tag, bool isLive, long? fromOffset, long toOffs protected override bool Receive(object message) { - return message.Match() - .With(OnSubscriptionDropped) - .With(_ => - { - _isCaughtUp = true; - MaybeReply(); - }) - .With(OnReplayedMessage) - .With(OnRequest) - .With(OnCancel) - .WasHandled; + switch (message) + { + case SubscriptionDroppedException ex: + OnSubscriptionDropped(ex); + return true; + case CaughtUp: + _isCaughtUp = true; + MaybeReply(); + return true; + case ReplayedTaggedMessage msg: + OnReplayedMessage(msg); + return true; + case Request req: + OnRequest(req); + return true; + case Cancel c: + OnCancel(c); + return true; + default: + return false; + } } private void OnSubscriptionDropped(SubscriptionDroppedException cause) diff --git a/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs b/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs index 0562ff5..71b7308 100644 --- a/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs +++ b/src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs @@ -66,19 +66,21 @@ protected override void PostStop() private bool AwaitingConnection(object message) { - return message.Match() - .With(success => - { - UnbecomeStacked(); - Stash.UnstashAll(); - }) - .With(fail => - { - _log.Error(fail.Cause, "Failure during {0} initialization.", Self); - Context.Stop(Self); - }) - .Default(_ => Stash.Stash()) - .WasHandled; + switch (message) + { + case Status.Success: + UnbecomeStacked(); + Stash.UnstashAll(); + break; + case Status.Failure fail: + _log.Error(fail.Cause, "Failure during {0} initialization.", Self); + Context.Stop(Self); + break; + default: + Stash.Stash(); + break; + } + return true; } private IEventAdapter BuildDefaultJournalAdapter() @@ -289,12 +291,23 @@ await _conn.SetStreamMetadataAsync(persistenceId, ExpectedVersion.Any, protected override bool ReceivePluginInternal(object message) { - return message.Match() - .With(StartTaggedSubscription) - .With(StartPersistenceIdSubscription) - .With(SubscribeAllPersistenceIdsHandler) - .With(RemoveSubscriber) - .WasHandled; + switch (message) + { + case ReplayTaggedMessages msg: + StartTaggedSubscription(msg); + return true; + case SubscribePersistenceId msg: + StartPersistenceIdSubscription(msg); + return true; + case SubscribeAllPersistenceIds msg: + SubscribeAllPersistenceIdsHandler(msg); + return true; + case Unsubscribe msg: + RemoveSubscriber(msg); + return true; + default: + return false; + } } private void StartPersistenceIdSubscription(SubscribePersistenceId sub)