diff --git a/internal/handler/job/create.go b/internal/handler/job/create.go index a509cdac..eb0fc85e 100644 --- a/internal/handler/job/create.go +++ b/internal/handler/job/create.go @@ -68,7 +68,7 @@ func CreateJob(ctx context.Context, storage persistence.Storage, request *model. return nil, fault.Wrap(err, ftag.With(ftag.Internal)) } - events.PublishEvent(ctx, &events.JobEvent{ + _ = events.PublishEvent(ctx, &events.JobEvent{ Ctime: strfmt.DateTime(time.Now()), Action: events.ActionCreate, Job: createdJob, diff --git a/internal/handler/job/definition/update.go b/internal/handler/job/definition/update.go index 84b96fe8..5533cb9c 100644 --- a/internal/handler/job/definition/update.go +++ b/internal/handler/job/definition/update.go @@ -43,7 +43,7 @@ func Update(ctx context.Context, storage persistence.Storage, jobID string, defi return nil, fault.Wrap(err) } - events.PublishEvent(ctx, &events.JobEvent{ + _ = events.PublishEvent(ctx, &events.JobEvent{ Ctime: strfmt.DateTime(time.Now()), Action: events.ActionUpdateDefinition, Job: &model.Job{ diff --git a/internal/handler/job/delete.go b/internal/handler/job/delete.go index e408bcd8..5a098b80 100644 --- a/internal/handler/job/delete.go +++ b/internal/handler/job/delete.go @@ -34,7 +34,7 @@ func DeleteJob(ctx context.Context, storage persistence.Storage, jobID string) e return fault.Wrap(err) } - events.PublishEvent(ctx, &events.JobEvent{ + _ = events.PublishEvent(ctx, &events.JobEvent{ Ctime: strfmt.DateTime(time.Now()), Action: events.ActionDelete, Job: &model.Job{ diff --git a/internal/handler/job/events/events.go b/internal/handler/job/events/events.go index e2e67b76..7c6134d3 100644 --- a/internal/handler/job/events/events.go +++ b/internal/handler/job/events/events.go @@ -87,20 +87,14 @@ func SubscriberCount() int { return count } -// PublishEvent publishes a new event in a *synchronous* manner. -func PublishEventSync(ctx context.Context, event *JobEvent) { +// PublishEvent publishes a new event. It returns a channel which can be used +// to wait for the delivery of the event to all listeners. +func PublishEvent(ctx context.Context, event *JobEvent) chan struct{} { log := logging.LoggerFromCtx(ctx) log.Debug(). Str("jobID", event.Job.ID). Str("action", string(event.Action)).Msg("Publishing event") - <-e.Emit(event.Job.ID, event) -} - -// PublishEvent publishes a new event in an *asynchronous* manner. -func PublishEvent(ctx context.Context, event *JobEvent) { - go func() { - PublishEventSync(ctx, event) - }() + return e.Emit(event.Job.ID, event) } func (filter FilterParams) createEventFilter(subscriberID string, tags []string) func(*emitter.Event) { diff --git a/internal/handler/job/events/events_test.go b/internal/handler/job/events/events_test.go index 9ce42b1c..a9b1e799 100644 --- a/internal/handler/job/events/events_test.go +++ b/internal/handler/job/events/events_test.go @@ -39,10 +39,10 @@ func TestFiltering(t *testing.T) { chAll, _ := AddSubscriber(ctx, FilterParams{}, nil) // no filter should receive all events job1.Status.State = "FOO" - PublishEventSync(context.Background(), &JobEvent{Action: ActionUpdateStatus, Job: &job1}) + <-PublishEvent(context.Background(), &JobEvent{Action: ActionUpdateStatus, Job: &job1}) job2.Status.State = "BAR" - PublishEventSync(context.Background(), &JobEvent{Action: ActionUpdateStatus, Job: &job2}) + <-PublishEvent(context.Background(), &JobEvent{Action: ActionUpdateStatus, Job: &job2}) { ev := <-ch1 diff --git a/internal/handler/job/status/update.go b/internal/handler/job/status/update.go index 73272249..a8ade049 100644 --- a/internal/handler/job/status/update.go +++ b/internal/handler/job/status/update.go @@ -88,7 +88,7 @@ func Update(ctx context.Context, storage persistence.Storage, jobID string, newS return nil, fault.Wrap(err) } - events.PublishEvent(ctx, &events.JobEvent{ + _ = events.PublishEvent(ctx, &events.JobEvent{ Ctime: strfmt.DateTime(time.Now()), Action: events.ActionUpdateStatus, Job: &model.Job{ diff --git a/internal/handler/job/tags/add.go b/internal/handler/job/tags/add.go index d8cc5d21..57555343 100644 --- a/internal/handler/job/tags/add.go +++ b/internal/handler/job/tags/add.go @@ -35,7 +35,7 @@ func Add(ctx context.Context, storage persistence.Storage, jobID string, tags [] return nil, fault.Wrap(err) } - events.PublishEvent(ctx, &events.JobEvent{ + _ = events.PublishEvent(ctx, &events.JobEvent{ Ctime: strfmt.DateTime(time.Now()), Action: events.ActionAddTags, Job: &model.Job{ diff --git a/internal/handler/job/tags/delete.go b/internal/handler/job/tags/delete.go index a431636c..e9815cfd 100644 --- a/internal/handler/job/tags/delete.go +++ b/internal/handler/job/tags/delete.go @@ -35,7 +35,7 @@ func Delete(ctx context.Context, storage persistence.Storage, jobID string, tags return nil, fault.Wrap(err) } - events.PublishEvent(ctx, &events.JobEvent{ + _ = events.PublishEvent(ctx, &events.JobEvent{ Ctime: strfmt.DateTime(time.Now()), Action: events.ActionDeleteTags, Job: &model.Job{