Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump Akka.Persistence.TCK, Akka, Akka.Persistence.Query and Akka.Persistence #26

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
<AkkaVersion>1.4.0-beta1</AkkaVersion>
<AkkaVersion>1.5.18</AkkaVersion>
<NBenchVersion>1.2.2</NBenchVersion>
<XunitVersion>2.4.1</XunitVersion>
<TestSdkVersion>15.9.0</TestSdkVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,51 @@ public AllPersistenceIdsPublisher(bool isLive, string writeJournalPluginId)

protected override bool Receive(object message)
{
return message
.Match()
.With<Request>(_ =>
{
_journalRef.Tell(SubscribeAllPersistenceIds.Instance);
Become(Active);
})
.With<Cancel>(_ => { Context.Stop(Self); })
.WasHandled;
switch (message)
{
case Request:
_journalRef.Tell(SubscribeAllPersistenceIds.Instance);
Become(Active);
return true;
case Cancel:
Context.Stop(Self);
return true;
default:
return false;
}
}

private bool Active(object message)
{
return message
.Match()
.With<CaughtUp>(_ =>
{
_isCaughtUp = true;
_buffer.DeliverBuffer(TotalDemand);
if (!_isLive && _buffer.IsEmpty)
OnCompleteThenStop();
})
.With<SubscriptionDroppedException>(OnErrorThenStop)
.With<IPersistentRepresentation>(@event =>
{
_buffer.Add(@event.PersistenceId);
_buffer.DeliverBuffer(TotalDemand);
switch (message)
{
case CaughtUp:
_isCaughtUp = true;
_buffer.DeliverBuffer(TotalDemand);
if (!_isLive && _buffer.IsEmpty)
OnCompleteThenStop();
return true;
case SubscriptionDroppedException ex:
OnErrorThenStop(ex);
return true;
case IPersistentRepresentation @event:
_buffer.Add(@event.PersistenceId);
_buffer.DeliverBuffer(TotalDemand);

if (_isCaughtUp && !_isLive && _buffer.IsEmpty)
OnCompleteThenStop();
})
.With<Request>(_ =>
{
_buffer.DeliverBuffer(TotalDemand);
if (_isCaughtUp && !_isLive && _buffer.IsEmpty)
OnCompleteThenStop();
})
.With<Cancel>(_ => Context.Stop(Self))
.WasHandled;
if (_isCaughtUp && !_isLive && _buffer.IsEmpty)
OnCompleteThenStop();
return true;
case Request:
_buffer.DeliverBuffer(TotalDemand);
if (_isCaughtUp && !_isLive && _buffer.IsEmpty)
OnCompleteThenStop();
return true;
case Cancel:
Context.Stop(Self);
return true;
default:
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,27 @@ public static Props Props(string persistenceId, long fromSequenceNr, long toSequ

protected override bool Receive(object message)
{
return message.Match()
.With<SubscriptionDroppedException>(OnSubscriptionDropped)
.With<CaughtUp>(_ =>
{
_isCaughtUp = true;
MaybeReply();
})
.With<ReplayedMessage>(OnReplayedMessage)
.With<Request>(OnRequest)
.With<Cancel>(OnCancel)
.WasHandled;
switch (message)
{
case SubscriptionDroppedException ex:
OnSubscriptionDropped(ex);
return true;
case CaughtUp:
_isCaughtUp = true;
MaybeReply();
return true;
case ReplayedMessage msg:
OnReplayedMessage(msg);
return true;
case Request req:
OnRequest(req);
return true;
case Cancel c:
OnCancel(c);
return true;
default:
return false;
}
}

private void OnSubscriptionDropped(SubscriptionDroppedException cause)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,27 @@ public static Props Props(string tag, bool isLive, long? fromOffset, long toOffs

protected override bool Receive(object message)
{
return message.Match()
.With<SubscriptionDroppedException>(OnSubscriptionDropped)
.With<CaughtUp>(_ =>
{
_isCaughtUp = true;
MaybeReply();
})
.With<ReplayedTaggedMessage>(OnReplayedMessage)
.With<Request>(OnRequest)
.With<Cancel>(OnCancel)
.WasHandled;
switch (message)
{
case SubscriptionDroppedException ex:
OnSubscriptionDropped(ex);
return true;
case CaughtUp:
_isCaughtUp = true;
MaybeReply();
return true;
case ReplayedTaggedMessage msg:
OnReplayedMessage(msg);
return true;
case Request req:
OnRequest(req);
return true;
case Cancel c:
OnCancel(c);
return true;
default:
return false;
}
}

private void OnSubscriptionDropped(SubscriptionDroppedException cause)
Expand Down
51 changes: 32 additions & 19 deletions src/Akka.Persistence.EventStore/Journal/EventStoreJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,21 @@ protected override void PostStop()

private bool AwaitingConnection(object message)
{
return message.Match()
.With<Status.Success>(success =>
{
UnbecomeStacked();
Stash.UnstashAll();
})
.With<Status.Failure>(fail =>
{
_log.Error(fail.Cause, "Failure during {0} initialization.", Self);
Context.Stop(Self);
})
.Default(_ => Stash.Stash())
.WasHandled;
switch (message)
{
case Status.Success:
UnbecomeStacked();
Stash.UnstashAll();
break;
case Status.Failure fail:
_log.Error(fail.Cause, "Failure during {0} initialization.", Self);
Context.Stop(Self);
break;
default:
Stash.Stash();
break;
}
return true;
}

private IEventAdapter BuildDefaultJournalAdapter()
Expand Down Expand Up @@ -289,12 +291,23 @@ await _conn.SetStreamMetadataAsync(persistenceId, ExpectedVersion.Any,

protected override bool ReceivePluginInternal(object message)
{
return message.Match()
.With<ReplayTaggedMessages>(StartTaggedSubscription)
.With<SubscribePersistenceId>(StartPersistenceIdSubscription)
.With<SubscribeAllPersistenceIds>(SubscribeAllPersistenceIdsHandler)
.With<Unsubscribe>(RemoveSubscriber)
.WasHandled;
switch (message)
{
case ReplayTaggedMessages msg:
StartTaggedSubscription(msg);
return true;
case SubscribePersistenceId msg:
StartPersistenceIdSubscription(msg);
return true;
case SubscribeAllPersistenceIds msg:
SubscribeAllPersistenceIdsHandler(msg);
return true;
case Unsubscribe msg:
RemoveSubscriber(msg);
return true;
default:
return false;
}
}

private void StartPersistenceIdSubscription(SubscribePersistenceId sub)
Expand Down
Loading