Skip to content

Commit

Permalink
condition publish method accepts a rollupSubject bool parameter
Browse files Browse the repository at this point in the history
This parameter tells the stream to overwrite any previous message with
the same subject. This is to be enabled for subjects which do not
include a wildcard or glob, more specifically this parameter will be set
to true when the subject is directed at a server UUID.
  • Loading branch information
joelrebel committed Jul 3, 2024
1 parent 5d24e50 commit 5b04cd8
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 4 deletions.
2 changes: 1 addition & 1 deletion internal/orchestrator/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func (o *Orchestrator) queueFollowingCondition(ctx context.Context, evt *v1types
if active != nil && active.State == rctypes.Pending {
byt := active.MustBytes()
subject := fmt.Sprintf("%s.servers.%s", o.facility, active.Kind)
err := o.streamBroker.Publish(ctx, subject, byt)
err := o.streamBroker.Publish(ctx, subject, byt, false)
if err != nil {
o.logger.WithError(err).WithFields(logrus.Fields{
"condition.id": active.ID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/v1/events/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewHandler(repository store.Repository, stream events.Stream, logger *logru
return &Handler{repository: repository, stream: stream, logger: logger}
}

// UpdateCondition sanity checks the incoming condition update, merges it and applies the result to serverservice
// UpdateCondition sanity checks the incoming condition update, merges it and applies the result to the condition record.
func (h *Handler) UpdateCondition(ctx context.Context, updEvt *v1types.ConditionUpdateEvent) error {
_, span := otel.Tracer(pkgName).Start(ctx, "events.UpdateCondition")
defer span.End()
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/v1/routes/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (r *Routes) conditionCreate(otelCtx context.Context, newCondition *rctypes.
}

// publish the condition and in case of publish failure - revert.
err = r.publishCondition(otelCtx, serverID, facilityCode, newCondition)
err = r.publishCondition(otelCtx, serverID, facilityCode, newCondition, false)
if err != nil {
r.logger.WithError(err).Warn("condition create failed to publish")

Expand Down Expand Up @@ -453,7 +453,7 @@ func RegisterSpanEvent(span trace.Span, serverID, conditionID, conditionKind, ev
))
}

func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facilityCode string, publishCondition *rctypes.Condition) error {
func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facilityCode string, publishCondition *rctypes.Condition, rollupSubject bool) error {
errPublish := errors.New("error publishing condition")

otelCtx, span := otel.Tracer(pkgName).Start(
Expand Down Expand Up @@ -485,6 +485,7 @@ func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facil
otelCtx,
subjectSuffix,
byt,
rollupSubject,
); err != nil {
return errors.Wrap(errPublish, err.Error())
}
Expand Down

0 comments on commit 5b04cd8

Please sign in to comment.