Skip to content

Commit

Permalink
refactor: merge sync and async api
Browse files Browse the repository at this point in the history
the sync function was only called by tests

this also fixes a data race w.r.t logging

Signed-off-by: Michael Adler <[email protected]>
  • Loading branch information
michaeladler committed Oct 19, 2023
1 parent ef5008d commit e59e165
Show file tree
Hide file tree
Showing 8 changed files with 12 additions and 18 deletions.
2 changes: 1 addition & 1 deletion internal/handler/job/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func CreateJob(ctx context.Context, storage persistence.Storage, request *model.
return nil, fault.Wrap(err, ftag.With(ftag.Internal))
}

events.PublishEvent(ctx, &events.JobEvent{
_ = events.PublishEvent(ctx, &events.JobEvent{
Ctime: strfmt.DateTime(time.Now()),
Action: events.ActionCreate,
Job: createdJob,
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/job/definition/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Update(ctx context.Context, storage persistence.Storage, jobID string, defi
return nil, fault.Wrap(err)
}

events.PublishEvent(ctx, &events.JobEvent{
_ = events.PublishEvent(ctx, &events.JobEvent{
Ctime: strfmt.DateTime(time.Now()),
Action: events.ActionUpdateDefinition,
Job: &model.Job{
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/job/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func DeleteJob(ctx context.Context, storage persistence.Storage, jobID string) e
return fault.Wrap(err)
}

events.PublishEvent(ctx, &events.JobEvent{
_ = events.PublishEvent(ctx, &events.JobEvent{
Ctime: strfmt.DateTime(time.Now()),
Action: events.ActionDelete,
Job: &model.Job{
Expand Down
14 changes: 4 additions & 10 deletions internal/handler/job/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,14 @@ func SubscriberCount() int {
return count
}

// PublishEvent publishes a new event in a *synchronous* manner.
func PublishEventSync(ctx context.Context, event *JobEvent) {
// PublishEvent publishes a new event. It returns a channel which can be used
// to wait for the delivery of the event to all listeners.
func PublishEvent(ctx context.Context, event *JobEvent) chan struct{} {
log := logging.LoggerFromCtx(ctx)
log.Debug().
Str("jobID", event.Job.ID).
Str("action", string(event.Action)).Msg("Publishing event")
<-e.Emit(event.Job.ID, event)
}

// PublishEvent publishes a new event in an *asynchronous* manner.
func PublishEvent(ctx context.Context, event *JobEvent) {
go func() {
PublishEventSync(ctx, event)
}()
return e.Emit(event.Job.ID, event)
}

func (filter FilterParams) createEventFilter(subscriberID string, tags []string) func(*emitter.Event) {
Expand Down
4 changes: 2 additions & 2 deletions internal/handler/job/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ func TestFiltering(t *testing.T) {
chAll, _ := AddSubscriber(ctx, FilterParams{}, nil) // no filter should receive all events

job1.Status.State = "FOO"
PublishEventSync(context.Background(), &JobEvent{Action: ActionUpdateStatus, Job: &job1})
<-PublishEvent(context.Background(), &JobEvent{Action: ActionUpdateStatus, Job: &job1})

job2.Status.State = "BAR"
PublishEventSync(context.Background(), &JobEvent{Action: ActionUpdateStatus, Job: &job2})
<-PublishEvent(context.Background(), &JobEvent{Action: ActionUpdateStatus, Job: &job2})

{
ev := <-ch1
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/job/status/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func Update(ctx context.Context, storage persistence.Storage, jobID string, newS
return nil, fault.Wrap(err)
}

events.PublishEvent(ctx, &events.JobEvent{
_ = events.PublishEvent(ctx, &events.JobEvent{
Ctime: strfmt.DateTime(time.Now()),
Action: events.ActionUpdateStatus,
Job: &model.Job{
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/job/tags/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Add(ctx context.Context, storage persistence.Storage, jobID string, tags []
return nil, fault.Wrap(err)
}

events.PublishEvent(ctx, &events.JobEvent{
_ = events.PublishEvent(ctx, &events.JobEvent{
Ctime: strfmt.DateTime(time.Now()),
Action: events.ActionAddTags,
Job: &model.Job{
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/job/tags/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Delete(ctx context.Context, storage persistence.Storage, jobID string, tags
return nil, fault.Wrap(err)
}

events.PublishEvent(ctx, &events.JobEvent{
_ = events.PublishEvent(ctx, &events.JobEvent{
Ctime: strfmt.DateTime(time.Now()),
Action: events.ActionDeleteTags,
Job: &model.Job{
Expand Down

0 comments on commit e59e165

Please sign in to comment.