Skip to content

Commit

Permalink
Implemented IEffectsStore SetEffectResults-method
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Dec 14, 2024
1 parent 9c11a10 commit a0a38cd
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ public override Task DeleteFunctionIdDeletesAllRelatedEffects()
[TestMethod]
public override Task TruncateDeletesAllEffects()
=> TruncateDeletesAllEffects(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));

[TestMethod]
public override Task BulkInsertTest()
=> BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,37 @@ await store
.SelectAsync(e => e.Any())
.ShouldBeFalseAsync();
}

public abstract Task BulkInsertTest();
protected async Task BulkInsertTest(Task<IEffectsStore> storeTask)
{
var store = await storeTask;
var storedId = TestStoredId.Create();
var storedEffect1 = new StoredEffect(
"EffectId1",
"EffectId1".ToStoredEffectId(),
IsState: false,
WorkStatus.Started,
Result: "some result 1".ToUtf8Bytes(),
StoredException: null
);
var storedEffect2 = new StoredEffect(
"EffectId2",
"EffectId2".ToStoredEffectId(),
IsState: false,
WorkStatus.Completed,
Result: "some result 2".ToUtf8Bytes(),
StoredException: null
);


await store.SetEffectResults(storedId, [storedEffect1, storedEffect2]);

var effects = await store.GetEffectResults(storedId);
effects.Count.ShouldBe(2);
effects[0].EffectId.ShouldBe(storedEffect1.EffectId);
effects[0].Result.ShouldBe("some result 1".ToUtf8Bytes());
effects[1].EffectId.ShouldBe(storedEffect2.EffectId);
effects[1].Result.ShouldBe("some result 2".ToUtf8Bytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
? Task.FromException(new TimeoutException())
: _inner.SetEffectResult(storedId, storedEffect);

public Task SetEffectResults(StoredId storedId, IEnumerable<StoredEffect> storedEffects)
public Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
=> _crashed
? Task.FromException(new TimeoutException())
: _inner.SetEffectResults(storedId, storedEffects);
Expand Down
6 changes: 6 additions & 0 deletions Core/Cleipnir.ResilientFunctions/Helpers/Helpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,10 @@ public static IEnumerable<T> WithRandomOffset<T>(this IReadOnlyList<T> elms)
public static Guid ToGuid(this string s) => Guid.Parse(s);

public static string StringJoin(this IEnumerable<string> strings, string separator) => string.Join(separator, strings);

public static IEnumerable<string> Replicate(this string str, int count)
{
for (var i = 0; i < count; i++)
yield return str;
}
}
2 changes: 1 addition & 1 deletion Core/Cleipnir.ResilientFunctions/Storage/IEffectsStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public interface IEffectsStore
Task Initialize();
Task Truncate();
Task SetEffectResult(StoredId storedId, StoredEffect storedEffect);
Task SetEffectResults(StoredId storedId, IEnumerable<StoredEffect> storedEffects);
Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects);
Task<IReadOnlyList<StoredEffect>> GetEffectResults(StoredId storedId);
Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId, bool isState);
Task Remove(StoredId storedId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect)
return Task.CompletedTask;
}

public async Task SetEffectResults(StoredId storedId, IEnumerable<StoredEffect> storedEffects)
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
{
foreach (var storedEffect in storedEffects)
await SetEffectResult(storedId, storedEffect);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Cleipnir.ResilientFunctions.Helpers;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.MariaDb.Tests;

Expand Down Expand Up @@ -28,4 +29,8 @@ public override Task DeleteFunctionIdDeletesAllRelatedEffects()
[TestMethod]
public override Task TruncateDeletesAllEffects()
=> TruncateDeletesAllEffects(FunctionStoreFactory.CreateEffectStore());

[TestMethod]
public override Task BulkInsertTest()
=> BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Storage.Utils;
using MySqlConnector;
Expand Down Expand Up @@ -75,9 +76,37 @@ ON DUPLICATE KEY UPDATE
await command.ExecuteNonQueryAsync();
}

public Task SetEffectResults(StoredId storedId, IEnumerable<StoredEffect> storedEffects)
private string? _setEffectResultsSql;
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
{
throw new NotImplementedException();
await using var conn = await CreateConnection();
_setEffectResultsSql ??= $@"
INSERT INTO {_tablePrefix}_effects
(type, instance, id_hash, is_state, status, result, exception, effect_id)
VALUES
@VALUES
ON DUPLICATE KEY UPDATE
status = VALUES(status), result = VALUES(result), exception = VALUES(exception)";

var sql = _setEffectResultsSql.Replace(
"@VALUES",
"(?, ?, ?, ?, ?, ?, ?, ?)".Replicate(storedEffects.Count).StringJoin(", ")
);

await using var command = new MySqlCommand(sql, conn);
foreach (var storedEffect in storedEffects)
{
command.Parameters.Add(new MySqlParameter(name: null, storedId.Type.Value));
command.Parameters.Add(new MySqlParameter(name: null, storedId.Instance.Value.ToString("N")));
command.Parameters.Add(new MySqlParameter(name: null, storedEffect.StoredEffectId.Value.ToString("N")));
command.Parameters.Add(new MySqlParameter(name: null, storedEffect.IsState));
command.Parameters.Add(new MySqlParameter(name: null, (int) storedEffect.WorkStatus));
command.Parameters.Add(new MySqlParameter(name: null, storedEffect.Result ?? (object) DBNull.Value));
command.Parameters.Add(new MySqlParameter(name: null, JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value));
command.Parameters.Add(new MySqlParameter(name: null, storedEffect.EffectId.Value));
}

await command.ExecuteNonQueryAsync();
}

private string? _getEffectResultsSql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,8 @@ public override Task DeleteFunctionIdDeletesAllRelatedEffects()
[TestMethod]
public override Task TruncateDeletesAllEffects()
=> TruncateDeletesAllEffects(FunctionStoreFactory.CreateEffectStore());

[TestMethod]
public override Task BulkInsertTest()
=> BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,40 @@ ON CONFLICT (type, instance, id_hash, is_state)
await command.ExecuteNonQueryAsync();
}

public Task SetEffectResults(StoredId storedId, IEnumerable<StoredEffect> storedEffects)
public async Task SetEffectResults(StoredId storedId, IReadOnlyList<StoredEffect> storedEffects)
{
throw new NotImplementedException();
await using var conn = await CreateConnection();
_setEffectResultSql ??= $@"
INSERT INTO {tablePrefix}_effects
(type, instance, id_hash, is_state, status, result, exception, effect_id)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (type, instance, id_hash, is_state)
DO
UPDATE SET status = EXCLUDED.status, result = EXCLUDED.result, exception = EXCLUDED.exception";

await using var batch = new NpgsqlBatch(conn);
foreach (var storedEffect in storedEffects)
{
var command = new NpgsqlBatchCommand(_setEffectResultSql)
{
Parameters =
{
new() {Value = storedId.Type.Value},
new() {Value = storedId.Instance.Value},
new() {Value = storedEffect.StoredEffectId.Value},
new() {Value = storedEffect.IsState},
new() {Value = (int) storedEffect.WorkStatus},
new() {Value = storedEffect.Result ?? (object) DBNull.Value},
new() {Value = JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value},
new() {Value = storedEffect.EffectId.Value},
}
};
batch.BatchCommands.Add(command);
}


await batch.ExecuteNonQueryAsync();
}

private string? _getEffectResultsSql;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading.Tasks;
using Cleipnir.ResilientFunctions.Helpers;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Cleipnir.ResilientFunctions.SqlServer.Tests;
Expand Down Expand Up @@ -29,4 +30,8 @@ public override Task DeleteFunctionIdDeletesAllRelatedEffects()
[TestMethod]
public override Task TruncateDeletesAllEffects()
=> TruncateDeletesAllEffects(FunctionStoreFactory.CreateEffectStore());

[TestMethod]
public override Task BulkInsertTest()
=> BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore));
}
Loading

0 comments on commit a0a38cd

Please sign in to comment.