From 98eb6f4abee802081557ed7af81e20e5232ea1fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poul=20Kjeldager=20S=C3=B8rensen?= Date: Thu, 19 May 2022 03:46:10 +0200 Subject: [PATCH] feat: Added dependency injection extensions --- .../IActionImplementationExtenssions.cs | 5 ++ .../WorkflowEngine.Core.csproj | 20 ++++-- .../DependencyInjectionExtensions.cs | 37 ++++++++++ .../HangfireExtensions.cs | 26 +++++++ .../JobNaturalKeyAttribute.cs | 31 ++++++++ .../ScheduledWorkflowTrigger.cs | 57 +++++++++++++++ .../WorkflowEngine.Hangfire.csproj | 14 ++++ .../WorkflowStarterBackgroundJob.cs | 70 +++++++++++++++++++ 8 files changed, 254 insertions(+), 6 deletions(-) create mode 100644 src/WorkflowEngine.Hangfire/DependencyInjectionExtensions.cs create mode 100644 src/WorkflowEngine.Hangfire/HangfireExtensions.cs create mode 100644 src/WorkflowEngine.Hangfire/JobNaturalKeyAttribute.cs create mode 100644 src/WorkflowEngine.Hangfire/ScheduledWorkflowTrigger.cs create mode 100644 src/WorkflowEngine.Hangfire/WorkflowStarterBackgroundJob.cs diff --git a/src/WorkflowEngine.Core/IActionImplementationExtenssions.cs b/src/WorkflowEngine.Core/IActionImplementationExtenssions.cs index 243963a..1068fae 100644 --- a/src/WorkflowEngine.Core/IActionImplementationExtenssions.cs +++ b/src/WorkflowEngine.Core/IActionImplementationExtenssions.cs @@ -10,6 +10,11 @@ public static IServiceCollection AddAction(this IServiceCollection services, return services.AddTransient() .AddSingleton< IActionImplementationMetadata>(new ActionImplementationMetadata { Type = type ?? typeof(T).Name }); } + + public static IServiceCollection AddWorkflow(this IServiceCollection services) where T :class, IWorkflow + { + return services.AddTransient(); + } } diff --git a/src/WorkflowEngine.Core/WorkflowEngine.Core.csproj b/src/WorkflowEngine.Core/WorkflowEngine.Core.csproj index 3aaa8a4..5f50d29 100644 --- a/src/WorkflowEngine.Core/WorkflowEngine.Core.csproj +++ b/src/WorkflowEngine.Core/WorkflowEngine.Core.csproj @@ -11,12 +11,20 @@ - - - - - - + + + + + + + + + + + + + + diff --git a/src/WorkflowEngine.Hangfire/DependencyInjectionExtensions.cs b/src/WorkflowEngine.Hangfire/DependencyInjectionExtensions.cs new file mode 100644 index 0000000..51779b8 --- /dev/null +++ b/src/WorkflowEngine.Hangfire/DependencyInjectionExtensions.cs @@ -0,0 +1,37 @@ +using Microsoft.Extensions.DependencyInjection; +using WorkflowEngine; +using WorkflowEngine.Core; +using WorkflowEngine.Core.Actions; +using WorkflowEngine.Core.Expressions; + +namespace Microsoft.Extensions.DependencyInjection +{ + + public static class DependencyInjectionExtensions + { + public static IServiceCollection AddWorkflowEngine(this IServiceCollection services) + where TOutputsRepository: class, IOutputsRepository + { + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + services.AddTransient(typeof(ScheduledWorkflowTrigger<>)); + + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + services.AddAction("Foreach"); + + services.AddFunctions(); + services.AddScoped(); + services.AddSingleton(); + + services.AddHostedService(); + + return services; + } + + + } +} diff --git a/src/WorkflowEngine.Hangfire/HangfireExtensions.cs b/src/WorkflowEngine.Hangfire/HangfireExtensions.cs new file mode 100644 index 0000000..7f5c4a9 --- /dev/null +++ b/src/WorkflowEngine.Hangfire/HangfireExtensions.cs @@ -0,0 +1,26 @@ +using Hangfire.Storage; +using System.Collections.Generic; + +namespace WorkflowEngine +{ + public static class HangfireExtensions + { + + + public static void SetJobExternalKey(this IStorageConnection connection, string externalId, string jobId) + { + // This method can be implemented in 1.1.0 + connection.SetRangeInHash($"x-backgroundjob-keys:{externalId}", new[] { new KeyValuePair("JobId", jobId) }); + } + + public static string GetJobIdByKey(this IStorageConnection connection, string externalId) + { + // This method can be implemented in 1.1.0 + var entries = connection.GetAllEntriesFromHash($"x-backgroundjob-keys:{externalId}"); + if (entries == null || !entries.ContainsKey("JobId")) + return null; + + return entries["JobId"]; + } + } +} diff --git a/src/WorkflowEngine.Hangfire/JobNaturalKeyAttribute.cs b/src/WorkflowEngine.Hangfire/JobNaturalKeyAttribute.cs new file mode 100644 index 0000000..dde0f2e --- /dev/null +++ b/src/WorkflowEngine.Hangfire/JobNaturalKeyAttribute.cs @@ -0,0 +1,31 @@ +using Hangfire.Client; +using Hangfire.Common; +using System; +using System.Linq; + +namespace WorkflowEngine +{ + + public class JobNaturalKeyAttribute : JobFilterAttribute, IClientFilter + { + public JobNaturalKeyAttribute(string keyFormat) + { + KeyFormat = keyFormat; + } + + public string KeyFormat { get; private set; } + + public void OnCreated(CreatedContext filterContext) + { + var key = String.Format(KeyFormat, args: filterContext.Job.Args.ToArray()); + filterContext.Connection.SetJobExternalKey(key, filterContext.BackgroundJob.Id); + + } + + public void OnCreating(CreatingContext filterContext) + { + + + } + } +} diff --git a/src/WorkflowEngine.Hangfire/ScheduledWorkflowTrigger.cs b/src/WorkflowEngine.Hangfire/ScheduledWorkflowTrigger.cs new file mode 100644 index 0000000..84c4ccb --- /dev/null +++ b/src/WorkflowEngine.Hangfire/ScheduledWorkflowTrigger.cs @@ -0,0 +1,57 @@ +using Hangfire; +using Hangfire.Client; +using Hangfire.Common; +using Hangfire.Storage; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using WorkflowEngine.Core; + +namespace WorkflowEngine +{ + + public class ScheduledWorkflowTrigger where TWorkflow : IWorkflow, new() + { + private readonly ILogger _logger; + private readonly IBackgroundJobClient _backgroundJobClient; + private readonly IStorageConnection _storageConnection; + + public ScheduledWorkflowTrigger(ILogger> logger, IBackgroundJobClient backgroundJobClient, JobStorage storageConnection) + { + _logger = logger; + _backgroundJobClient = backgroundJobClient; + _storageConnection = storageConnection.GetConnection(); + } + + [JobNaturalKey("{0}")] + public Task Trigger(string externalid, bool create, DateTimeOffset time, Dictionary inputs) + { + var existingJob = _storageConnection.GetJobIdByKey(externalid); + if (!string.IsNullOrEmpty(existingJob)) + { + _logger.LogInformation("Cleaning up existing Hangfire Job {JobID}", existingJob); + _backgroundJobClient.Delete(existingJob); + } + + if (create) + { + var workflow = new TWorkflow(); + workflow.Manifest.Triggers.First().Value.Inputs = inputs; + + var job = _backgroundJobClient.Schedule((executor) => executor.TriggerAsync( + new TriggerContext { Workflow = workflow, }), time); + + _logger.LogInformation("Created scheduled workflow job {JobID}", job); + + return Task.FromResult(job); + } + + return Task.FromResult(null); + } + + } +} diff --git a/src/WorkflowEngine.Hangfire/WorkflowEngine.Hangfire.csproj b/src/WorkflowEngine.Hangfire/WorkflowEngine.Hangfire.csproj index 61345ca..5399345 100644 --- a/src/WorkflowEngine.Hangfire/WorkflowEngine.Hangfire.csproj +++ b/src/WorkflowEngine.Hangfire/WorkflowEngine.Hangfire.csproj @@ -7,12 +7,26 @@ Delegate A/S WorkflowEngine used to execute workflows based on Json description. https://github.com/delegateas/WorkflowEngine + WorkflowEngine + + + + + + + + + + + + + diff --git a/src/WorkflowEngine.Hangfire/WorkflowStarterBackgroundJob.cs b/src/WorkflowEngine.Hangfire/WorkflowStarterBackgroundJob.cs new file mode 100644 index 0000000..9226b07 --- /dev/null +++ b/src/WorkflowEngine.Hangfire/WorkflowStarterBackgroundJob.cs @@ -0,0 +1,70 @@ +using Hangfire; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using WorkflowEngine; +using WorkflowEngine.Core; + +namespace Microsoft.Extensions.DependencyInjection +{ + public class WorkflowStarterBackgroundJob : BackgroundService + { + private readonly IServiceScopeFactory _serviceScopeFactory; + + public WorkflowStarterBackgroundJob(IServiceScopeFactory serviceScopeFactory) + { + _serviceScopeFactory = serviceScopeFactory; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var first = true; + while (!stoppingToken.IsCancellationRequested) + { + using var scope = _serviceScopeFactory.CreateScope(); + var sp = scope.ServiceProvider; + + var workflows = sp.GetRequiredService(); + var jobs = sp.GetRequiredService(); + + foreach (var workflow in await workflows.GetAllWorkflows()) + { + var trigger = workflow.Manifest.Triggers.FirstOrDefault(t => t.Value.Type == "TimerTrigger"); + + if (!trigger.Equals(default(KeyValuePair))) + { + + jobs.AddOrUpdate(workflow.Id.ToString(), + (executor) => executor.TriggerAsync(new TriggerContext + { + Workflow = workflow, + Trigger = new Trigger + { + Inputs = trigger.Value.Inputs, + ScheduledTime = DateTimeOffset.UtcNow, + Type = workflow.Manifest.Triggers.FirstOrDefault().Value.Type, + Key = trigger.Key + }, + }), trigger.Value.Inputs["cronExpression"] as string); + + if (first && trigger.Value.Inputs.ContainsKey("runAtStartup") && (bool)trigger.Value.Inputs["runAtStartup"]) + jobs.Trigger(workflow.Id.ToString()); + } + } + + + + first = false; + await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken); + + + + } + + + } + } +}