Skip to content

Commit

Permalink
fix: added default queues for triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
pksorensen committed Aug 21, 2024
1 parent 3d85d29 commit 12c98e9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
1 change: 1 addition & 0 deletions src/WorkflowEngine.Core/TriggerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public static TriggerContext CreateTrigger(this IWorkflow workflow, Dictionary<s
}
public class TriggerContext : ITriggerContext, IFormattable
{
public string Queue { get; set; }
public IWorkflow Workflow { get; set; }
public ITrigger Trigger { get; set; }
public string PrincipalId { get; set; }
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static class BackgroundClientExtensions
public static string TriggerAsync<TTriggerContext>(this IBackgroundJobClient backgroundJobClient, TTriggerContext trigger)
where TTriggerContext : TriggerContext
{
var job = backgroundJobClient.Enqueue<IHangfireWorkflowExecutor>(
var job = backgroundJobClient.Enqueue<IHangfireWorkflowExecutor>(trigger.Queue ?? "default",
(executor) => executor.TriggerAsync(trigger, null));

return job;
Expand Down
13 changes: 11 additions & 2 deletions src/WorkflowEngine.Hangfire/WorkflowStarterBackgroundJob.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Hangfire;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -11,6 +12,10 @@

namespace Microsoft.Extensions.DependencyInjection
{
public class WorkflowStarterBackgroundJobOptions
{
public string QueueName { get; set; } = "default";
}
public class WorkflowStarterBackgroundJob : BackgroundService
{
private readonly IServiceScopeFactory _serviceScopeFactory;
Expand All @@ -31,6 +36,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var workflows = sp.GetRequiredService<IWorkflowRepository>();
var jobs = sp.GetRequiredService<IRecurringJobManager>();
var configuration = sp.GetRequiredService<IConfiguration>();
var options = sp.GetRequiredService<IOptions<WorkflowStarterBackgroundJobOptions>>();


foreach (var workflow in await workflows.GetAllWorkflows())
{
Expand All @@ -52,21 +59,23 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

workflow.Manifest = null;

jobs.AddOrUpdate(workflow.Id.ToString() + trigger.Key,
jobs.AddOrUpdate(workflow.Id.ToString() + trigger.Key, options.Value.QueueName,
(System.Linq.Expressions.Expression<System.Action<IHangfireWorkflowExecutor>>) ((executor) => executor.TriggerAsync(new TriggerContext
{
Queue = options.Value.QueueName,
PrincipalId = "1b714972-8d0a-4feb-b166-08d93c6ae328",
Workflow = workflow,
Trigger = new Trigger
{

Inputs = trigger.Value.Inputs,
ScheduledTime = DateTimeOffset.UtcNow,
Type = trigger.Value.Type,
Key = trigger.Key
},
}, null)), trigger.Value.Inputs["cronExpression"] as string,new RecurringJobOptions
{
TimeZone = GetTimeZone(trigger)
TimeZone = GetTimeZone(trigger),
});


Expand Down

0 comments on commit 12c98e9

Please sign in to comment.