diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectStoreTests.cs index 12da8805..9e78bfef 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/EffectStoreTests.cs @@ -30,4 +30,8 @@ public override Task DeleteFunctionIdDeletesAllRelatedEffects() [TestMethod] public override Task TruncateDeletesAllEffects() => TruncateDeletesAllEffects(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore)); + + [TestMethod] + public override Task BulkInsertTest() + => BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore)); } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs index 4494e067..afc7e929 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/EffectStoreTests.cs @@ -259,4 +259,37 @@ await store .SelectAsync(e => e.Any()) .ShouldBeFalseAsync(); } + + public abstract Task BulkInsertTest(); + protected async Task BulkInsertTest(Task storeTask) + { + var store = await storeTask; + var storedId = TestStoredId.Create(); + var storedEffect1 = new StoredEffect( + "EffectId1", + "EffectId1".ToStoredEffectId(), + IsState: false, + WorkStatus.Started, + Result: "some result 1".ToUtf8Bytes(), + StoredException: null + ); + var storedEffect2 = new StoredEffect( + "EffectId2", + "EffectId2".ToStoredEffectId(), + IsState: false, + WorkStatus.Completed, + Result: "some result 2".ToUtf8Bytes(), + StoredException: null + ); + + + await store.SetEffectResults(storedId, [storedEffect1, storedEffect2]); + + var effects = await store.GetEffectResults(storedId); + effects.Count.ShouldBe(2); + effects[0].EffectId.ShouldBe(storedEffect1.EffectId); + effects[0].Result.ShouldBe("some result 1".ToUtf8Bytes()); + effects[1].EffectId.ShouldBe(storedEffect2.EffectId); + effects[1].Result.ShouldBe("some result 2".ToUtf8Bytes()); + } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableEffectStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableEffectStore.cs index bb4efa0b..fa8c55a0 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableEffectStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableEffectStore.cs @@ -30,7 +30,7 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect) ? Task.FromException(new TimeoutException()) : _inner.SetEffectResult(storedId, storedEffect); - public Task SetEffectResults(StoredId storedId, IEnumerable storedEffects) + public Task SetEffectResults(StoredId storedId, IReadOnlyList storedEffects) => _crashed ? Task.FromException(new TimeoutException()) : _inner.SetEffectResults(storedId, storedEffects); diff --git a/Core/Cleipnir.ResilientFunctions/Helpers/Helpers.cs b/Core/Cleipnir.ResilientFunctions/Helpers/Helpers.cs index 718359b0..be5204a2 100644 --- a/Core/Cleipnir.ResilientFunctions/Helpers/Helpers.cs +++ b/Core/Cleipnir.ResilientFunctions/Helpers/Helpers.cs @@ -78,4 +78,10 @@ public static IEnumerable WithRandomOffset(this IReadOnlyList elms) public static Guid ToGuid(this string s) => Guid.Parse(s); public static string StringJoin(this IEnumerable strings, string separator) => string.Join(separator, strings); + + public static IEnumerable Replicate(this string str, int count) + { + for (var i = 0; i < count; i++) + yield return str; + } } \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/IEffectsStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/IEffectsStore.cs index 351f3aa9..f84fe5ea 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/IEffectsStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/IEffectsStore.cs @@ -8,7 +8,7 @@ public interface IEffectsStore Task Initialize(); Task Truncate(); Task SetEffectResult(StoredId storedId, StoredEffect storedEffect); - Task SetEffectResults(StoredId storedId, IEnumerable storedEffects); + Task SetEffectResults(StoredId storedId, IReadOnlyList storedEffects); Task> GetEffectResults(StoredId storedId); Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId, bool isState); Task Remove(StoredId storedId); diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs index 434243ce..2fa9caf5 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryEffectsStore.cs @@ -35,7 +35,7 @@ public Task SetEffectResult(StoredId storedId, StoredEffect storedEffect) return Task.CompletedTask; } - public async Task SetEffectResults(StoredId storedId, IEnumerable storedEffects) + public async Task SetEffectResults(StoredId storedId, IReadOnlyList storedEffects) { foreach (var storedEffect in storedEffects) await SetEffectResult(storedId, storedEffect); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/EffectsStoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/EffectsStoreTests.cs index 79c4a8fe..308203da 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/EffectsStoreTests.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/EffectsStoreTests.cs @@ -1,4 +1,5 @@ -using Microsoft.VisualStudio.TestTools.UnitTesting; +using Cleipnir.ResilientFunctions.Helpers; +using Microsoft.VisualStudio.TestTools.UnitTesting; namespace Cleipnir.ResilientFunctions.MariaDb.Tests; @@ -28,4 +29,8 @@ public override Task DeleteFunctionIdDeletesAllRelatedEffects() [TestMethod] public override Task TruncateDeletesAllEffects() => TruncateDeletesAllEffects(FunctionStoreFactory.CreateEffectStore()); + + [TestMethod] + public override Task BulkInsertTest() + => BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore)); } \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs index 7c06af9c..f49a2c59 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbEffectsStore.cs @@ -1,4 +1,5 @@ using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Storage; using Cleipnir.ResilientFunctions.Storage.Utils; using MySqlConnector; @@ -75,9 +76,37 @@ ON DUPLICATE KEY UPDATE await command.ExecuteNonQueryAsync(); } - public Task SetEffectResults(StoredId storedId, IEnumerable storedEffects) + private string? _setEffectResultsSql; + public async Task SetEffectResults(StoredId storedId, IReadOnlyList storedEffects) { - throw new NotImplementedException(); + await using var conn = await CreateConnection(); + _setEffectResultsSql ??= $@" + INSERT INTO {_tablePrefix}_effects + (type, instance, id_hash, is_state, status, result, exception, effect_id) + VALUES + @VALUES + ON DUPLICATE KEY UPDATE + status = VALUES(status), result = VALUES(result), exception = VALUES(exception)"; + + var sql = _setEffectResultsSql.Replace( + "@VALUES", + "(?, ?, ?, ?, ?, ?, ?, ?)".Replicate(storedEffects.Count).StringJoin(", ") + ); + + await using var command = new MySqlCommand(sql, conn); + foreach (var storedEffect in storedEffects) + { + command.Parameters.Add(new MySqlParameter(name: null, storedId.Type.Value)); + command.Parameters.Add(new MySqlParameter(name: null, storedId.Instance.Value.ToString("N"))); + command.Parameters.Add(new MySqlParameter(name: null, storedEffect.StoredEffectId.Value.ToString("N"))); + command.Parameters.Add(new MySqlParameter(name: null, storedEffect.IsState)); + command.Parameters.Add(new MySqlParameter(name: null, (int) storedEffect.WorkStatus)); + command.Parameters.Add(new MySqlParameter(name: null, storedEffect.Result ?? (object) DBNull.Value)); + command.Parameters.Add(new MySqlParameter(name: null, JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value)); + command.Parameters.Add(new MySqlParameter(name: null, storedEffect.EffectId.Value)); + } + + await command.ExecuteNonQueryAsync(); } private string? _getEffectResultsSql; diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/EffectStoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/EffectStoreTests.cs index 80fd17bf..d75f1421 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/EffectStoreTests.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/EffectStoreTests.cs @@ -30,4 +30,8 @@ public override Task DeleteFunctionIdDeletesAllRelatedEffects() [TestMethod] public override Task TruncateDeletesAllEffects() => TruncateDeletesAllEffects(FunctionStoreFactory.CreateEffectStore()); + + [TestMethod] + public override Task BulkInsertTest() + => BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore)); } \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEffectsStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEffectsStore.cs index cd972885..6cf75b3a 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEffectsStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlEffectsStore.cs @@ -70,9 +70,40 @@ ON CONFLICT (type, instance, id_hash, is_state) await command.ExecuteNonQueryAsync(); } - public Task SetEffectResults(StoredId storedId, IEnumerable storedEffects) + public async Task SetEffectResults(StoredId storedId, IReadOnlyList storedEffects) { - throw new NotImplementedException(); + await using var conn = await CreateConnection(); + _setEffectResultSql ??= $@" + INSERT INTO {tablePrefix}_effects + (type, instance, id_hash, is_state, status, result, exception, effect_id) + VALUES + ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (type, instance, id_hash, is_state) + DO + UPDATE SET status = EXCLUDED.status, result = EXCLUDED.result, exception = EXCLUDED.exception"; + + await using var batch = new NpgsqlBatch(conn); + foreach (var storedEffect in storedEffects) + { + var command = new NpgsqlBatchCommand(_setEffectResultSql) + { + Parameters = + { + new() {Value = storedId.Type.Value}, + new() {Value = storedId.Instance.Value}, + new() {Value = storedEffect.StoredEffectId.Value}, + new() {Value = storedEffect.IsState}, + new() {Value = (int) storedEffect.WorkStatus}, + new() {Value = storedEffect.Result ?? (object) DBNull.Value}, + new() {Value = JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value}, + new() {Value = storedEffect.EffectId.Value}, + } + }; + batch.BatchCommands.Add(command); + } + + + await batch.ExecuteNonQueryAsync(); } private string? _getEffectResultsSql; diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectStoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectStoreTests.cs index 41057ee8..bc7560e4 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectStoreTests.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/EffectStoreTests.cs @@ -1,4 +1,5 @@ using System.Threading.Tasks; +using Cleipnir.ResilientFunctions.Helpers; using Microsoft.VisualStudio.TestTools.UnitTesting; namespace Cleipnir.ResilientFunctions.SqlServer.Tests; @@ -29,4 +30,8 @@ public override Task DeleteFunctionIdDeletesAllRelatedEffects() [TestMethod] public override Task TruncateDeletesAllEffects() => TruncateDeletesAllEffects(FunctionStoreFactory.CreateEffectStore()); + + [TestMethod] + public override Task BulkInsertTest() + => BulkInsertTest(FunctionStoreFactory.Create().SelectAsync(fs => fs.EffectsStore)); } \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEffectsStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEffectsStore.cs index ab13bba3..97116c7a 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEffectsStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerEffectsStore.cs @@ -1,32 +1,25 @@ using System; using System.Collections.Generic; using System.Data.SqlTypes; +using System.Linq; using System.Text.Json; using System.Threading.Tasks; using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Storage; using Cleipnir.ResilientFunctions.Storage.Utils; using Microsoft.Data.SqlClient; namespace Cleipnir.ResilientFunctions.SqlServer; -public class SqlServerEffectsStore : IEffectsStore +public class SqlServerEffectsStore(string connectionString, string tablePrefix = "") : IEffectsStore { - private readonly string _tablePrefix; - private readonly Func> _connFunc; - - public SqlServerEffectsStore(string connectionString, string tablePrefix = "") - { - _tablePrefix = tablePrefix; - _connFunc = CreateConnection(connectionString); - } - private string? _initializeSql; public async Task Initialize() { - await using var conn = await _connFunc(); + await using var conn = await CreateConnection(); _initializeSql ??= @$" - CREATE TABLE {_tablePrefix}_Effects ( + CREATE TABLE {tablePrefix}_Effects ( FlowType INT, FlowInstance UNIQUEIDENTIFIER, StoredId UNIQUEIDENTIFIER, @@ -49,8 +42,8 @@ PRIMARY KEY (FlowType, FlowInstance, StoredId, IsState) private string? _truncateSql; public async Task Truncate() { - await using var conn = await _connFunc(); - _truncateSql ??= $"TRUNCATE TABLE {_tablePrefix}_Effects"; + await using var conn = await CreateConnection(); + _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_Effects"; await using var command = new SqlCommand(_truncateSql, conn); await command.ExecuteNonQueryAsync(); } @@ -58,12 +51,12 @@ public async Task Truncate() private string? _setEffectResultSql; public async Task SetEffectResult(StoredId storedId, StoredEffect storedEffect) { - await using var conn = await _connFunc(); + await using var conn = await CreateConnection(); _setEffectResultSql ??= $@" - MERGE INTO {_tablePrefix}_Effects + MERGE INTO {tablePrefix}_Effects USING (VALUES (@FlowType, @FlowInstance, @StoredId, @IsState, @EffectId, @Status, @Result, @Exception)) AS source (FlowType, FlowInstance, StoredId, IsState, EffectId, Status, Result, Exception) - ON {_tablePrefix}_Effects.FlowType = source.FlowType AND {_tablePrefix}_Effects.FlowInstance = source.FlowInstance AND {_tablePrefix}_Effects.StoredId = source.StoredId AND {_tablePrefix}_Effects.IsState = source.IsState + ON {tablePrefix}_Effects.FlowType = source.FlowType AND {tablePrefix}_Effects.FlowInstance = source.FlowInstance AND {tablePrefix}_Effects.StoredId = source.StoredId AND {tablePrefix}_Effects.IsState = source.IsState WHEN MATCHED THEN UPDATE SET Status = source.Status, Result = source.Result, Exception = source.Exception WHEN NOT MATCHED THEN @@ -84,43 +77,51 @@ WHEN NOT MATCHED THEN } private static string? _setEffectResultsSql; - public async Task SetEffectResults(StoredId storedId, IEnumerable storedEffects) + public async Task SetEffectResults(StoredId storedId, IReadOnlyList storedEffects) { - /* - var (flowType, flowInstance) = storedId; - await using var conn = await _connFunc(); - _setEffectResultSql ??= $@" - MERGE INTO {_tablePrefix}_Effects + await using var conn = await CreateConnection(); + _setEffectResultsSql ??= $@" + MERGE INTO {tablePrefix}_Effects USING (VALUES @VALUES) - AS source (Id,IsState,Status,Result,Exception) - ON {_tablePrefix}_Effects.Id = source.Id AND {_tablePrefix}_Effects.IsState = source.IsState + AS source (FlowType, FlowInstance, StoredId, IsState, EffectId, Status, Result, Exception) + ON {tablePrefix}_Effects.FlowType = source.FlowType AND {tablePrefix}_Effects.FlowInstance = source.FlowInstance AND {tablePrefix}_Effects.StoredId = source.StoredId AND {tablePrefix}_Effects.IsState = source.IsState WHEN MATCHED THEN UPDATE SET Status = source.Status, Result = source.Result, Exception = source.Exception WHEN NOT MATCHED THEN - INSERT (Id, IsState, Status, Result, Exception) - VALUES (source.Id, source.IsState, source.Status, source.Result, source.Exception);"; - - await using var command = new SqlCommand(_setEffectResultSql, conn); - //(@Id,@IsState,@Status,@Result,@Exception) - - var escapedId = Escaper.Escape(flowType.Value.ToString(), flowInstance.Value.ToString(), storedEffect.EffectId.ToString()); - command.Parameters.AddWithValue("@Id", escapedId); - command.Parameters.AddWithValue("@IsState", storedEffect.IsState); - command.Parameters.AddWithValue("@Status", storedEffect.WorkStatus); - command.Parameters.AddWithValue("@Result", storedEffect.Result ?? (object) SqlBinary.Null); - command.Parameters.AddWithValue("@Exception", JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value); + INSERT (FlowType, FlowInstance, StoredId, IsState, EffectId, Status, Result, Exception) + VALUES (source.FlowType, source.FlowInstance, source.StoredId, source.IsState, source.EffectId, source.Status, source.Result, source.Exception);"; - await command.ExecuteNonQueryAsync();*/ - throw new NotImplementedException(); + var sql = _setEffectResultsSql.Replace( + "@VALUES", + "(@FlowType#, @FlowInstance#, @StoredId#, @IsState#, @EffectId#, @Status#, @Result#, @Exception#)" + .Replicate(storedEffects.Count) + .Select((s, i) => s.Replace("#", i.ToString())) + .StringJoin(", ") + ); + await using var command = new SqlCommand(sql, conn); + for (var i = 0; i < storedEffects.Count; i++) + { + var storedEffect = storedEffects[i]; + command.Parameters.AddWithValue($"@FlowType{i}", storedId.Type.Value); + command.Parameters.AddWithValue($"@FlowInstance{i}", storedId.Instance.Value); + command.Parameters.AddWithValue($"@StoredId{i}", storedEffect.StoredEffectId.Value); + command.Parameters.AddWithValue($"@IsState{i}", storedEffect.IsState); + command.Parameters.AddWithValue($"@EffectId{i}", storedEffect.EffectId.Value); + command.Parameters.AddWithValue($"@Status{i}", storedEffect.WorkStatus); + command.Parameters.AddWithValue($"@Result{i}", storedEffect.Result ?? (object) SqlBinary.Null); + command.Parameters.AddWithValue($"@Exception{i}", JsonHelper.ToJson(storedEffect.StoredException) ?? (object) DBNull.Value); + } + + await command.ExecuteNonQueryAsync(); } private string? _getEffectResultsSql; public async Task> GetEffectResults(StoredId storedId) { - await using var conn = await _connFunc(); + await using var conn = await CreateConnection(); _getEffectResultsSql ??= @$" SELECT StoredId, IsState, EffectId, Status, Result, Exception - FROM {_tablePrefix}_Effects + FROM {tablePrefix}_Effects WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance"; await using var command = new SqlCommand(_getEffectResultsSql, conn); @@ -150,9 +151,9 @@ public async Task> GetEffectResults(StoredId storedI private string? _deleteEffectResultSql; public async Task DeleteEffectResult(StoredId storedId, StoredEffectId effectId, bool isState) { - await using var conn = await _connFunc(); + await using var conn = await CreateConnection(); _deleteEffectResultSql ??= @$" - DELETE FROM {_tablePrefix}_Effects + DELETE FROM {tablePrefix}_Effects WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance AND StoredId = @StoredId AND IsState = @IsState"; await using var command = new SqlCommand(_deleteEffectResultSql, conn); @@ -167,9 +168,9 @@ DELETE FROM {_tablePrefix}_Effects private string? _removeSql; public async Task Remove(StoredId storedId) { - await using var conn = await _connFunc(); + await using var conn = await CreateConnection(); _removeSql ??= @$" - DELETE FROM {_tablePrefix}_Effects + DELETE FROM {tablePrefix}_Effects WHERE FlowType = @FlowType AND FlowInstance = @FlowInstance"; await using var command = new SqlCommand(_removeSql, conn); @@ -179,13 +180,10 @@ DELETE FROM {_tablePrefix}_Effects await command.ExecuteNonQueryAsync(); } - private static Func> CreateConnection(string connectionString) + private async Task CreateConnection() { - return async () => - { - var connection = new SqlConnection(connectionString); - await connection.OpenAsync(); - return connection; - }; + var connection = new SqlConnection(connectionString); + await connection.OpenAsync(); + return connection; } } \ No newline at end of file