From 9c11a10cd9ec118bd6d832f11da08c4d2ceb1087 Mon Sep 17 00:00:00 2001 From: stidsborg Date: Sat, 14 Dec 2024 07:27:14 +0100 Subject: [PATCH] Removed LogStore --- .../LeaseUpdaterTestFunctionStore.cs | 1 - .../InMemoryTests/LogStoreTests.cs | 24 -- .../TestTemplates/LogStoreTests.cs | 155 --------- .../WatchDogsTests/CrashableFunctionStore.cs | 1 - .../Storage/IFunctionStore.cs | 1 - .../Storage/ILogStore.cs | 22 -- .../Storage/InMemoryFunctionStore.cs | 1 - .../Storage/InMemoryLogStore.cs | 106 ------ .../Utils/CrashableFunctionStore.cs | 1 - .../LogStoreTests.cs | 24 -- .../MariaDbFunctionStore.cs | 9 +- .../MariaDbLogStore.cs | 317 ------------------ .../LogStoreTests.cs | 24 -- .../PostgreSqlFunctionStore.cs | 5 - .../PostgresSqlLogStore.cs | 273 --------------- .../LogStoreTests.cs | 24 -- .../SqlServerFunctionStore.cs | 5 - .../SqlServerLogStore.cs | 248 -------------- 18 files changed, 2 insertions(+), 1239 deletions(-) delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LogStoreTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/LogStoreTests.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Storage/ILogStore.cs delete mode 100644 Core/Cleipnir.ResilientFunctions/Storage/InMemoryLogStore.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/LogStoreTests.cs delete mode 100644 Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbLogStore.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/LogStoreTests.cs delete mode 100644 Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgresSqlLogStore.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/LogStoreTests.cs delete mode 100644 Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerLogStore.cs diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs index bce02ef8..441aa9fe 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LeaseUpdaterTests/LeaseUpdaterTestFunctionStore.cs @@ -24,7 +24,6 @@ public class LeaseUpdaterTestFunctionStore : IFunctionStore public ICorrelationStore CorrelationStore => _inner.CorrelationStore; public Utilities Utilities => _inner.Utilities; public IMigrator Migrator => _inner.Migrator; - public ILogStore LogStore => _inner.LogStore; public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore; public Task Initialize() => _inner.Initialize(); diff --git a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LogStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LogStoreTests.cs deleted file mode 100644 index 70557a4e..00000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/InMemoryTests/LogStoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.Tests.InMemoryTests; - -[TestClass] -public class LogStoreTests : TestTemplates.LogStoreTests -{ - [TestMethod] - public override Task SunshineScenarioTest() - => SunshineScenarioTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task GetEntriesWithOffsetTest() - => GetEntriesWithOffsetTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task GetEntriesWithOffsetAndOwnerTest() - => GetEntriesWithOffsetAndOwnerTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task AppendMultipleEntriesAtOnce() - => AppendMultipleEntriesAtOnce(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/LogStoreTests.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/LogStoreTests.cs deleted file mode 100644 index 1c668414..00000000 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/LogStoreTests.cs +++ /dev/null @@ -1,155 +0,0 @@ -using System.Linq; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Cleipnir.ResilientFunctions.Tests.Utils; -using Shouldly; - -namespace Cleipnir.ResilientFunctions.Tests.TestTemplates; - -public abstract class LogStoreTests -{ - public abstract Task SunshineScenarioTest(); - protected async Task SunshineScenarioTest(Task storeTask) - { - var logStore = (await storeTask).LogStore; - var storedId = TestStoredId.Create(); - - var entries = await logStore.GetEntries(storedId); - entries.ShouldBeEmpty(); - - var owner1 = new Owner(1); - var msg1 = "hallo world".ToUtf8Bytes(); - var position1 = await logStore.Append(storedId, msg1, owner1); - var msg2 = "hallo again".ToUtf8Bytes(); - var position2 = await logStore.Append(storedId, msg2, owner1); - var owner2 = new Owner(2); - var msg3 = "hallo from owner2".ToUtf8Bytes(); - var position3 = await logStore.Append(storedId, msg3, owner2); - - entries = await logStore.GetEntries(storedId); - entries.Count.ShouldBe(3); - - entries[0].Position.ShouldBe(position1); - entries[0].Owner.ShouldBe(owner1); - entries[0].Content.ShouldBe(msg1); - - entries[1].Position.ShouldBe(position2); - entries[1].Owner.ShouldBe(owner1); - entries[1].Content.ShouldBe(msg2); - - entries[2].Position.ShouldBe(position3); - entries[2].Owner.ShouldBe(owner2); - entries[2].Content.ShouldBe(msg3); - - await logStore.Delete(storedId, position1); - entries = await logStore.GetEntries(storedId); - entries.Count.ShouldBe(2); - - entries[0].Position.ShouldBe(position2); - entries[0].Owner.ShouldBe(owner1); - entries[0].Content.ShouldBe(msg2); - - entries[1].Position.ShouldBe(position3); - entries[1].Owner.ShouldBe(owner2); - entries[1].Content.ShouldBe(msg3); - } - - public abstract Task GetEntriesWithOffsetTest(); - protected async Task GetEntriesWithOffsetTest(Task storeTask) - { - var logStore = (await storeTask).LogStore; - var storedId = TestStoredId.Create(); - - var owner1 = new Owner(1); - var msg1 = "hallo world".ToUtf8Bytes(); - var position1 = await logStore.Append(storedId, msg1, owner1); - var msg2 = "hallo again".ToUtf8Bytes(); - var position2 = await logStore.Append(storedId, msg2, owner1); - var owner2 = new Owner(2); - var msg3 = "hallo from owner2".ToUtf8Bytes(); - var position3 = await logStore.Append(storedId, msg3, owner2); - - var entries = await logStore.GetEntries(storedId, position1); - entries.Count.ShouldBe(2); - var (entryOwner, entryPosition, entryContent) = entries[0]; - entryOwner.ShouldBe(owner1); - entryPosition.ShouldBe(position2); - entryContent.ShouldBe(msg2); - (entryOwner, entryPosition, entryContent) = entries[1]; - entryOwner.ShouldBe(owner2); - entryPosition.ShouldBe(position3); - entryContent.ShouldBe(msg3); - - entries = await logStore.GetEntries(storedId, position3); - entries.ShouldBeEmpty(); - } - - public abstract Task GetEntriesWithOffsetAndOwnerTest(); - protected async Task GetEntriesWithOffsetAndOwnerTest(Task storeTask) - { - var logStore = (await storeTask).LogStore; - var storedId = TestStoredId.Create(); - - var owner1 = new Owner(1); - var msg1 = "hallo world".ToUtf8Bytes(); - var position1 = await logStore.Append(storedId, msg1, owner1); - var msg2 = "hallo again".ToUtf8Bytes(); - var position2 = await logStore.Append(storedId, msg2, owner1); - var owner2 = new Owner(2); - var msg3 = "hallo from owner2".ToUtf8Bytes(); - var position3 = await logStore.Append(storedId, msg3, owner2); - - var (maxPosition, entries) = await logStore.GetEntries(storedId, position1, owner1); - maxPosition.ShouldBe(position3); - entries.Count.ShouldBe(1); - var (entryOwner, entryPosition, entryContent) = entries.Single(); - entryOwner.ShouldBe(owner1); - entryPosition.ShouldBe(position2); - entryContent.ShouldBe(msg2); - - var secondGetEntries = await logStore.GetEntries(storedId, maxPosition, owner1); - secondGetEntries.Entries.ShouldBeEmpty(); - secondGetEntries.MaxPosition.ShouldBe(maxPosition); - } - - public abstract Task AppendMultipleEntriesAtOnce(); - protected async Task AppendMultipleEntriesAtOnce(Task storeTask) - { - var logStore = (await storeTask).LogStore; - var storedId1 = TestStoredId.Create(); - var storedId2 = TestStoredId.Create(); - - var owner1 = new Owner(1); - var msg1 = "hallo world".ToUtf8Bytes(); - var position1 = await logStore.Append(storedId1, msg1, owner1); - - var positions = await logStore.Append([ - new AppendEntry(storedId1, owner1, "hallo again".ToUtf8Bytes()), - new AppendEntry(storedId2, owner1, "hallo from other id".ToUtf8Bytes()), - new AppendEntry(storedId1, owner1, "hallo again again".ToUtf8Bytes()) - ]); - - var entriesId1 = await logStore.GetEntries(storedId1); - entriesId1.Count.ShouldBe(3); - entriesId1[0].Position.ShouldBe(position1); - entriesId1[0].Owner.ShouldBe(owner1); - entriesId1[0].Content.ShouldBe(msg1); - - entriesId1[1].Position.ShouldBe(positions[0]); - entriesId1[1].Owner.ShouldBe(owner1); - entriesId1[1].Content.ShouldBe("hallo again".ToUtf8Bytes()); - - entriesId1[2].Position.ShouldBe(positions[2]); - entriesId1[2].Owner.ShouldBe(owner1); - entriesId1[2].Content.ShouldBe("hallo again again".ToUtf8Bytes()); - - var entriesId2 = await logStore.GetEntries(storedId2); - entriesId2.Count.ShouldBe(1); - entriesId2[0].Position.ShouldBe(positions[1]); - entriesId2[0].Owner.ShouldBe(owner1); - entriesId2[0].Content.ShouldBe("hallo from other id".ToUtf8Bytes()); - - await logStore.GetEntries(TestStoredId.Create()).ShouldBeEmptyAsync(); - } -} \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs index 9e75ee1a..5583e6a4 100644 --- a/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions.Tests/TestTemplates/WatchDogsTests/CrashableFunctionStore.cs @@ -25,7 +25,6 @@ public class CrashableFunctionStore : IFunctionStore public ICorrelationStore CorrelationStore => _crashed ? throw new TimeoutException() : _inner.CorrelationStore; public Utilities Utilities => _crashed ? throw new TimeoutException() : _inner.Utilities; public IMigrator Migrator => _crashed ? throw new TimeoutException() : _inner.Migrator; - public ILogStore LogStore => _crashed ? throw new TimeoutException() : _inner.LogStore; public ISemaphoreStore SemaphoreStore => _crashed ? throw new TimeoutException() : _inner.SemaphoreStore; public CrashableFunctionStore(IFunctionStore inner) diff --git a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs index 743f9803..8535b3d6 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/IFunctionStore.cs @@ -15,7 +15,6 @@ public interface IFunctionStore public ICorrelationStore CorrelationStore { get; } public Utilities Utilities { get; } public IMigrator Migrator { get; } - public ILogStore LogStore { get; } public ISemaphoreStore SemaphoreStore { get; } public Task Initialize(); diff --git a/Core/Cleipnir.ResilientFunctions/Storage/ILogStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/ILogStore.cs deleted file mode 100644 index 029e6eeb..00000000 --- a/Core/Cleipnir.ResilientFunctions/Storage/ILogStore.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace Cleipnir.ResilientFunctions.Storage; - -public interface ILogStore -{ - public Task Update(StoredId id, Position position, byte[] content, Owner owner); - public Task Delete(StoredId id, Position position); - public Task Append(StoredId id, byte[] content, Owner owner); - public Task> Append(IEnumerable entries); - public Task> GetEntries(StoredId id); - public Task> GetEntries(StoredId id, Position offset); - public Task GetEntries(StoredId id, Position offset, Owner owner); -} - -public record MaxPositionAndEntries(Position MaxPosition, IReadOnlyList Entries); -public record StoredLogEntry(Owner Owner, Position Position, byte[] Content); -public record Owner(int Value); -public record Position(string Value); -public record Content(byte[] Value); -public record AppendEntry(StoredId Id, Owner Owner, byte[] Content); \ No newline at end of file diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs index 64cb3479..a60908ce 100644 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs +++ b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryFunctionStore.cs @@ -29,7 +29,6 @@ public class InMemoryFunctionStore : IFunctionStore, IMessageStore public Utilities Utilities { get; } public IMigrator Migrator { get; } = new InMemoryMigrator(); - public ILogStore LogStore { get; } = new InMemoryLogStore(); public ISemaphoreStore SemaphoreStore { get; } = new InMemorySemaphoreStore(); public Task Initialize() => Task.CompletedTask; diff --git a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryLogStore.cs b/Core/Cleipnir.ResilientFunctions/Storage/InMemoryLogStore.cs deleted file mode 100644 index 7fb79331..00000000 --- a/Core/Cleipnir.ResilientFunctions/Storage/InMemoryLogStore.cs +++ /dev/null @@ -1,106 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Helpers; - -namespace Cleipnir.ResilientFunctions.Storage; - -public class InMemoryLogStore : ILogStore -{ - private record LogState(Owner Owner, byte[] Content); - - private readonly Dictionary> _logStates = new(); - private readonly object _sync = new(); - - public Task Update(StoredId id, Position position, byte[] content, Owner owner) - { - lock (_sync) - GetDictionary(id)[position] = new LogState(owner, content); - - return position.ToTask(); - } - - public Task Delete(StoredId id, Position position) - { - lock (_sync) - GetDictionary(id).Remove(position); - - return Task.CompletedTask; - } - - public Task Append(StoredId id, byte[] content, Owner owner) - { - lock (_sync) - { - var dict = GetDictionary(id); - var position = dict.Count == 0 - ? new Position("0") - : new Position((dict.Keys.Select(k => int.Parse(k.Value)).Max() + 1).ToString()); - - dict[position] = new LogState(owner, content); - return position.ToTask(); - } - } - - public async Task> Append(IEnumerable entries) - { - var positions = new List(); - foreach (var (storedId, owner, content) in entries) - { - var position = await Append(storedId, content, owner); - positions.Add(position); - } - - return positions; - } - - public Task> GetEntries(StoredId id) - { - lock (_sync) - return GetDictionary(id) - .Select(kv => new { Position = kv.Key, Content = kv.Value.Content, Owner = kv.Value.Owner }) - .Select(a => new StoredLogEntry(a.Owner, a.Position, a.Content)) - .ToList() - .CastTo>() - .ToTask(); - } - - public Task> GetEntries(StoredId id, Position offset) - { - return GetEntries(id) - .Result - .Where(e => int.Parse(e.Position.Value) > int.Parse(offset.Value)) - .ToList() - .CastTo>() - .ToTask(); - } - - public Task GetEntries(StoredId id, Position offset, Owner owner) - { - lock (_sync) - { - var allEntries = GetEntries(id).Result; - if (allEntries.Count == 0) - return new MaxPositionAndEntries(offset, Entries: []).ToTask(); - - var entries = allEntries - .Where(e => e.Owner == owner) - .Where(e => int.Parse(e.Position.Value) > int.Parse(offset.Value)) - .ToList() - .CastTo>(); - - var maxPosition = allEntries.Max(e => int.Parse(e.Position.Value)); - return new MaxPositionAndEntries(new Position(maxPosition.ToString()), entries).ToTask(); - } - } - - private Dictionary GetDictionary(StoredId id) - { - lock (_sync) - if (!_logStates.TryGetValue(id, out var logState)) - return _logStates[id] = new Dictionary(); - else - return logState; - } -} \ No newline at end of file diff --git a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs index 0cc05137..b1acb09c 100644 --- a/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs +++ b/Samples/Sample.ConsoleApp/Utils/CrashableFunctionStore.cs @@ -20,7 +20,6 @@ public class CrashableFunctionStore : IFunctionStore public ICorrelationStore CorrelationStore => _inner.CorrelationStore; public Utilities Utilities => _inner.Utilities; public IMigrator Migrator => _inner.Migrator; - public ILogStore LogStore => _inner.LogStore; public ISemaphoreStore SemaphoreStore => _inner.SemaphoreStore; public CrashableFunctionStore(IFunctionStore inner) => _inner = inner; diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/LogStoreTests.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/LogStoreTests.cs deleted file mode 100644 index 64e8bda8..00000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB.Tests/LogStoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using Cleipnir.ResilientFunctions.MariaDb.Tests; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.MariaDb.Tests; - -[TestClass] -public class LogStoreTests : ResilientFunctions.Tests.TestTemplates.LogStoreTests -{ - [TestMethod] - public override Task SunshineScenarioTest() - => SunshineScenarioTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task GetEntriesWithOffsetTest() - => GetEntriesWithOffsetTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task GetEntriesWithOffsetAndOwnerTest() - => GetEntriesWithOffsetAndOwnerTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task AppendMultipleEntriesAtOnce() - => AppendMultipleEntriesAtOnce(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs index 3f47a4b8..77762d0b 100644 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs +++ b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbFunctionStore.cs @@ -33,10 +33,8 @@ public class MariaDbFunctionStore : IFunctionStore private readonly MariaDbMigrator _migrator; public IMigrator Migrator => _migrator; - - private readonly MariaDbLogStore _logStore; - public ILogStore LogStore => _logStore; - private MariaDbSemaphoreStore _semaphoreStore; + + private readonly MariaDbSemaphoreStore _semaphoreStore; public ISemaphoreStore SemaphoreStore => _semaphoreStore; public Utilities Utilities { get; } @@ -52,7 +50,6 @@ public MariaDbFunctionStore(string connectionString, string tablePrefix = "") _messageStore = new MariaDbMessageStore(connectionString, tablePrefix); _effectsStore = new MariaDbEffectsStore(connectionString, tablePrefix); _correlationStore = new MariaDbCorrelationStore(connectionString, tablePrefix); - _logStore = new MariaDbLogStore(connectionString, tablePrefix); _semaphoreStore = new MariaDbSemaphoreStore(connectionString, tablePrefix); _timeoutStore = new MariaDbTimeoutStore(connectionString, tablePrefix); _mariaDbUnderlyingRegister = new MariaDbUnderlyingRegister(connectionString, tablePrefix); @@ -73,7 +70,6 @@ public async Task Initialize() await MessageStore.Initialize(); await EffectsStore.Initialize(); await CorrelationStore.Initialize(); - await _logStore.Initialize(); await _semaphoreStore.Initialize(); await TimeoutStore.Initialize(); await _typeStore.Initialize(); @@ -108,7 +104,6 @@ public async Task TruncateTables() await _mariaDbUnderlyingRegister.TruncateTable(); await _effectsStore.Truncate(); await _correlationStore.Truncate(); - await _logStore.Truncate(); await _semaphoreStore.Truncate(); await _typeStore.Truncate(); diff --git a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbLogStore.cs b/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbLogStore.cs deleted file mode 100644 index 037bf214..00000000 --- a/Stores/MariaDB/Cleipnir.ResilientFunctions.MariaDB/MariaDbLogStore.cs +++ /dev/null @@ -1,317 +0,0 @@ -using Cleipnir.ResilientFunctions.Storage; -using MySqlConnector; - -namespace Cleipnir.ResilientFunctions.MariaDb; - -public class MariaDbLogStore : ILogStore -{ - private readonly string _connectionString; - private readonly string _tablePrefix; - - public MariaDbLogStore(string connectionString, string tablePrefix = "") - { - _connectionString = connectionString; - _tablePrefix = tablePrefix; - } - - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _initializeSql ??= @$" - CREATE TABLE IF NOT EXISTS {_tablePrefix}_logs ( - type INT, - instance CHAR(32), - position INT NOT NULL, - owner INT NOT NULL, - content LONGBLOB NOT NULL, - PRIMARY KEY (type, instance, position) - );"; - var command = new MySqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateTableSql; - public async Task Truncate() - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString);; - _truncateTableSql ??= $"TRUNCATE TABLE {_tablePrefix}_logs;"; - var command = new MySqlCommand(_truncateTableSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateSql; - public async Task Truncate(StoredId storedId) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _truncateSql ??= @$" - DELETE FROM {_tablePrefix}_logs - WHERE type = ? AND instance = ?"; - - await using var command = new MySqlCommand(_truncateSql, conn); - command.Parameters.Add(new() { Value = storedId.Type.Value }); - command.Parameters.Add(new() { Value = storedId.Instance.Value.ToString("N") }); - - await command.ExecuteNonQueryAsync(); - } - - private string? _updateSql; - public async Task Update(StoredId id, Position position, byte[] content, Owner owner) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - - _updateSql ??= @$" - UPDATE {_tablePrefix}_logs - SET content = ? - WHERE type = ? AND instance = ? AND position = ?"; - await using var command = new MySqlCommand(_updateSql, conn) - { - Parameters = - { - new() {Value = content}, - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value.ToString("N")}, - new() {Value = int.Parse(position.Value)} - } - }; - var affectedRows = await command.ExecuteNonQueryAsync(); - if (affectedRows == 0) - throw new InvalidOperationException("Unable to find position"); - - return position; - } - - private string? _deleteSql; - public async Task Delete(StoredId id, Position position) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - - _deleteSql ??= @$" - DELETE FROM {_tablePrefix}_logs - WHERE type = ? AND instance = ? AND position = ?"; - await using var command = new MySqlCommand(_deleteSql, conn) - { - Parameters = - { - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value.ToString("N")}, - new() {Value = int.Parse(position.Value)} - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _appendSql; - public async Task Append(StoredId id, byte[] content, Owner owner) - { - for (var i = 0; i < 10; i++) //retry if deadlock occurs - try - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - //https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html#function_get-lock - var lockName = id.ToString().GenerateSHA256Hash(); - _appendSql ??= @$" - SELECT GET_LOCK(?, 10); - INSERT INTO {_tablePrefix}_logs - (type, instance, position, owner, content) - SELECT ?, ?, COALESCE(MAX(position), -1) + 1, ?, ? - FROM {_tablePrefix}_logs - WHERE type = ? AND instance = ? - RETURNING position; - SELECT RELEASE_LOCK(?);"; - - /* - SELECT epoch, status - FROM {_tablePrefix} - WHERE type = ? AND instance = ?;*/ - - await using var command = new MySqlCommand(_appendSql, conn) - { - Parameters = - { - new() { Value = lockName }, - new() { Value = id.Type.Value }, - new() { Value = id.Instance.Value.ToString("N") }, - new() { Value = owner.Value }, - new() { Value = content }, - - new() { Value = id.Type.Value }, - new() { Value = id.Instance.Value.ToString("N") }, - new() { Value = lockName }, - //new() { Value = id.Type.Value }, - //new() { Value = id.Instance.Value.ToString("N") }, - } - }; - - await using var reader = await command.ExecuteReaderAsync(); - await reader.NextResultAsync(); //lock select - - await reader.ReadAsync(); //position return - var position = reader.GetInt32(0); - - return new Position(position.ToString()); - /* - await reader.NextResultAsync(); - while (await reader.ReadAsync()) - { - var epoch = reader.GetInt32(0); - var status = (Status)reader.GetInt32(1); - return new FunctionStatus(status, epoch); - }*/ - } - catch (MySqlException e) when (e.Number == 1213) //deadlock found when trying to get lock; try restarting transaction - { - if (i == 9) - throw; - - await Task.Delay(Random.Shared.Next(10, 250)); - } - - return null!; - } - - private string? _appendsSql; - public async Task> Append(IEnumerable entries) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _appendsSql ??= @$" - INSERT INTO {_tablePrefix}_logs - (type, instance, position, owner, content) - @VALUES - RETURNING position;"; - - var valuesTemplate = @$" - ( - SELECT - ?, - ?, - COALESCE(MAX(position), -1) + @POSITION_OFFSET, - ?, - ? - FROM {_tablePrefix}_logs - WHERE type = ? AND instance = ? - )"; - - var command = new MySqlCommand(); - var positionOffsets = new Dictionary(); - var valueSqls = new List(); - foreach (var (storedId, owner, content) in entries) - { - if (!positionOffsets.ContainsKey(storedId)) - positionOffsets[storedId] = 1; - - var positionOffset = positionOffsets[storedId]++; - valueSqls.Add(valuesTemplate.Replace("@POSITION_OFFSET", positionOffset.ToString())); - - command.Parameters.Add(new MySqlParameter(name: null, storedId.Type.Value)); - command.Parameters.Add(new MySqlParameter(name: null, storedId.Instance.Value.ToString("N"))); - command.Parameters.Add(new MySqlParameter(name: null, owner.Value)); - command.Parameters.Add(new MySqlParameter(name: null, content)); - command.Parameters.Add(new MySqlParameter(name: null, storedId.Type.Value)); - command.Parameters.Add(new MySqlParameter(name: null, storedId.Instance.Value.ToString("N"))); - } - command.Connection = conn; - var sql = _appendsSql.Replace("@VALUES", string.Join(" UNION " + Environment.NewLine, valueSqls)); - command.CommandText = sql; - - var positions = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - { - var position = reader.GetInt32(0); - positions.Add(new Position(position.ToString())); - } - - return positions; - } - - private string? _getEntriesSql; - public async Task> GetEntries(StoredId id) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _getEntriesSql ??= @$" - SELECT position, owner, content - FROM {_tablePrefix}_logs - WHERE type = ? AND instance = ? - ORDER BY position ASC;"; - await using var command = new MySqlCommand(_getEntriesSql, conn) - { - Parameters = - { - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value.ToString("N")} - } - }; - - return await ReadEntries(command); - } - - private string? _getEntriesWithOffset; - public async Task> GetEntries(StoredId id, Position offset) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _getEntriesWithOffset ??= @$" - SELECT position, owner, content - FROM {_tablePrefix}_logs - WHERE type = ? AND instance = ? AND position > ? - ORDER BY position ASC;"; - await using var command = new MySqlCommand(_getEntriesWithOffset, conn) - { - Parameters = - { - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value.ToString("N")}, - new() {Value = int.Parse(offset.Value)} - } - }; - - return await ReadEntries(command); - } - - private string? _getEntriesWithOffsetAndOwner; - public async Task GetEntries(StoredId id, Position offset, Owner owner) - { - await using var conn = await DatabaseHelper.CreateOpenConnection(_connectionString); - _getEntriesWithOffsetAndOwner ??= @$" - SELECT position, owner, CASE WHEN owner = ? THEN content END AS content - FROM {_tablePrefix}_logs - WHERE type = ? AND instance = ? AND position > ? - ORDER BY position ASC;"; - await using var command = new MySqlCommand(_getEntriesWithOffsetAndOwner, conn) - { - Parameters = - { - new() {Value = owner.Value}, - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value.ToString("N")}, - new() {Value = int.Parse(offset.Value)} - } - }; - - var entries = await ReadEntries(command); - if (entries.Count == 0) - return new MaxPositionAndEntries(offset, Entries: []); - - var maxPosition = entries[^1].Position; - return new MaxPositionAndEntries( - maxPosition, - entries.Where(e => e.Content != null!).ToList() - ); - } - - private async Task> ReadEntries(MySqlCommand command) - { - var entries = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - { - var position = new Position(reader.GetInt32(0).ToString()); - var owner = new Owner(reader.GetInt32(1)); - var content = reader.IsDBNull(2) ? null : (byte[]) reader.GetValue(2); - entries.Add(new StoredLogEntry(owner, position, content!)); - } - - return entries; - } -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/LogStoreTests.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/LogStoreTests.cs deleted file mode 100644 index 07ba6fca..00000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL.Tests/LogStoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.PostgreSQL.Tests; - -[TestClass] -public class LogStoreTests : ResilientFunctions.Tests.TestTemplates.LogStoreTests -{ - [TestMethod] - public override Task SunshineScenarioTest() - => SunshineScenarioTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task GetEntriesWithOffsetTest() - => GetEntriesWithOffsetTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task GetEntriesWithOffsetAndOwnerTest() - => GetEntriesWithOffsetAndOwnerTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task AppendMultipleEntriesAtOnce() - => AppendMultipleEntriesAtOnce(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs index 363d0bb2..5497260d 100644 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs +++ b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgreSqlFunctionStore.cs @@ -32,8 +32,6 @@ public class PostgreSqlFunctionStore : IFunctionStore private readonly ICorrelationStore _correlationStore; public ICorrelationStore CorrelationStore => _correlationStore; - private readonly PostgresSqlLogStore _logStore; - public ILogStore LogStore => _logStore; private readonly PostgreSqlSemaphoreStore _semaphoreStore; public ISemaphoreStore SemaphoreStore => _semaphoreStore; @@ -52,7 +50,6 @@ public PostgreSqlFunctionStore(string connectionString, string tablePrefix = "") _effectsStore = new PostgreSqlEffectsStore(connectionString, _tableName); _timeoutStore = new PostgreSqlTimeoutStore(connectionString, _tableName); _correlationStore = new PostgreSqlCorrelationStore(connectionString, _tableName); - _logStore = new PostgresSqlLogStore(connectionString, _tableName); _semaphoreStore = new PostgreSqlSemaphoreStore(connectionString, _tableName); _typeStore = new PostgreSqlTypeStore(connectionString, _tableName); _postgresSqlUnderlyingRegister = new PostgresSqlUnderlyingRegister(connectionString, _tableName); @@ -79,7 +76,6 @@ public async Task Initialize() await _effectsStore.Initialize(); await _timeoutStore.Initialize(); await _correlationStore.Initialize(); - await _logStore.Initialize(); await _semaphoreStore.Initialize(); await _typeStore.Initialize(); await using var conn = await CreateConnection(); @@ -122,7 +118,6 @@ public async Task TruncateTables() await _effectsStore.Truncate(); await _correlationStore.Truncate(); await _typeStore.Truncate(); - await _logStore.Truncate(); await _semaphoreStore.Truncate(); await using var conn = await CreateConnection(); diff --git a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgresSqlLogStore.cs b/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgresSqlLogStore.cs deleted file mode 100644 index 22247f1c..00000000 --- a/Stores/PostgreSQL/Cleipnir.ResilientFunctions.PostgreSQL/PostgresSqlLogStore.cs +++ /dev/null @@ -1,273 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Storage; -using Npgsql; - -namespace Cleipnir.ResilientFunctions.PostgreSQL; - -public class PostgresSqlLogStore(string connectionString, string tablePrefix = "") : ILogStore -{ - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - _initializeSql ??= $@" - CREATE TABLE IF NOT EXISTS {tablePrefix}_logs ( - type INT NOT NULL, - instance UUID NOT NULL, - position INT NOT NULL, - owner INT NOT NULL, - content BYTEA NOT NULL, - PRIMARY KEY (type, instance, position) - );"; - - await using var command = new NpgsqlCommand(_initializeSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _truncateTableSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateTableSql ??= $"TRUNCATE TABLE {tablePrefix}_logs;"; - var command = new NpgsqlCommand(_truncateTableSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private string? _updateLogSql; - public async Task Update(StoredId id, Position position, byte[] content, Owner owner) - { - await using var conn = await CreateConnection(); - - _updateLogSql ??= @$" - UPDATE {tablePrefix}_logs - SET content = $1 - WHERE type = $2 AND instance = $3 AND position = $4;"; - - await using var command = new NpgsqlCommand(_updateLogSql, conn) - { - Parameters = - { - new() {Value = content}, - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value}, - new() {Value = position.Value.ToInt()}, - } - }; - - await command.ExecuteNonQueryAsync(); - - return position; - } - - public async Task Delete(StoredId id, Position position) - { - await using var conn = await CreateConnection(); - - _updateLogSql ??= @$" - DELETE FROM {tablePrefix}_logs - WHERE type = $1 AND instance = $2 AND position = $3;"; - - await using var command = new NpgsqlCommand(_updateLogSql, conn) - { - Parameters = - { - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value}, - new() {Value = position.Value.ToInt()}, - } - }; - - await command.ExecuteNonQueryAsync(); - } - - private string? _appendSql; - public async Task Append(StoredId id, byte[] content, Owner owner) - { - await using var conn = await CreateConnection(); - _appendSql ??= @$" - INSERT INTO {tablePrefix}_logs - (type, instance, position, owner, content) - VALUES ( - $1, $2, - (SELECT COALESCE(MAX(position), -1) + 1 FROM {tablePrefix}_logs WHERE type = $1 AND instance = $2), - $3, $4 - ) RETURNING position;"; - var command = new NpgsqlCommand(_appendSql, conn) - { - Parameters = - { - new() { Value = id.Type.Value }, - new() { Value = id.Instance.Value }, - new() { Value = owner.Value }, - new() { Value = content }, - } - }; - - var position = await command.ExecuteScalarAsync(); - return new Position(position!.ToString()!); - } - - private string? _appendsSql; - public async Task> Append(IEnumerable entries) - { - await using var conn = await CreateConnection(); - - _appendsSql ??= @$" - INSERT INTO {tablePrefix}_logs - (type, instance, position, owner, content) - VALUES @VALUES - RETURNING position;"; - - var valuesTemplate = $@"( - $TYPE, - $INSTANCE, - (SELECT COALESCE(MAX(position), -1) + @POSITION_OFFSET FROM {tablePrefix}_logs WHERE type = $TYPE AND instance = $INSTANCE), - $OWNER, - $CONTENT - )"; - - var command = new NpgsqlCommand(); - var i = 1; - var valueSqls = new List(); - var positionOffsets = new Dictionary(); - foreach (var (storedId, owner, content) in entries) - { - if (!positionOffsets.ContainsKey(storedId)) - positionOffsets[storedId] = 1; - - var positionOffset = positionOffsets[storedId]++; - - var typeIndex = i++; - var instanceIndex = i++; - var ownerIndex = i++; - var contentIndex = i++; - - var replacedTemplate = valuesTemplate - .Replace("$TYPE", "$" + typeIndex) - .Replace("$INSTANCE", "$" + instanceIndex) - .Replace("$OWNER", "$" + ownerIndex) - .Replace("$CONTENT", "$" + contentIndex); - valueSqls.Add(replacedTemplate.Replace("@POSITION_OFFSET", positionOffset.ToString())); - - command.Parameters.AddWithValue(storedId.Type.Value); - command.Parameters.AddWithValue(storedId.Instance.Value); - command.Parameters.AddWithValue(owner.Value); - command.Parameters.AddWithValue(content); - } - command.Connection = conn; - var sql = _appendsSql.Replace("@VALUES", string.Join(", ", valueSqls)); - command.CommandText = sql; - - var positions = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - { - var position = reader.GetInt32(0); - positions.Add(new Position(position.ToString())); - } - - return positions; - } - - private string? _getEntries; - public async Task> GetEntries(StoredId id) - { - await using var conn = await CreateConnection(); - _getEntries ??= @$" - SELECT position, owner, content - FROM {tablePrefix}_logs - WHERE type = $1 AND instance = $2 - ORDER BY position ASC;"; - - await using var command = new NpgsqlCommand(_getEntries, conn) - { - Parameters = - { - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value} - } - }; - - return await ReadEntries(command); - } - - private string? _getEntriesWithOffset; - public async Task> GetEntries(StoredId id, Position offset) - { - await using var conn = await CreateConnection(); - _getEntriesWithOffset ??= @$" - SELECT position, owner, content - FROM {tablePrefix}_logs - WHERE type = $1 AND instance = $2 AND position > $3 - ORDER BY position ASC;"; - - await using var command = new NpgsqlCommand(_getEntriesWithOffset, conn) - { - Parameters = - { - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value}, - new() {Value = offset.Value.ToInt()}, - } - }; - - return await ReadEntries(command); - } - - private string? _getEntriesWithOffsetAndOwner; - public async Task GetEntries(StoredId id, Position offset, Owner owner) - { - await using var conn = await CreateConnection(); - _getEntriesWithOffsetAndOwner ??= @$" - SELECT position, owner, CASE WHEN owner = $1 THEN content END AS content - FROM {tablePrefix}_logs - WHERE type = $2 AND instance = $3 AND position > $4 - ORDER BY position ASC;"; - - await using var command = new NpgsqlCommand(_getEntriesWithOffsetAndOwner, conn) - { - Parameters = - { - new() {Value = owner.Value}, - new() {Value = id.Type.Value}, - new() {Value = id.Instance.Value}, - new() {Value = offset.Value.ToInt()}, - } - }; - - var entries = await ReadEntries(command); - if (entries.Count == 0) - return new MaxPositionAndEntries(offset, Entries: []); - - var maxPosition = entries[^1].Position; - return new MaxPositionAndEntries( - maxPosition, - entries.Where(e => e.Content != null!).ToList() - ); - } - - private async Task> ReadEntries(NpgsqlCommand command) - { - var storedMessages = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - { - var position = new Position(reader.GetInt32(0).ToString()); - var owner = new Owner(reader.GetInt32(1)); - var content = reader.IsDBNull(2) ? null : (byte[]) reader.GetValue(2); - storedMessages.Add(new StoredLogEntry(owner, position, content!)); - } - - return storedMessages; - } - - private async Task CreateConnection() - { - var conn = new NpgsqlConnection(connectionString); - await conn.OpenAsync(); - return conn; - } -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/LogStoreTests.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/LogStoreTests.cs deleted file mode 100644 index 0cb36222..00000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer.Tests/LogStoreTests.cs +++ /dev/null @@ -1,24 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.VisualStudio.TestTools.UnitTesting; - -namespace Cleipnir.ResilientFunctions.SqlServer.Tests; - -[TestClass] -public class LogStoreTests : ResilientFunctions.Tests.TestTemplates.LogStoreTests -{ - [TestMethod] - public override Task SunshineScenarioTest() - => SunshineScenarioTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task GetEntriesWithOffsetTest() - => GetEntriesWithOffsetTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task GetEntriesWithOffsetAndOwnerTest() - => GetEntriesWithOffsetAndOwnerTest(FunctionStoreFactory.Create()); - - [TestMethod] - public override Task AppendMultipleEntriesAtOnce() - => AppendMultipleEntriesAtOnce(FunctionStoreFactory.Create()); -} \ No newline at end of file diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs index 361428a3..134c64f7 100644 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs +++ b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerFunctionStore.cs @@ -22,7 +22,6 @@ public class SqlServerFunctionStore : IFunctionStore private readonly SqlServerEffectsStore _effectsStore; private readonly SqlServerMessageStore _messageStore; private readonly SqlServerCorrelationsStore _correlationStore; - private readonly SqlServerLogStore _logStore; private readonly SqlServerTypeStore _typeStore; private readonly SqlServerMigrator _migrator; @@ -33,7 +32,6 @@ public class SqlServerFunctionStore : IFunctionStore public IMessageStore MessageStore => _messageStore; public Utilities Utilities { get; } public IMigrator Migrator => _migrator; - public ILogStore LogStore => _logStore; private readonly SqlServerSemaphoreStore _semaphoreStore; public ISemaphoreStore SemaphoreStore => _semaphoreStore; @@ -49,7 +47,6 @@ public SqlServerFunctionStore(string connectionString, string tablePrefix = "") _underlyingRegister = new SqlServerUnderlyingRegister(connectionString, _tableName); _effectsStore = new SqlServerEffectsStore(connectionString, _tableName); _correlationStore = new SqlServerCorrelationsStore(connectionString, _tableName); - _logStore = new SqlServerLogStore(connectionString, _tableName); _semaphoreStore = new SqlServerSemaphoreStore(connectionString, _tableName); _typeStore = new SqlServerTypeStore(connectionString, _tableName); _migrator = new SqlServerMigrator(connectionString, _tableName); @@ -79,7 +76,6 @@ public async Task Initialize() await _timeoutStore.Initialize(); await _correlationStore.Initialize(); await _typeStore.Initialize(); - await _logStore.Initialize(); await _semaphoreStore.Initialize(); await using var conn = await _connFunc(); _initializeSql ??= @$" @@ -126,7 +122,6 @@ public async Task TruncateTables() await _effectsStore.Truncate(); await _correlationStore.Truncate(); await _typeStore.Truncate(); - await _logStore.Truncate(); await _semaphoreStore.Truncate(); await using var conn = await _connFunc(); diff --git a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerLogStore.cs b/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerLogStore.cs deleted file mode 100644 index 911c6c07..00000000 --- a/Stores/SqlServer/Cleipnir.ResilientFunctions.SqlServer/SqlServerLogStore.cs +++ /dev/null @@ -1,248 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Domain.Exceptions; -using Cleipnir.ResilientFunctions.Messaging; -using Cleipnir.ResilientFunctions.Storage; -using Microsoft.Data.SqlClient; - -namespace Cleipnir.ResilientFunctions.SqlServer; - -public class SqlServerLogStore(string connectionString, string tablePrefix = "") : ILogStore -{ - private string? _initializeSql; - public async Task Initialize() - { - await using var conn = await CreateConnection(); - - _initializeSql ??= @$" - CREATE TABLE {tablePrefix}_Logs ( - Type INT, - Instance UNIQUEIDENTIFIER, - Position INT NOT NULL, - Owner INT NOT NULL, - Content VARBINARY(MAX) NOT NULL, - PRIMARY KEY (Type, Instance, Position) - );"; - var command = new SqlCommand(_initializeSql, conn); - try - { - await command.ExecuteNonQueryAsync(); - } catch (SqlException exception) when (exception.Number == 2714) {} - } - - private string? _truncateSql; - public async Task Truncate() - { - await using var conn = await CreateConnection(); - _truncateSql ??= $"TRUNCATE TABLE {tablePrefix}_Logs"; - await using var command = new SqlCommand(_truncateSql, conn); - await command.ExecuteNonQueryAsync(); - } - - private async Task CreateConnection() - { - var conn = new SqlConnection(connectionString); - await conn.OpenAsync(); - return conn; - } - - private string? _updateSql; - public async Task Update(StoredId id, Position position, byte[] content, Owner owner) - { - await using var conn = await CreateConnection(); - - _updateSql ??= @$" - UPDATE {tablePrefix}_Logs - SET Content = @Content - WHERE Type = @Type AND Instance = @Instance AND Position = @Position"; - - await using var command = new SqlCommand(_updateSql, conn); - command.Parameters.AddWithValue("@Content", content); - command.Parameters.AddWithValue("@Type", id.Type.Value); - command.Parameters.AddWithValue("@Instance", id.Instance.Value); - command.Parameters.AddWithValue("@Position", int.Parse(position.Value)); - - await command.ExecuteNonQueryAsync(); - return position; - } - - private string? _deleteSql; - public async Task Delete(StoredId id, Position position) - { - await using var conn = await CreateConnection(); - - _deleteSql ??= @$" - DELETE FROM {tablePrefix}_Logs - WHERE Type = @Type AND Instance = @Instance AND Position = @Position"; - - await using var command = new SqlCommand(_deleteSql, conn); - command.Parameters.AddWithValue("@Type", id.Type.Value); - command.Parameters.AddWithValue("@Instance", id.Instance.Value); - command.Parameters.AddWithValue("@Position", int.Parse(position.Value)); - - await command.ExecuteNonQueryAsync(); - } - - private string? _appendSql; - public async Task Append(StoredId id, byte[] content, Owner owner) - { - await using var conn = await CreateConnection(); - - _appendSql ??= @$" - INSERT INTO {tablePrefix}_Logs - (Type, Instance, Position, Owner, Content) - OUTPUT INSERTED.Position - VALUES ( - @Type, - @Instance, - (SELECT COALESCE(MAX(Position), -1) + 1 FROM {tablePrefix}_Logs WHERE Type = @Type AND Instance = @Instance), - @Owner, - @Content - );"; - - await using var command = new SqlCommand(_appendSql, conn); - command.Parameters.AddWithValue("@Type", id.Type.Value); - command.Parameters.AddWithValue("@Instance", id.Instance.Value); - command.Parameters.AddWithValue("@Owner", owner.Value); - command.Parameters.AddWithValue("@Content", content); - - var position = await command.ExecuteScalarAsync(); - return new Position(position!.ToString()!); - } - - private string? _appendsSql; - public async Task> Append(IEnumerable entries) - { - await using var conn = await CreateConnection(); - - _appendsSql ??= @$" - INSERT INTO {tablePrefix}_Logs - (Type, Instance, Position, Owner, Content) - OUTPUT INSERTED.Position - VALUES @VALUES;"; - - var valuesTemplate = $@"( - @Type#, - @Instance#, - (SELECT COALESCE(MAX(Position), -1) + @POSITION_OFFSET FROM {tablePrefix}_Logs WHERE Type = @Type# AND Instance = @Instance#), - @Owner#, - @Content# - )"; - - await using var command = new SqlCommand(); - var i = 0; - var values = new List(); - var positionOffsets = new Dictionary(); - foreach (var (storedId, owner, content) in entries) - { - if (!positionOffsets.ContainsKey(storedId)) - positionOffsets[storedId] = 1; - - var positionOffset = positionOffsets[storedId]++; - - values.Add(valuesTemplate - .Replace("#", i.ToString()) - .Replace("@POSITION_OFFSET", positionOffset.ToString()) - ); - - command.Parameters.AddWithValue($"@Type{i}", storedId.Type.Value); - command.Parameters.AddWithValue($"@Instance{i}", storedId.Instance.Value); - command.Parameters.AddWithValue($"@Owner{i}", owner.Value); - command.Parameters.AddWithValue($"@Content{i}", content); - - i++; - } - command.Connection = conn; - command.CommandText = _appendsSql.Replace("@VALUES", string.Join(", ", values)); - - var positions = new List(capacity: i); - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - { - var position = new Position(reader.GetInt32(0).ToString()); - positions.Add(position); - } - - return positions; - } - - private string? _getEntries; - public async Task> GetEntries(StoredId id) - { - await using var conn = await CreateConnection(); - _getEntries ??= @$" - SELECT Position, Owner, Content - FROM {tablePrefix}_Logs - WHERE Type = @Type AND Instance = @Instance - ORDER BY Position ASC;"; - - await using var command = new SqlCommand(_getEntries, conn); - command.Parameters.AddWithValue("@Type", id.Type.Value); - command.Parameters.AddWithValue("@Instance", id.Instance.Value); - - return await ReadEntries(command); - } - - private string? _getEntriesWithOffset; - public async Task> GetEntries(StoredId id, Position offset) - { - await using var conn = await CreateConnection(); - _getEntriesWithOffset ??= @$" - SELECT Position, Owner, Content - FROM {tablePrefix}_Logs - WHERE Type = @Type AND Instance = @Instance AND Position > @Position - ORDER BY Position ASC;"; - - await using var command = new SqlCommand(_getEntriesWithOffset, conn); - command.Parameters.AddWithValue("@Type", id.Type.Value); - command.Parameters.AddWithValue("@Instance", id.Instance.Value); - command.Parameters.AddWithValue("@Position", int.Parse(offset.Value)); - - return await ReadEntries(command); - } - - private string? _getEntriesWithOffsetAndOwner; - public async Task GetEntries(StoredId id, Position offset, Owner owner) - { - await using var conn = await CreateConnection(); - _getEntriesWithOffsetAndOwner ??= @$" - SELECT Position, Owner, CASE WHEN owner = @Owner THEN content END AS Content - FROM {tablePrefix}_Logs - WHERE Type = @Type AND Instance = @Instance AND Position > @Position - ORDER BY Position ASC;"; - - await using var command = new SqlCommand(_getEntriesWithOffsetAndOwner, conn); - command.Parameters.AddWithValue("@Type", id.Type.Value); - command.Parameters.AddWithValue("@Instance", id.Instance.Value); - command.Parameters.AddWithValue("@Position", int.Parse(offset.Value)); - command.Parameters.AddWithValue("@Owner", owner.Value); - - var entries = await ReadEntries(command); - if (entries.Count == 0) - return new MaxPositionAndEntries(offset, Entries: []); - - var maxPosition = entries[^1].Position; - return new MaxPositionAndEntries( - maxPosition, - entries.Where(e => e.Content != null!).ToList() - ); - } - - private async Task> ReadEntries(SqlCommand command) - { - var logEntries = new List(); - await using var reader = await command.ExecuteReaderAsync(); - while (await reader.ReadAsync()) - { - var position = new Position(reader.GetInt32(0).ToString()); - var owner = new Owner(reader.GetInt32(1)); - var content = reader.IsDBNull(2) ? null : (byte[]) reader.GetValue(2); - logEntries.Add(new StoredLogEntry(owner, position, content!)); - } - - return logEntries; - } -} \ No newline at end of file