Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to MySQL for distributed lock #3396

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Brighter.sln
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Salutation_Sweeper", "sampl
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.MsSql", "src\Paramore.Brighter.Locking.MsSql\Paramore.Brighter.Locking.MsSql.csproj", "{758EE237-C722-4A0A-908C-2D08C1E59025}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.MySql", "src\Paramore.Brighter.Locking.MySql\Paramore.Brighter.Locking.MySql.csproj", "{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -1765,6 +1767,18 @@ Global
{758EE237-C722-4A0A-908C-2D08C1E59025}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{758EE237-C722-4A0A-908C-2D08C1E59025}.Release|x86.ActiveCfg = Release|Any CPU
{758EE237-C722-4A0A-908C-2D08C1E59025}.Release|x86.Build.0 = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Debug|x86.ActiveCfg = Debug|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Debug|x86.Build.0 = Debug|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|Any CPU.Build.0 = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|x86.ActiveCfg = Release|Any CPU
{5D57A811-A3C2-42BA-A086-A5BFE8B590C7}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
176 changes: 176 additions & 0 deletions src/Paramore.Brighter.Locking.MySql/MySqlLockingProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
using System;
using System.Collections.Concurrent;
using System.Data.Common;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using MySqlConnector;
using Paramore.Brighter.Logging;
using Paramore.Brighter.MySql;

namespace Paramore.Brighter.Locking.MySql;

/// <summary>
/// The MySQL Locking Provider
/// </summary>
/// <param name="connectionProvider">The MySQL connection Provider.</param>
public class MySqlLockingProvider(MySqlConnectionProvider connectionProvider) : IDistributedLock, IAsyncDisposable
{
private readonly ILogger _logger = ApplicationLogging.CreateLogger<MySqlConnectionProvider>();
private readonly ConcurrentDictionary<string, DbConnection> _connections = new();

/// <summary>
/// Attempt to obtain a lock on a resource
/// </summary>
/// <param name="resource">The name of the resource to Lock</param>
/// <param name="cancellationToken">The Cancellation Token</param>
/// <returns>The id of the lock that has been acquired or null if no lock was able to be acquired</returns>
public async Task<string> ObtainLockAsync(string resource, CancellationToken cancellationToken)
{
if (!_connections.ContainsKey(resource))
{
return null;
}

var connection = await connectionProvider.GetConnectionAsync(cancellationToken);
await using var command = connection.CreateCommand();
command.CommandText = MySqlLockingQueries.ObtainLockQuery;
command.Parameters.Add(new MySqlParameter("@RESOURCE_NAME", MySqlDbType.String)
{
Value = GetSafeName(resource)
});

command.Parameters.Add(new MySqlParameter("@TIMEOUT", MySqlDbType.UInt32)
{
// Infinity timeout
// MariaDB doesn't support -1 value, see: https://stackoverflow.com/questions/49792089/set-infinite-timeout-get-lock-in-mariadb/49809919
Value = 0xffffffff
});

var result = await command.ExecuteScalarAsync(cancellationToken) ?? -1;
if (Convert.ToInt64(result) != 1)
{
return null;
}

_connections.TryAdd(resource, connection);
return resource;
}


/// <summary>
/// Release a lock
/// </summary>
/// <param name="resource">The name of the resource to Lock</param>
/// <param name="lockId">The lock Id that was provided when the lock was obtained</param>
/// <param name="cancellationToken"></param>
/// <returns>Awaitable Task</returns>
public async Task ReleaseLockAsync(string resource, string lockId, CancellationToken cancellationToken)
{
if (!_connections.TryRemove(resource, out var connection))
{
return;
}

await using var command = connection.CreateCommand();
command.CommandText = MySqlLockingQueries.ReleaseLockQuery;
command.Parameters.Add(
new MySqlParameter("@RESOURCE_NAME", MySqlDbType.String) { Value = GetSafeName(resource) });

await command.ExecuteNonQueryAsync(cancellationToken);

await connection.CloseAsync();
await connection.DisposeAsync();
}

/// <summary>
/// Dispose Locking Provider
/// </summary>
public async ValueTask DisposeAsync()
{
foreach ((_, DbConnection connection) in _connections)
{
await connection.DisposeAsync();
}
}

// Copied from https://github.com/madelson/DistributedLock/blob/2.5.0/src/DistributedLock.MySql/MySqlDistributedLock.cs#L82-L147
// That repo is using MIT license.
/// <summary>
/// From https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html
/// </summary>
private const int MaxNameLength = 64;

private static string GetSafeName(string name) =>
ToSafeName(
name,
MaxNameLength,
convertToValidName: s =>
{
if (s.Length == 0) { return "__empty__"; }

return s.ToLowerInvariant();
},
hash: ComputeHash
);

private static string ToSafeName(string name, int maxNameLength, Func<string, string> convertToValidName,
Func<byte[], string> hash)
{
ArgumentNullException.ThrowIfNull(name);

var validBaseLockName = convertToValidName(name);
if (validBaseLockName == name && validBaseLockName.Length <= maxNameLength)
{
return name;
}

var nameHash = hash(Encoding.UTF8.GetBytes(name));

if (nameHash.Length >= maxNameLength)
{
return nameHash.Substring(0, length: maxNameLength);
}

var prefix =
validBaseLockName.Substring(0, Math.Min(validBaseLockName.Length, maxNameLength - nameHash.Length));
return prefix + nameHash;
}

private static string ComputeHash(byte[] bytes)
{
using var sha = SHA512.Create();
var hashBytes = sha.ComputeHash(bytes);

// We truncate to 160 bits, which is 32 chars of Base32. This should still give us good collision resistance but allows for a 64-char
// name to include a good portion of the original provided name, which is good for debugging. See
// https://crypto.stackexchange.com/questions/9435/is-truncating-a-sha512-hash-to-the-first-160-bits-as-secure-as-using-sha1#:~:text=Yes.,time%20is%20still%20pretty%20big
const int Base32CharBits = 5;
const int HashLengthInChars = 160 / Base32CharBits;

// we use Base32 because it is case-insensitive (like MySQL) and a bit more compact than Base16
// RFC 4648 from https://en.wikipedia.org/wiki/Base32
const string Base32Alphabet = "abcdefghijklmnopqrstuvwxyz234567";

var chars = new char[HashLengthInChars];
var byteIndex = 0;
var bitBuffer = 0;
var bitsRemaining = 0;
for (var charIndex = 0; charIndex < chars.Length; ++charIndex)
{
if (bitsRemaining < Base32CharBits)
{
bitBuffer |= hashBytes[byteIndex++] << bitsRemaining;
bitsRemaining += 8;
}

chars[charIndex] = Base32Alphabet[bitBuffer & 31];
bitBuffer >>= Base32CharBits;
bitsRemaining -= Base32CharBits;
}

return new string(chars);
}
}
8 changes: 8 additions & 0 deletions src/Paramore.Brighter.Locking.MySql/MySqlLockingQueries.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Paramore.Brighter.Locking.MySql;

public static class MySqlLockingQueries
{
public const string ObtainLockQuery = "SELECT GET_LOCK(@RESOURCE_NAME)";

public const string ReleaseLockQuery = "SELECT RELEASE_LOCK(@RESOURCE_NAME, @TIMEOUT)";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Description>This is an implementation of the locking using MySQL.</Description>
<Authors>Rafael Lillo</Authors>
<TargetFrameworks>net8.0;net9.0</TargetFrameworks>
<PackageTags>MySql;Brighter;</PackageTags>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Paramore.Brighter.MySql\Paramore.Brighter.MySql.csproj"/>
</ItemGroup>
<ItemGroup>
<PackageReference Include="MySqlConnector"/>
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using MySqlConnector;
using Paramore.Brighter.Locking.MySql;
using Paramore.Brighter.MySql;
using Xunit;

namespace Paramore.Brighter.MySQL.Tests.LockingProvider;

[Trait("Category", "MySql")]
public class MsSqlLockingProviderTests
{
private readonly MySqlTestHelper _msSqlTestHelper;

public MsSqlLockingProviderTests()
{
_msSqlTestHelper = new MySqlTestHelper();
_msSqlTestHelper.SetupMessageDb();
}


[Fact]
public async Task GivenAMsSqlLockingProvider_WhenLockIsCalled_LockCanBeObtainedAndThenReleased()
{
var provider = new MySqlLockingProvider((MySqlConnectionProvider)_msSqlTestHelper.ConnectionProvider);
var resource = "Sweeper";

var result = await provider.ObtainLockAsync(resource, CancellationToken.None);

Assert.NotEmpty(result);
Assert.Equal(resource, result);

await provider.ReleaseLockAsync(resource, result, CancellationToken.None);
}

[Fact]
public async Task GivenTwoLockingProviders_WhenLockIsCalledOnBoth_OneFailsUntilTheFirstLockIsReleased()
{
var provider1 = new MySqlLockingProvider((MySqlConnectionProvider)_msSqlTestHelper.ConnectionProvider);
var provider2 = new MySqlLockingProvider((MySqlConnectionProvider)_msSqlTestHelper.ConnectionProvider);
const string resource = "Sweeper";

var firstLock = await provider1.ObtainLockAsync(resource, CancellationToken.None);
var secondLock = await provider2.ObtainLockAsync(resource, CancellationToken.None);

Assert.NotEmpty(firstLock);
Assert.Null(secondLock);

await provider1.ReleaseLockAsync(resource, firstLock, CancellationToken.None);
var secondLockAttemptTwo = await provider2.ObtainLockAsync(resource, CancellationToken.None);

Assert.NotEmpty(secondLockAttemptTwo);
}

[Fact]
public async Task GivenAnExistingLock_WhenConnectionDies_LockIsReleased()
{
var resource = Guid.NewGuid().ToString();
var connection = await ObtainLockForManualDisposal(resource);

var provider1 = new MySqlLockingProvider((MySqlConnectionProvider)_msSqlTestHelper.ConnectionProvider);

var lockAttempt = await provider1.ObtainLockAsync(resource, CancellationToken.None);

// Ensure Lock was not obtained
Assert.Null(lockAttempt);

await connection.DisposeAsync();

var lockAttemptTwo = await provider1.ObtainLockAsync(resource, CancellationToken.None);

// Ensure Lock was Obtained
Assert.False(string.IsNullOrEmpty(lockAttemptTwo));
}

private async Task<DbConnection> ObtainLockForManualDisposal(string resource)
{
var connectionProvider = (MySqlConnectionProvider)_msSqlTestHelper.ConnectionProvider;
var connection = await connectionProvider.GetConnectionAsync(CancellationToken.None);
var command = connection.CreateCommand();
command.CommandText = MySqlLockingQueries.ObtainLockQuery;
command.Parameters.Add(new MySqlParameter("@RESOURCE_NAME", MySqlDbType.String));
command.Parameters["@RESOURCE_NAME"].Value = resource;
command.Parameters.Add(new MySqlParameter("@TIMEOUT", MySqlDbType.UInt32));
command.Parameters["@TIMEOUT"].Value = -1;

var respone = await command.ExecuteScalarAsync(CancellationToken.None);

//Assert Lock was successful
int.TryParse(respone.ToString(), out var responseCode);
Assert.True(responseCode == 1);

return connection;
}
}
30 changes: 19 additions & 11 deletions tests/Paramore.Brighter.MySQL.Tests/MySqlTestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ public class MySqlTestHelper
private readonly bool _binaryMessagePayload;
private string _tableName;
private MySqlSettings _mysqlSettings;


private IAmARelationalDbConnectionProvider _connectionProvider;
public IAmARelationalDbConnectionProvider ConnectionProvider => _connectionProvider;

public RelationalDatabaseConfiguration InboxConfiguration =>
new(_mysqlSettings.TestsBrighterConnectionString, inboxTableName: _tableName);

public RelationalDatabaseConfiguration OutboxConfiguration =>
new(_mysqlSettings.TestsBrighterConnectionString, outBoxTableName: _tableName, binaryMessagePayload: _binaryMessagePayload);
public RelationalDatabaseConfiguration OutboxConfiguration =>
new(_mysqlSettings.TestsBrighterConnectionString, outBoxTableName: _tableName,
binaryMessagePayload: _binaryMessagePayload);

public MySqlTestHelper(bool binaryMessagePayload = false)
{
Expand All @@ -29,6 +33,10 @@ public MySqlTestHelper(bool binaryMessagePayload = false)
configuration.GetSection("MySql").Bind(_mysqlSettings);

_tableName = $"test_{Guid.NewGuid()}";

_connectionProvider =
new MySqlConnectionProvider(
new RelationalDatabaseConfiguration(_mysqlSettings.TestsBrighterConnectionString));
}

public void CreateDatabase()
Expand All @@ -52,14 +60,14 @@ public void SetupCommandDb()
CreateInboxTable();
}

public void CleanUpDb()
{
using var connection = new MySqlConnection(_mysqlSettings.TestsBrighterConnectionString);
connection.Open();
using var command = connection.CreateCommand();
command.CommandText = $@"DROP TABLE IF EXISTS {_tableName}";
command.ExecuteNonQuery();
}
public void CleanUpDb()
{
using var connection = new MySqlConnection(_mysqlSettings.TestsBrighterConnectionString);
connection.Open();
using var command = connection.CreateCommand();
command.CommandText = $@"DROP TABLE IF EXISTS {_tableName}";
command.ExecuteNonQuery();
}

public void CreateOutboxTable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

<ItemGroup>
<ProjectReference Include="..\..\src\Paramore.Brighter.Inbox.MySql\Paramore.Brighter.Inbox.MySql.csproj" />
<ProjectReference Include="..\..\src\Paramore.Brighter.Locking.MySql\Paramore.Brighter.Locking.MySql.csproj" />
<ProjectReference Include="..\..\src\Paramore.Brighter.Outbox.MySql\Paramore.Brighter.Outbox.MySql.csproj" />
</ItemGroup>

Expand Down
Loading