Skip to content

Commit

Permalink
Merge pull request #72 from raisedapp/develop
Browse files Browse the repository at this point in the history
V0.4.2
  • Loading branch information
felixclase authored Apr 10, 2024
2 parents 251983b + 1850239 commit a072349
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 136 deletions.
6 changes: 2 additions & 4 deletions src/main/Hangfire.Storage.SQLite/ExpirationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ private int RemoveExpireRows(HangfireDbContext db, string table)

try
{
var _lock = new SQLiteDistributedLock(DistributedLockKey, DefaultLockTimeout,
db, db.StorageOptions);

using (_lock)
using (SQLiteDistributedLock.Acquire(DistributedLockKey, DefaultLockTimeout,
db, db.StorageOptions))
{
rowsAffected = db.Database.Execute(deleteScript);
}
Expand Down
10 changes: 7 additions & 3 deletions src/main/Hangfire.Storage.SQLite/Hangfire.Storage.SQLite.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<TargetFrameworks>netstandard2.0;net48</TargetFrameworks>
</PropertyGroup>
<PropertyGroup>
<Version>0.4.1</Version>
<Version>0.4.2</Version>
<Authors>RaisedApp</Authors>
<Company>RaisedApp</Company>
<Copyright>Copyright © 2019 - Present</Copyright>
Expand All @@ -20,8 +20,12 @@
<title>Hangfire Storage SQLite</title>
<Description>An Alternative SQLite Storage for Hangfire</Description>
<PackageReleaseNotes>
0.4.1
- Stability and retry enhancements introduced by: Daniel Lindblom
0.4.2
-remove re-entrancy (fixes SQLiteDistributedLock doesn't play right with async #68). Thanks to @kirides
-pause heartbeat timer while processing. Thanks to @kirides
-update expiration using SQL Update statement in a single step. Thanks to @kirides
-Added Heartbeat event (for testing). Thanks to @kirides
-if we no longer own the lock, we immediately dispose the heartbeat timer (fixes Unable to update heartbeat - still happening in .NET 6.0 #69). Thanks to @kirides
</PackageReleaseNotes>
</PropertyGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public override void Dispose()
public override IDisposable AcquireDistributedLock(string resource, TimeSpan timeout)
{
return Retry.Twice((_) =>
new SQLiteDistributedLock($"HangFire:{resource}", timeout, DbContext, _storageOptions)
SQLiteDistributedLock.Acquire($"HangFire:{resource}", timeout, DbContext, _storageOptions)
);
}

Expand Down
188 changes: 85 additions & 103 deletions src/main/Hangfire.Storage.SQLite/SQLiteDistributedLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using Hangfire.Storage.SQLite.Entities;
using SQLite;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Hangfire.Storage.SQLite
Expand All @@ -14,9 +14,6 @@ public class SQLiteDistributedLock : IDisposable
{
private static readonly ILog Logger = LogProvider.For<SQLiteDistributedLock>();

private static readonly ThreadLocal<Dictionary<string, int>> AcquiredLocks
= new ThreadLocal<Dictionary<string, int>>(() => new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase));

private readonly string _resource;
private readonly string _resourceKey;

Expand All @@ -30,15 +27,17 @@ private static readonly ThreadLocal<Dictionary<string, int>> AcquiredLocks

private string EventWaitHandleName => string.Intern($@"{GetType().FullName}.{_resource}");

public event Action<bool> Heartbeat;

/// <summary>
/// Creates SQLite distributed lock
/// </summary>
/// <param name="resource">Lock resource</param>
/// <param name="timeout">Lock timeout</param>
/// <param name="database">Lock database</param>
/// <param name="storageOptions">Database options</param>
/// <exception cref="DistributedLockTimeoutException">Thrown if lock is not acuired within the timeout</exception>
public SQLiteDistributedLock(string resource, TimeSpan timeout, HangfireDbContext database,
private SQLiteDistributedLock(string resource,
HangfireDbContext database,
SQLiteStorageOptions storageOptions)
{
_resource = resource ?? throw new ArgumentNullException(nameof(resource));
Expand All @@ -50,22 +49,25 @@ public SQLiteDistributedLock(string resource, TimeSpan timeout, HangfireDbContex
{
throw new ArgumentException($@"The {nameof(resource)} cannot be empty", nameof(resource));
}
}

public static SQLiteDistributedLock Acquire(
string resource,
TimeSpan timeout,
HangfireDbContext database,
SQLiteStorageOptions storageOptions)
{
if (timeout.TotalSeconds > int.MaxValue)
{
throw new ArgumentException($"The timeout specified is too large. Please supply a timeout equal to or less than {int.MaxValue} seconds", nameof(timeout));
}

if (!AcquiredLocks.Value.ContainsKey(_resource) || AcquiredLocks.Value[_resource] == 0)
{
Cleanup();
Acquire(timeout);
AcquiredLocks.Value[_resource] = 1;
StartHeartBeat();
}
else
{
AcquiredLocks.Value[_resource]++;
}
var slock = new SQLiteDistributedLock(resource, database, storageOptions);

slock.Acquire(timeout);
slock.StartHeartBeat();

return slock;
}

/// <summary>
Expand All @@ -78,96 +80,52 @@ public void Dispose()
{
return;
}

_completed = true;
_heartbeatTimer?.Dispose();
Release();
}

if (!AcquiredLocks.Value.ContainsKey(_resource))
private bool TryAcquireLock()
{
Cleanup();
try
{
return;
}

AcquiredLocks.Value[_resource]--;
var distributedLock = new DistributedLock
{
Id = Guid.NewGuid().ToString(),
Resource = _resource,
ResourceKey = _resourceKey,
ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime)
};

if (AcquiredLocks.Value[_resource] > 0)
{
return;
return _dbContext.Database.Insert(distributedLock) == 1;
}

// Timer callback may be invoked after the Dispose method call,
// but since we use the resource key, we will not disturb other owners.
AcquiredLocks.Value.Remove(_resource);

if (_heartbeatTimer != null)
catch (SQLiteException e) when (e.Result == SQLite3.Result.Constraint)
{
_heartbeatTimer.Dispose();
_heartbeatTimer = null;
return false;
}

Release();

Cleanup();
}

private void Acquire(TimeSpan timeout)
{
try
var sw = Stopwatch.StartNew();
do
{
var isLockAcquired = false;
var now = DateTime.UtcNow;
var lockTimeoutTime = now.Add(timeout);

while (lockTimeoutTime >= now)
if (TryAcquireLock())
{
Cleanup();

lock (EventWaitHandleName)
{
var result = _dbContext.DistributedLockRepository.FirstOrDefault(_ => _.Resource == _resource);

if (result == null)
{
try
{
var distributedLock = new DistributedLock();
distributedLock.Id = Guid.NewGuid().ToString();
distributedLock.Resource = _resource;
distributedLock.ResourceKey = _resourceKey;
distributedLock.ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime);

_dbContext.Database.Insert(distributedLock);

// we were able to acquire the lock - break the loop
isLockAcquired = true;
break;
}
catch (SQLiteException e) when (e.Result == SQLite3.Result.Constraint)
{
// The lock already exists preventing us from inserting.
continue;
}
}
}

// we couldn't acquire the lock - wait a bit and try again
var waitTime = (int)timeout.TotalMilliseconds / 10;
lock (EventWaitHandleName)
Monitor.Wait(EventWaitHandleName, waitTime);

now = DateTime.UtcNow;
return;
}

if (!isLockAcquired)
var waitTime = (int) timeout.TotalMilliseconds / 10;
// either wait for the event to be raised, or timeout
lock (EventWaitHandleName)
{
throw new DistributedLockTimeoutException(_resource);
Monitor.Wait(EventWaitHandleName, waitTime);
}
}
catch (DistributedLockTimeoutException ex)
{
throw ex;
}
catch (Exception ex)
{
throw ex;
}
} while (sw.Elapsed <= timeout);

throw new DistributedLockTimeoutException(_resource);
}

/// <summary>
Expand All @@ -179,9 +137,12 @@ private void Release()
Retry.Twice((retry) => {
// Remove resource lock (if it's still ours)
_dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey);
lock (EventWaitHandleName)
Monitor.Pulse(EventWaitHandleName);
var count = _dbContext.DistributedLockRepository.Delete(_ => _.Resource == _resource && _.ResourceKey == _resourceKey);
if (count != 0)
{
lock (EventWaitHandleName)
Monitor.Pulse(EventWaitHandleName);
}
});
}

Expand All @@ -192,7 +153,7 @@ private void Cleanup()
Retry.Twice((_) => {
// Delete expired locks (of any owner)
_dbContext.DistributedLockRepository.
Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow);
Delete(x => x.Resource == _resource && x.ExpireAt < DateTime.UtcNow);
});
}
catch (Exception ex)
Expand All @@ -210,27 +171,48 @@ private void StartHeartBeat()

_heartbeatTimer = new Timer(state =>
{
// stop timer
_heartbeatTimer?.Change(Timeout.Infinite, Timeout.Infinite);
// Timer callback may be invoked after the Dispose method call,
// but since we use the resource key, we will not disturb other owners.
try
{
var distributedLock = _dbContext.DistributedLockRepository.FirstOrDefault(x => x.Resource == _resource && x.ResourceKey == _resourceKey);
if (distributedLock != null)
{
distributedLock.ExpireAt = DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime);
_dbContext.Database.Update(distributedLock);
}
else
var didUpdate = UpdateExpiration(_dbContext.DistributedLockRepository, DateTime.UtcNow.Add(_storageOptions.DistributedLockLifetime));
Heartbeat?.Invoke(didUpdate);
if (!didUpdate)
{
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. The resource is not locked or is locked by another owner.", _resource);
// if we no longer have a lock, stop the heartbeat immediately
_heartbeatTimer?.Dispose();
return;
}
}
catch (Exception ex)
{
Logger.ErrorFormat("Unable to update heartbeat on the resource '{0}'. {1}", _resource, ex);
}
// restart timer
_heartbeatTimer?.Change(timerInterval, timerInterval);
}, null, timerInterval, timerInterval);
}

private bool UpdateExpiration(TableQuery<DistributedLock> tableQuery, DateTime expireAt)
{
var expireColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.ExpireAt)).Name;
var resourceColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.Resource)).Name;
var resourceKeyColumn = tableQuery.Table.FindColumnWithPropertyName(nameof(DistributedLock.ResourceKey)).Name;
var table = tableQuery.Table.TableName;

var command = tableQuery.Connection.CreateCommand($@"UPDATE ""{table}""
SET ""{expireColumn}"" = ?
WHERE ""{resourceColumn}"" = ?
AND ""{resourceKeyColumn}"" = ?",
expireAt,
_resource,
_resourceKey);

return command.ExecuteNonQuery() != 0;
}
}
}
}
27 changes: 27 additions & 0 deletions src/main/Hangfire.Storage.SQLite/SQLiteStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ public class SQLiteStorage : JobStorage, IDisposable
private readonly SQLiteDbConnectionFactory _dbConnectionFactory;

private readonly SQLiteStorageOptions _storageOptions;

private readonly Dictionary<string, bool> _features = new Dictionary<string, bool>(StringComparer.OrdinalIgnoreCase)
{
{ "Storage.ExtendedApi", false },
{ "Job.Queue", true },
{ "Connection.GetUtcDateTime", false },
{ "Connection.BatchedGetFirstByLowestScoreFromSet", false },
{ "Connection.GetSetContains", true },
{ "Connection.GetSetCount.Limited", false },
{ "BatchedGetFirstByLowestScoreFromSet", false },
{ "Transaction.AcquireDistributedLock", true },
{ "Transaction.CreateJob", true },
{ "Transaction.SetJobParameter", true },
{ "TransactionalAcknowledge:InMemoryFetchedJob", false },
{ "Monitoring.DeletedStateGraphs", false },
{ "Monitoring.AwaitingJobs", false }
};

private ConcurrentQueue<PooledHangfireDbContext> _dbContextPool = new ConcurrentQueue<PooledHangfireDbContext>();

/// <summary>
Expand Down Expand Up @@ -113,6 +131,15 @@ private void EnqueueOrPhaseOut(PooledHangfireDbContext dbContext)
}
}

public override bool HasFeature(string featureId)
{
if (featureId == null) throw new ArgumentNullException(nameof(featureId));

return _features.TryGetValue(featureId, out var isSupported)
? isSupported
: base.HasFeature(featureId);
}

/// <summary>
/// Returns text representation of the object
/// </summary>
Expand Down
13 changes: 12 additions & 1 deletion src/samples/WebSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
.UseSQLiteStorage("Hangfire.db")
.UseHeartbeatPage(checkInterval: TimeSpan.FromSeconds(10))
.UseJobsLogger());
services.AddHangfireServer();
services.AddHangfireServer(options =>
{
options.Queues = new[] { "test_queue_1", "default" };
});

var app = builder.Build();

Expand All @@ -27,4 +30,12 @@
RecurringJob.AddOrUpdate("TaskMethod()", (TaskSample t) => t.TaskMethod(), Cron.Minutely);
RecurringJob.AddOrUpdate("TaskMethod2()", (TaskSample t) => t.TaskMethod2(null), Cron.Minutely);

var t = app.Services.GetService<IBackgroundJobClient>();
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));
t.Enqueue(queue: "test_queue_1", methodCall: () => Console.WriteLine("Testing......"));

app.Run();
Loading

0 comments on commit a072349

Please sign in to comment.