Skip to content

Commit

Permalink
Refactored effect store
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Dec 14, 2024
1 parent 4c44ce9 commit e9d820a
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ protected async Task SunshineScenarioTest(Task<IEffectsStore> storeTask)
var functionId = TestStoredId.Create();
var storedEffect1 = new StoredEffect(
"EffectId1",
"EffectId1".ToStoredEffectId(),
IsState: false,
WorkStatus.Started,
Result: null,
StoredException: null
);
var storedEffect2 = new StoredEffect(
"EffectId2",
"EffectId2".ToStoredEffectId(),
IsState: false,
WorkStatus.Completed,
Result: null,
Expand Down Expand Up @@ -65,6 +67,7 @@ protected async Task SingleEffectWithResultLifeCycle(Task<IEffectsStore> storeTa
var functionId = TestStoredId.Create();
var effect = new StoredEffect(
"EffectId1",
"EffectId1".ToStoredEffectId(),
IsState: false,
WorkStatus.Started,
Result: null,
Expand Down Expand Up @@ -103,6 +106,7 @@ protected async Task SingleFailingEffectLifeCycle(Task<IEffectsStore> storeTask)
);
var storedEffect = new StoredEffect(
"EffectId1",
"EffectId1".ToStoredEffectId(),
IsState: false,
WorkStatus.Started,
Result: null,
Expand Down Expand Up @@ -130,13 +134,15 @@ protected async Task EffectCanBeDeleted(Task<IEffectsStore> storeTask)
var functionId = TestStoredId.Create();
var storedEffect1 = new StoredEffect(
"EffectId1",
"EffectId1".ToStoredEffectId(),
IsState: false,
WorkStatus.Started,
Result: null,
StoredException: null
);
var storedEffect2 = new StoredEffect(
"EffectId2",
"EffectId2".ToStoredEffectId(),
IsState: false,
WorkStatus.Completed,
Result: null,
Expand All @@ -150,17 +156,17 @@ await store
.SelectAsync(sas => sas.Count() == 2)
.ShouldBeTrueAsync();

await store.DeleteEffectResult(functionId, storedEffect2.EffectId, isState: false);
await store.DeleteEffectResult(functionId, storedEffect2.EffectId.ToStoredEffectId(), isState: false);
var storedEffects = await store.GetEffectResults(functionId);
storedEffects.Count.ShouldBe(1);
storedEffects[0].EffectId.ShouldBe(storedEffect1.EffectId);

await store.DeleteEffectResult(functionId, storedEffect2.EffectId, isState: false);
await store.DeleteEffectResult(functionId, storedEffect2.EffectId.ToStoredEffectId(), isState: false);
storedEffects = await store.GetEffectResults(functionId);
storedEffects.Count.ShouldBe(1);
storedEffects[0].EffectId.ShouldBe(storedEffect1.EffectId);

await store.DeleteEffectResult(functionId, storedEffect1.EffectId, isState: false);
await store.DeleteEffectResult(functionId, storedEffect1.EffectId.ToStoredEffectId(), isState: false);
await store
.GetEffectResults(functionId)
.SelectAsync(sas => sas.Any())
Expand All @@ -176,13 +182,15 @@ protected async Task DeleteFunctionIdDeletesAllRelatedEffects(Task<IEffectsStore

var storedEffect1 = new StoredEffect(
"EffectId1",
"EffectId1".ToStoredEffectId(),
IsState: false,
WorkStatus.Started,
Result: null,
StoredException: null
);
var storedEffect2 = new StoredEffect(
"EffectId2",
"EffectId2".ToStoredEffectId(),
IsState: false,
WorkStatus.Completed,
Result: null,
Expand Down Expand Up @@ -220,13 +228,15 @@ protected async Task TruncateDeletesAllEffects(Task<IEffectsStore> storeTask)

var storedEffect1 = new StoredEffect(
"EffectId1",
"EffectId1".ToStoredEffectId(),
IsState: false,
WorkStatus.Started,
Result: null,
StoredException: null
);
var storedEffect2 = new StoredEffect(
"EffectId2",
"EffectId2".ToStoredEffectId(),
IsState: false,
WorkStatus.Completed,
Result: null,
Expand All @@ -249,6 +259,4 @@ await store
.SelectAsync(e => e.Any())
.ShouldBeFalseAsync();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Reactive.Extensions;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Tests.TestTemplates.WatchDogsTests;
using Cleipnir.ResilientFunctions.Tests.Utils;
using Shouldly;

Expand Down Expand Up @@ -1285,6 +1284,7 @@ await store.EffectsStore.SetEffectResult(
rAction.MapToStoredId(functionId),
new StoredEffect(
new EffectId("SomeId"),
"SomeId".ToStoredEffectId(),
IsState: false,
WorkStatus.Completed,
Result: "SomeResult".ToJson().ToUtf8Bytes(),
Expand Down Expand Up @@ -1322,6 +1322,7 @@ await store.EffectsStore.SetEffectResult(
rAction.MapToStoredId(functionId),
new StoredEffect(
new EffectId("SomeId"),
"SomeId".ToStoredEffectId(),
IsState: false,
WorkStatus.Completed,
Result: "SomeResult".ToJson().ToUtf8Bytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ await statesStore.SetEffectResult(
state2.IsState.ShouldBeTrue();
state2.Result.ShouldBe("SomeJson#2".ToUtf8Bytes());

await statesStore.DeleteEffectResult(flowId, state1.EffectId, isState: true);
await statesStore.DeleteEffectResult(flowId, state1.EffectId.ToStoredEffectId(), isState: true);

states = await statesStore.GetEffectResults(flowId);
states.Count.ShouldBe(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ await store.EffectsStore.SetEffectResult(
await store.CorrelationStore.SetCorrelation(functionId, "SomeCorrelationId");
await store.EffectsStore.SetEffectResult(
functionId,
new StoredEffect("SomeEffectId", IsState: false, WorkStatus.Completed, Result: null, StoredException: null)
new StoredEffect("SomeEffectId", "SomeEffectId".ToStoredEffectId(),IsState: false, WorkStatus.Completed, Result: null, StoredException: null)
);
await store.MessageStore.AppendMessage(functionId, new StoredMessage("SomeJson".ToUtf8Bytes(), "SomeType".ToUtf8Bytes()));
await store.TimeoutStore.UpsertTimeout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ await store.CreateFunction(
parent: null
).ShouldBeTrueAsync();

await effectsStore.SetEffectResult(functionId, new StoredEffect(EffectId: "", IsState: true, WorkStatus.Completed, "some default state".ToUtf8Bytes(), StoredException: null));
await effectsStore.SetEffectResult(functionId, new StoredEffect(EffectId: "", "".ToStoredEffectId(), IsState: true, WorkStatus.Completed, "some default state".ToUtf8Bytes(), StoredException: null));

var storedEffects = await effectsStore.GetEffectResults(functionId);
storedEffects.Count.ShouldBe(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Storage;

namespace Cleipnir.ResilientFunctions.Tests.TestTemplates.WatchDogsTests;
Expand Down Expand Up @@ -29,14 +28,19 @@ public Task Truncate()
public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
=> _crashed
? Task.FromException(new TimeoutException())
: _inner.SetEffectResult(storedId, storedEffect);
: _inner.SetEffectResult(storedId, storedEffect);

public Task SetEffectResults(StoredId storedId, IEnumerable<StoredEffect> storedEffects)
=> _crashed
? Task.FromException(new TimeoutException())
: _inner.SetEffectResults(storedId, storedEffects);

public Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId)
=> _crashed
? Task.FromException<IReadOnlyList<StoredEffect>>(new TimeoutException())
: _inner.GetEffectResults(storedId);

public Task DeleteEffectResult(StoredId storedId, EffectId effectId, bool isState)
public Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId, bool isState)
=> _crashed
? Task.FromException(new TimeoutException())
: _inner.DeleteEffectResult(storedId, effectId, isState);
Expand Down
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/Domain/Effect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public async Task Clear(string id)
if (!effectResults.ContainsKey(id))
return;

await effectsStore.DeleteEffectResult(storedId, id, isState: false);
await effectsStore.DeleteEffectResult(storedId, id.ToStoredEffectId(), isState: false);
lock (_sync)
effectResults.Remove(id);
}
Expand Down
10 changes: 5 additions & 5 deletions Core/Cleipnir.ResilientFunctions/Domain/ExistingEffects.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async Task<WorkStatus> GetStatus(EffectId effectId)
public async Task Remove(string effectId)
{
var storedEffects = await GetStoredEffects();
await effectsStore.DeleteEffectResult(storedId, effectId, isState: false);
await effectsStore.DeleteEffectResult(storedId, effectId.ToStoredEffectId(), isState: false);
storedEffects.Remove(effectId);
}

Expand All @@ -61,14 +61,14 @@ private async Task Set(StoredEffect storedEffect)
public Task SetValue<TValue>(string effectId, TValue value) => SetSucceeded(effectId, value);

public Task SetStarted(string effectId)
=> Set(new StoredEffect(effectId, IsState: false, WorkStatus.Started, Result: null, StoredException: null));
=> Set(new StoredEffect(effectId, StoredEffectId.Create(effectId), IsState: false, WorkStatus.Started, Result: null, StoredException: null));

public Task SetSucceeded(string effectId)
=> Set(new StoredEffect(effectId, IsState: false, WorkStatus.Completed, Result: null, StoredException: null));
=> Set(new StoredEffect(effectId, StoredEffectId.Create(effectId), IsState: false, WorkStatus.Completed, Result: null, StoredException: null));

public Task SetSucceeded<TResult>(string effectId, TResult result)
=> Set(new StoredEffect(effectId, IsState: false, WorkStatus.Completed, Result: serializer.SerializeEffectResult(result), StoredException: null));
=> Set(new StoredEffect(effectId,StoredEffectId.Create(effectId), IsState: false, WorkStatus.Completed, Result: serializer.SerializeEffectResult(result), StoredException: null));

public Task SetFailed(string effectId, Exception exception)
=> Set(new StoredEffect(effectId, IsState: false, WorkStatus.Failed, Result: null, StoredException: serializer.SerializeException(exception)));
=> Set(new StoredEffect(effectId, StoredEffectId.Create(effectId), IsState: false, WorkStatus.Failed, Result: null, StoredException: serializer.SerializeException(exception)));
}
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/Domain/ExistingStates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private async Task<Dictionary<StateId, StoredState>> GetStoredStates()
public async Task Remove(string stateId)
{
var storedStates = await GetStoredStates();
await _effectsStore.DeleteEffectResult(_storedId, stateId, isState: true);
await _effectsStore.DeleteEffectResult(_storedId, stateId.ToStoredEffectId(), isState: true);
storedStates.Remove(stateId);
}

Expand Down
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/Domain/States.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private async Task RemoveInner(string id)
if (!existingStoredStates.ContainsKey(id))
return;

await _effectStore.DeleteEffectResult(_storedId, id, isState: true);
await _effectStore.DeleteEffectResult(_storedId, id.ToStoredEffectId(), isState: true);

lock (_sync)
{
Expand Down
4 changes: 2 additions & 2 deletions Core/Cleipnir.ResilientFunctions/Storage/IEffectsStore.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Domain;

namespace Cleipnir.ResilientFunctions.Storage;

Expand All @@ -9,7 +8,8 @@ public interface IEffectsStore
Task Initialize();
Task Truncate();
Task SetEffectResult(StoredId storedId, StoredEffect storedEffect);
Task SetEffectResults(StoredId storedId, IEnumerable<StoredEffect> storedEffects);
Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId);
Task DeleteEffectResult(StoredId storedId, EffectId effectId, bool isState);
Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId, bool isState);
Task Remove(StoredId storedId);
}
15 changes: 10 additions & 5 deletions Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Helpers;

namespace Cleipnir.ResilientFunctions.Storage;

public class InMemoryEffectsStore : IEffectsStore
{
private record EffectKey(EffectId EffectId, bool IsState);
private record EffectKey(StoredEffectId EffectId, bool IsState);
private readonly Dictionary<StoredId, Dictionary<EffectKey, StoredEffect>> _effects = new();
private readonly object _sync = new();

Expand All @@ -26,7 +25,7 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
{
lock (_sync)
{
var key = new EffectKey(storedEffect.EffectId, storedEffect.IsState);
var key = new EffectKey(storedEffect.StoredEffectId, storedEffect.IsState);
if (!_effects.ContainsKey(storedId))
_effects[storedId] = new Dictionary<EffectKey, StoredEffect>();

Expand All @@ -36,6 +35,12 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
return Task.CompletedTask;
}

public async Task SetEffectResults(StoredId storedId, IEnumerable<StoredEffect> storedEffects)
{
foreach (var storedEffect in storedEffects)
await SetEffectResult(storedId, storedEffect);
}

public Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId)
{
lock (_sync)
Expand All @@ -44,7 +49,7 @@ public Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId)
: ((IReadOnlyList<StoredEffect>) _effects[storedId].Values.ToList()).ToTask();
}

public Task DeleteEffectResult(StoredId storedId, EffectId effectId, bool isState)
public Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId, bool isState)
{
var key = new EffectKey(effectId, isState);
lock (_sync)
Expand All @@ -53,7 +58,7 @@ public Task DeleteEffectResult(StoredId storedId, EffectId effectId, bool isStat

return Task.CompletedTask;
}

public Task Remove(StoredId storedId)
{
lock (_sync)
Expand Down
25 changes: 18 additions & 7 deletions Core/Cleipnir.ResilientFunctions/Storage/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,38 @@ public record IdAndEpoch(StoredId FlowId, int Epoch);
public record StoredException(string ExceptionMessage, string? ExceptionStackTrace, string ExceptionType);
public record StatusAndEpoch(Status Status, int Epoch);

public record StoredEffectId(Guid Value)
{
public static StoredEffectId Create(string instanceId)
=> new(InstanceIdFactory.FromString(instanceId));
}

public static class StoredEffectIdExtensions
{
public static StoredEffectId ToStoredEffectId(this string effectId) => StoredEffectId.Create(effectId);
public static StoredEffectId ToStoredEffectId(this EffectId effectId) => StoredEffectId.Create(effectId.Value);
}

public record StoredEffect(
EffectId EffectId,
StoredEffectId StoredEffectId,
bool IsState,
WorkStatus WorkStatus,
byte[]? Result,
StoredException? StoredException
)
{
public static StoredEffect CreateCompleted(EffectId effectId, byte[] result)
=> new(effectId, IsState: false, WorkStatus.Completed, result, StoredException: null);
=> new(effectId, StoredEffectId.Create(effectId.Value), IsState: false, WorkStatus.Completed, result, StoredException: null);
public static StoredEffect CreateCompleted(EffectId effectId)
=> new(effectId, IsState: false, WorkStatus.Completed, Result: null, StoredException: null);
=> new(effectId, StoredEffectId.Create(effectId.Value), IsState: false, WorkStatus.Completed, Result: null, StoredException: null);
public static StoredEffect CreateStarted(EffectId effectId)
=> new(effectId, IsState: false, WorkStatus.Started, Result: null, StoredException: null);
=> new(effectId, StoredEffectId.Create(effectId.Value), IsState: false, WorkStatus.Started, Result: null, StoredException: null);
public static StoredEffect CreateFailed(EffectId effectId, StoredException storedException)
=> new(effectId, IsState: false, WorkStatus.Failed, Result: null, storedException);
=> new(effectId, StoredEffectId.Create(effectId.Value), IsState: false, WorkStatus.Failed, Result: null, storedException);
public static StoredEffect CreateState(StoredState storedState)
=> new(storedState.StateId.Value, IsState: true, WorkStatus.Completed, storedState.StateJson, StoredException: null);
=> new(storedState.StateId.Value, StoredEffectId.Create(storedState.StateId.Value), IsState: true, WorkStatus.Completed, storedState.StateJson, StoredException: null);
};
public record StoredState(StateId StateId, byte[] StateJson);

public record StoredSemaphore(string Group, string Instance);

public record IdWithParam(StoredId StoredId, string HumanInstanceId, byte[]? Param);
10 changes: 0 additions & 10 deletions Core/Cleipnir.ResilientFunctions/Storage/Utils/Escaper.cs

This file was deleted.

Loading

0 comments on commit e9d820a

Please sign in to comment.