Skip to content

Commit

Permalink
[Host.Outbox] Execute permission is required for Outbox #297
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Sep 4, 2024
1 parent b311e52 commit 9d95596
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 41 deletions.
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>2.5.1</Version>
<Version>2.5.2-rc1</Version>
</PropertyGroup>

</Project>
16 changes: 14 additions & 2 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxMigrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,21 @@ await TryApplyMigration("20240503000000_SMB_Optimizations",
-- SqlOutboxTemplate.SqlOutboxMessageDeleteSent
CREATE INDEX IX_Outbox_Timestamp_DeliveryComplete_1_DeliveryAborted_0 ON {qualifiedTableName} (Timestamp) WHERE (DeliveryComplete = 1 and DeliveryAborted = 0);
BEGIN TRY
-- SqlOutboxTemplate.SqlOutboxMessageUpdateSent
CREATE TYPE {qualifiedOutboxIdTypeName} AS TABLE (Id uniqueidentifier);
END TRY
BEGIN CATCH
-- Ignore when there is lack of permissions to create a custom type.
-- In the next migration we will drop the type see: https://github.com/zarusz/SlimMessageBus/issues/297
END CATCH;
""",
token);

-- SqlOutboxTemplate.SqlOutboxMessageUpdateSent
CREATE TYPE {qualifiedOutboxIdTypeName} AS TABLE (Id uniqueidentifier);
await TryApplyMigration("20240831000000_SMB_RemoveOutboxIdType",
$"""
DROP TYPE IF EXISTS {qualifiedOutboxIdTypeName};
""",
token);
}
Expand Down
38 changes: 5 additions & 33 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public async virtual Task Save(OutboxMessage message, CancellationToken token)
{
await EnsureConnection();

// ToDo: Create command template

await ExecuteNonQuery(Settings.SqlSettings.OperationRetry, _sqlTemplate.SqlOutboxMessageInsert, cmd =>
{
cmd.Parameters.Add("@Id", SqlDbType.UniqueIdentifier).Value = message.Id;
Expand Down Expand Up @@ -63,20 +61,11 @@ public async Task AbortDelivery(IReadOnlyCollection<Guid> ids, CancellationToken

await EnsureConnection();

var table = new DataTable();
table.Columns.Add("Id", typeof(Guid));
foreach (var guid in ids)
{
table.Rows.Add(guid);
}

var affected = await ExecuteNonQuery(Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageAbortDelivery,
cmd =>
{
var param = cmd.Parameters.Add("@Ids", SqlDbType.Structured);
param.TypeName = _sqlTemplate.OutboxIdTypeQualified;
param.Value = table;
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
},
token: token);

Expand All @@ -95,20 +84,11 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken

await EnsureConnection();

var table = new DataTable();
table.Columns.Add("Id", typeof(Guid));
foreach (var guid in ids)
{
table.Rows.Add(guid);
}

var affected = await ExecuteNonQuery(Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageUpdateSent,
cmd =>
{
var param = cmd.Parameters.Add("@Ids", SqlDbType.Structured);
param.TypeName = _sqlTemplate.OutboxIdTypeQualified;
param.Value = table;
cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
},
token: token);

Expand All @@ -118,6 +98,8 @@ public async Task UpdateToSent(IReadOnlyCollection<Guid> ids, CancellationToken
}
}

private string ToIdsString(IReadOnlyCollection<Guid> ids) => string.Join(_sqlTemplate.InIdsSeparator, ids);

public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int maxDeliveryAttempts, CancellationToken token)
{
if (ids.Count == 0)
Expand All @@ -132,21 +114,11 @@ public async Task IncrementDeliveryAttempt(IReadOnlyCollection<Guid> ids, int ma

await EnsureConnection();

var table = new DataTable();
table.Columns.Add("Id", typeof(Guid));
foreach (var guid in ids)
{
table.Rows.Add(guid);
}

var affected = await ExecuteNonQuery(Settings.SqlSettings.OperationRetry,
_sqlTemplate.SqlOutboxMessageIncrementDeliveryAttempt,
cmd =>
{
var param = cmd.Parameters.Add("@Ids", SqlDbType.Structured);
param.TypeName = _sqlTemplate.OutboxIdTypeQualified;
param.Value = table;

cmd.Parameters.AddWithValue("@Ids", ToIdsString(ids));
cmd.Parameters.AddWithValue("@MaxDeliveryAttempts", maxDeliveryAttempts);
},
token: token);
Expand Down
14 changes: 9 additions & 5 deletions src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxTemplate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

public class SqlOutboxTemplate
{
public string OutboxIdTypeQualified { get; }
public string TableNameQualified { get; }
public string MigrationsTableNameQualified { get; }
public string SqlOutboxMessageInsert { get; }
Expand All @@ -19,9 +18,10 @@ public class SqlOutboxTemplate
/// </summary>
internal string SqlOutboxAllMessages { get; }

public string InIdsSeparator { get; } = "|";

public SqlOutboxTemplate(SqlOutboxSettings settings)
{
OutboxIdTypeQualified = $"[{settings.SqlSettings.DatabaseSchemaName}].[{settings.SqlSettings.DatabaseOutboxTypeName}]";
TableNameQualified = $"[{settings.SqlSettings.DatabaseSchemaName}].[{settings.SqlSettings.DatabaseTableName}]";
MigrationsTableNameQualified = $"[{settings.SqlSettings.DatabaseSchemaName}].[{settings.SqlSettings.DatabaseMigrationsTableName}]";

Expand Down Expand Up @@ -106,25 +106,29 @@ SELECT TOP (@BatchSize) Id
ORDER BY Timestamp ASC;
""";

// See https://learn.microsoft.com/en-us/sql/t-sql/functions/string-split-transact-sql?view=sql-server-ver16
// See https://stackoverflow.com/a/47777878/1906057
var inIdsSql = $"SELECT CONVERT(uniqueidentifier, [value]) from STRING_SPLIT(@Ids, '{InIdsSeparator}')";

SqlOutboxMessageUpdateSent = $"""
UPDATE {TableNameQualified}
SET [DeliveryComplete] = 1,
[DeliveryAttempt] = DeliveryAttempt + 1
WHERE [Id] IN (SELECT [Id] from @Ids);
WHERE [Id] IN ({inIdsSql});
""";

SqlOutboxMessageIncrementDeliveryAttempt = $"""
UPDATE {TableNameQualified}
SET [DeliveryAttempt] = DeliveryAttempt + 1,
[DeliveryAborted] = CASE WHEN [DeliveryAttempt] >= @MaxDeliveryAttempts THEN 1 ELSE 0 END
WHERE [Id] IN (SELECT [Id] from @Ids);
WHERE [Id] IN ({inIdsSql});
""";

SqlOutboxMessageAbortDelivery = $"""
UPDATE {TableNameQualified}
SET [DeliveryAttempt] = DeliveryAttempt + 1,
[DeliveryAborted] = 1
WHERE [Id] IN (SELECT [Id] from @Ids);
WHERE [Id] IN ({inIdsSql});
""";

SqlOutboxMessageRenewLock = $"""
Expand Down

0 comments on commit 9d95596

Please sign in to comment.