Skip to content

Commit

Permalink
Fix RecurringJob schedule inconsistencies (#91)
Browse files Browse the repository at this point in the history
* added more jobs to the workerService example

* formatting

* BackgroundJobRegistrations now state whether they're RecurringJobs or not

* added method for retrieving recurring jobs from the scheduler

* added logic for scheduling and running recurring jobs by using a timer

* removed unnecessary namespace

* added tests to cover new functionality

* Added two new recurring job types: IRecurringJobWithInitialDelay and IRecurringJobWithNoInitialDelay

* added test to verify amount of cronjob occurrences

* update approved public api doc

* added registration methods for IRecurringJobWithInitialDelay

* Added DelegateRecurringJobWithInitialDelay

* updated test to work with new scheduler

* removed IRecurringJobWithNoInitialDelay

* Public signature update

* added more to backgroundservice test

* remove unneeded line

---------

Co-authored-by: Niels Pilgaard Grøndahl <[email protected]>
  • Loading branch information
NielsPilgaard and Niels Pilgaard Grøndahl authored Aug 24, 2023
1 parent 2c01fe9 commit b42155d
Show file tree
Hide file tree
Showing 25 changed files with 494 additions and 65 deletions.
22 changes: 22 additions & 0 deletions samples/BackgroundJobs.WorkerService/CronJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Cronos;
using Pilgaard.BackgroundJobs;

namespace BackgroundJobs.WorkerService;

public class CronJob : ICronJob
{
private readonly ILogger<CronJob> _logger;
public CronJob(ILogger<CronJob> logger)
{
_logger = logger;
}

public Task RunJobAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("{jobName} executed at {now:G}", nameof(CronJob), DateTime.Now);

return Task.CompletedTask;
}

public CronExpression CronExpression => CronExpression.Parse("* * * * *");
}
22 changes: 22 additions & 0 deletions samples/BackgroundJobs.WorkerService/OneTimeJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Pilgaard.BackgroundJobs;

namespace BackgroundJobs.WorkerService;

public class OneTimeJob : IOneTimeJob
{
private readonly ILogger<OneTimeJob> _logger;
private static readonly DateTime _utcNowAtStartup = DateTime.UtcNow;
public OneTimeJob(ILogger<OneTimeJob> logger)
{
_logger = logger;
}

public Task RunJobAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("{jobName} executed at {now:G}", nameof(OneTimeJob), DateTime.Now);

return Task.CompletedTask;
}

public DateTime ScheduledTimeUtc => _utcNowAtStartup.AddMinutes(1);
}
13 changes: 8 additions & 5 deletions samples/BackgroundJobs.WorkerService/Program.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
using BackgroundJobs.WorkerService;
using Pilgaard.BackgroundJobs;

IHost host = Host.CreateDefaultBuilder(args)
Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddBackgroundJobs()
.AddJob<RecurringJob>(nameof(RecurringJob));
.AddJob<RecurringJobEvery1Minute>()
.AddJob<RecurringJobEvery5Minutes>()
.AddJob<RecurringJobEvery10Minutes>()
.AddJob<RecurringJobEvery30Minutes>()
.AddJob<CronJob>()
.AddJob<OneTimeJob>();
})
.Build();

host.Run();
.Build().Run();
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
using Pilgaard.BackgroundJobs;

namespace BackgroundJobs.WorkerService;

public class RecurringJob : IRecurringJob
{
private readonly ILogger<RecurringJob> _logger;
public RecurringJob(ILogger<RecurringJob> logger)
{
_logger = logger;
}

public Task RunJobAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("{jobName} executed at {now:G}", nameof(RecurringJob), DateTime.Now);

return Task.CompletedTask;
}

public TimeSpan Interval => TimeSpan.FromMinutes(10);
}
using Pilgaard.BackgroundJobs;

namespace BackgroundJobs.WorkerService;

public class RecurringJobEvery10Minutes : IRecurringJob
{
private readonly ILogger<RecurringJobEvery10Minutes> _logger;
public RecurringJobEvery10Minutes(ILogger<RecurringJobEvery10Minutes> logger)
{
_logger = logger;
}

public Task RunJobAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("{jobName} executed at {now:G}", nameof(RecurringJobEvery10Minutes), DateTime.Now);

return Task.CompletedTask;
}

public TimeSpan Interval => TimeSpan.FromMinutes(10);
}
21 changes: 21 additions & 0 deletions samples/BackgroundJobs.WorkerService/RecurringJobEvery1Minute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Pilgaard.BackgroundJobs;

namespace BackgroundJobs.WorkerService;

public class RecurringJobEvery1Minute : IRecurringJob
{
private readonly ILogger<RecurringJobEvery1Minute> _logger;
public RecurringJobEvery1Minute(ILogger<RecurringJobEvery1Minute> logger)
{
_logger = logger;
}

public Task RunJobAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("{jobName} executed at {now:G}", nameof(RecurringJobEvery1Minute), DateTime.Now);

return Task.CompletedTask;
}

public TimeSpan Interval => TimeSpan.FromMinutes(1);
}
21 changes: 21 additions & 0 deletions samples/BackgroundJobs.WorkerService/RecurringJobEvery30Minutes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Pilgaard.BackgroundJobs;

namespace BackgroundJobs.WorkerService;

public class RecurringJobEvery30Minutes : IRecurringJob
{
private readonly ILogger<RecurringJobEvery30Minutes> _logger;
public RecurringJobEvery30Minutes(ILogger<RecurringJobEvery30Minutes> logger)
{
_logger = logger;
}

public Task RunJobAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("{jobName} executed at {now:G}", nameof(RecurringJobEvery30Minutes), DateTime.Now);

return Task.CompletedTask;
}

public TimeSpan Interval => TimeSpan.FromMinutes(30);
}
21 changes: 21 additions & 0 deletions samples/BackgroundJobs.WorkerService/RecurringJobEvery5Minutes.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Pilgaard.BackgroundJobs;

namespace BackgroundJobs.WorkerService;

public class RecurringJobEvery5Minutes : IRecurringJob
{
private readonly ILogger<RecurringJobEvery5Minutes> _logger;
public RecurringJobEvery5Minutes(ILogger<RecurringJobEvery5Minutes> logger)
{
_logger = logger;
}

public Task RunJobAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("{jobName} executed at {now:G}", nameof(RecurringJobEvery5Minutes), DateTime.Now);

return Task.CompletedTask;
}

public TimeSpan Interval => TimeSpan.FromMinutes(5);
}
21 changes: 11 additions & 10 deletions src/Pilgaard.BackgroundJobs/BackgroundJobScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,14 @@ public BackgroundJobScheduler(IServiceScopeFactory scopeFactory,
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

if (registrationsValidator is null)
{
throw new ArgumentNullException(nameof(registrationsValidator));
}

registrationsValidator.Validate(_options.Value.Registrations);
}

/// <summary>
/// Asynchronously retrieves an ordered enumerable of background job registrations.
/// <para>
/// Each background job registration is returned when it should be run.
/// </para>
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used for cancelling the enumeration.</param>
/// <returns>An asynchronous enumerable of background job registrations.</returns>
public async IAsyncEnumerable<BackgroundJobRegistration> GetBackgroundJobsAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var interval = TimeSpan.FromSeconds(30);
Expand All @@ -53,7 +50,7 @@ public async IAsyncEnumerable<BackgroundJobRegistration> GetBackgroundJobsAsync(
{
var intervalMinus5Seconds = interval.Subtract(TimeSpan.FromSeconds(5));

_logger.LogDebug("No background job occurrences found in the TimeSpan {interval}, " +
_logger.LogDebug("No CronJob or OneTimeJob occurrences found in the TimeSpan {interval}, " +
"waiting for TimeSpan {interval} until checking again.", interval, intervalMinus5Seconds);

await Task.Delay(intervalMinus5Seconds, cancellationToken);
Expand All @@ -78,6 +75,9 @@ public async IAsyncEnumerable<BackgroundJobRegistration> GetBackgroundJobsAsync(
}
}


public IEnumerable<BackgroundJobRegistration> GetRecurringJobs() => _options.Value.Registrations.Where(registration => registration.IsRecurringJob);

/// <summary>
/// Gets an ordered enumerable of background job occurrences within the specified <paramref name="fetchInterval"/>.
/// </summary>
Expand All @@ -90,7 +90,8 @@ internal IEnumerable<BackgroundJobOccurrence> GetOrderedBackgroundJobOccurrences
using var scope = _scopeFactory.CreateScope();

var backgroundJobOccurrences = new List<BackgroundJobOccurrence>();
foreach (var registration in _options.Value.Registrations)

foreach (var registration in _options.Value.Registrations.Where(registration => !registration.IsRecurringJob))
{
var backgroundJob = registration.Factory(scope.ServiceProvider);

Expand Down
63 changes: 63 additions & 0 deletions src/Pilgaard.BackgroundJobs/BackgroundJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ internal sealed class BackgroundJobService : IBackgroundJobService
private readonly ILogger<BackgroundJobService> _logger;
private readonly IBackgroundJobScheduler _backgroundJobScheduler;

private event Func<object, EventArgs, BackgroundJobRegistration, CancellationToken, Task>? RecurringJobTimerTriggered;
private static readonly List<IDisposable> _recurringJobTimers = new();

private static readonly Meter _meter = new(
name: typeof(BackgroundJobService).Assembly.GetName().Name!,
version: typeof(BackgroundJobService).Assembly.GetName().Version?.ToString());
Expand Down Expand Up @@ -66,6 +69,8 @@ public BackgroundJobService(
/// </returns>
public async Task RunJobsAsync(CancellationToken cancellationToken = default)
{
ScheduleRecurringJobs(cancellationToken);

while (!cancellationToken.IsCancellationRequested)
{
_logger.LogDebug("Scheduling background jobs.");
Expand All @@ -83,6 +88,56 @@ public async Task RunJobsAsync(CancellationToken cancellationToken = default)
}
}

internal void ScheduleRecurringJobs(CancellationToken cancellationToken)
{
var recurringJobRegistrations = _backgroundJobScheduler.GetRecurringJobs();
if (recurringJobRegistrations.Any())
{
RecurringJobTimerTriggered += RunRecurringJobAsync;
}

using var scope = _scopeFactory.CreateScope();
foreach (var jobRegistration in recurringJobRegistrations)
{
if (jobRegistration.Factory(scope.ServiceProvider) is not IRecurringJob recurringJob)
{
_logger.LogError("Failed to schedule recurring job {@jobRegistration}. " +
"It does not implement {recurringJobInterface}",
jobRegistration, typeof(IRecurringJob));
continue;
}

var dueTime = recurringJob switch
{
IRecurringJobWithInitialDelay recurringJobWithInitialDelay => recurringJobWithInitialDelay.InitialDelay,
_ => recurringJob.Interval
};

var recurringJobTimer = new System.Threading.Timer(_ => RecurringJobTimerTriggered?.Invoke(this, EventArgs.Empty, jobRegistration, cancellationToken),
state: null,
dueTime: dueTime,
period: recurringJob.Interval);

_recurringJobTimers.Add(recurringJobTimer);

_logger.LogInformation("RecurringJob {jobName} has been scheduled to run every {interval}. " +
"The first run will be in {dueTime}",
jobRegistration.Name, recurringJob.Interval, dueTime);
}
}

#pragma warning disable IDE0060
/// <summary>
/// Runs the recurring job.
/// </summary>
/// <param name="sender">The sender. This is not used.</param>
/// <param name="eventArgs">The <see cref="EventArgs"/> instance containing the event data. This is not used.</param>
/// <param name="registration">The background job registration.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> which can be used to cancel the background job.</param>
internal async Task RunRecurringJobAsync(object sender, EventArgs eventArgs, BackgroundJobRegistration registration, CancellationToken cancellationToken)
=> await RunJobAsync(registration, cancellationToken);
#pragma warning restore IDE0060

/// <summary>
/// Constructs the background job using <see cref="BackgroundJobRegistration.Factory"/> and runs it.
/// </summary>
Expand Down Expand Up @@ -137,4 +192,12 @@ internal async Task RunJobAsync(BackgroundJobRegistration registration, Cancella
timeoutCancellationTokenSource?.Dispose();
}
}

public void Dispose()
{
foreach (var disposable in _recurringJobTimers)
{
disposable.Dispose();
}
}
}
9 changes: 9 additions & 0 deletions src/Pilgaard.BackgroundJobs/IBackgroundJobScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,19 @@ internal interface IBackgroundJobScheduler
/// <summary>
/// Asynchronously retrieves an ordered enumerable of background job registrations.
/// <para>
/// Jobs that implement <see cref="IRecurringJob"/> are not retrieved, they are scheduled during startup.
/// </para>
/// <para>
/// Each background job registration is returned when it should be run.
/// </para>
/// </summary>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> used for cancelling the enumeration.</param>
/// <returns>An asynchronous enumerable of background job registrations.</returns>
IAsyncEnumerable<BackgroundJobRegistration> GetBackgroundJobsAsync(CancellationToken cancellationToken);

/// <summary>
/// Retrieves all <see cref="BackgroundJobRegistration"/>s where <see cref="BackgroundJobRegistration.IsRecurringJob"/> is <c>true</c>
/// </summary>
/// <returns>An enumerable of background job registrations where <see cref="BackgroundJobRegistration.IsRecurringJob"/> is <c>true</c></returns>
IEnumerable<BackgroundJobRegistration> GetRecurringJobs();
}
2 changes: 1 addition & 1 deletion src/Pilgaard.BackgroundJobs/IBackgroundJobService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Pilgaard.BackgroundJobs;
/// <see cref="IBackgroundJobsBuilder"/>.
/// </para>
/// </remarks>
public interface IBackgroundJobService
public interface IBackgroundJobService : IDisposable
{
/// <summary>
/// Runs all the background jobs in the application.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace Pilgaard.BackgroundJobs;

/// <summary>
/// A simple implementation of <see cref="IRecurringJobWithInitialDelay"/> which uses a provided delegate to
/// implement the job.
/// </summary>
internal sealed class DelegateRecurringJobWithInitialDelay : IRecurringJobWithInitialDelay
{
private readonly Func<CancellationToken, Task> _job;

public TimeSpan Interval { get; }

public TimeSpan InitialDelay { get; }

public DelegateRecurringJobWithInitialDelay(Func<CancellationToken, Task> job, TimeSpan interval, TimeSpan initialDelay)
{
_job = job ?? throw new ArgumentNullException(nameof(job));
Interval = interval;
InitialDelay = initialDelay;
}

public Task RunJobAsync(CancellationToken cancellationToken = default)
=> _job(cancellationToken);

}
15 changes: 15 additions & 0 deletions src/Pilgaard.BackgroundJobs/Jobs/IRecurringJobWithInitialDelay.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Pilgaard.BackgroundJobs;

/// <summary>
/// This interface represents a background job that runs at a specified interval, after an initial delay.
/// </summary>
public interface IRecurringJobWithInitialDelay : IRecurringJob
{
/// <summary>
/// The initial delay before triggering <see cref="IBackgroundJob.RunJobAsync"/> the first time.
/// <para>
/// Setting this to <see cref="TimeSpan.Zero"/> triggers it immediately on startup.
/// </para>
/// </summary>
TimeSpan InitialDelay { get; }
}
Loading

0 comments on commit b42155d

Please sign in to comment.