From 5b04cd85f6aadc423a902dbb8cbeeea7ada8d23d Mon Sep 17 00:00:00 2001 From: Joel Rebello Date: Tue, 2 Jul 2024 16:55:00 +0200 Subject: [PATCH] condition publish method accepts a rollupSubject bool parameter This parameter tells the stream to overwrite any previous message with the same subject. This is to be enabled for subjects which do not include a wildcard or glob, more specifically this parameter will be set to true when the subject is directed at a server UUID. --- internal/orchestrator/updates.go | 2 +- pkg/api/v1/events/handlers.go | 2 +- pkg/api/v1/routes/handlers.go | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/internal/orchestrator/updates.go b/internal/orchestrator/updates.go index 073091d8..867c6cdd 100644 --- a/internal/orchestrator/updates.go +++ b/internal/orchestrator/updates.go @@ -541,7 +541,7 @@ func (o *Orchestrator) queueFollowingCondition(ctx context.Context, evt *v1types if active != nil && active.State == rctypes.Pending { byt := active.MustBytes() subject := fmt.Sprintf("%s.servers.%s", o.facility, active.Kind) - err := o.streamBroker.Publish(ctx, subject, byt) + err := o.streamBroker.Publish(ctx, subject, byt, false) if err != nil { o.logger.WithError(err).WithFields(logrus.Fields{ "condition.id": active.ID, diff --git a/pkg/api/v1/events/handlers.go b/pkg/api/v1/events/handlers.go index 58b7eb9b..1b4bcc10 100644 --- a/pkg/api/v1/events/handlers.go +++ b/pkg/api/v1/events/handlers.go @@ -38,7 +38,7 @@ func NewHandler(repository store.Repository, stream events.Stream, logger *logru return &Handler{repository: repository, stream: stream, logger: logger} } -// UpdateCondition sanity checks the incoming condition update, merges it and applies the result to serverservice +// UpdateCondition sanity checks the incoming condition update, merges it and applies the result to the condition record. func (h *Handler) UpdateCondition(ctx context.Context, updEvt *v1types.ConditionUpdateEvent) error { _, span := otel.Tracer(pkgName).Start(ctx, "events.UpdateCondition") defer span.End() diff --git a/pkg/api/v1/routes/handlers.go b/pkg/api/v1/routes/handlers.go index 2d8b2194..35f89d0d 100644 --- a/pkg/api/v1/routes/handlers.go +++ b/pkg/api/v1/routes/handlers.go @@ -388,7 +388,7 @@ func (r *Routes) conditionCreate(otelCtx context.Context, newCondition *rctypes. } // publish the condition and in case of publish failure - revert. - err = r.publishCondition(otelCtx, serverID, facilityCode, newCondition) + err = r.publishCondition(otelCtx, serverID, facilityCode, newCondition, false) if err != nil { r.logger.WithError(err).Warn("condition create failed to publish") @@ -453,7 +453,7 @@ func RegisterSpanEvent(span trace.Span, serverID, conditionID, conditionKind, ev )) } -func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facilityCode string, publishCondition *rctypes.Condition) error { +func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facilityCode string, publishCondition *rctypes.Condition, rollupSubject bool) error { errPublish := errors.New("error publishing condition") otelCtx, span := otel.Tracer(pkgName).Start( @@ -485,6 +485,7 @@ func (r *Routes) publishCondition(ctx context.Context, serverID uuid.UUID, facil otelCtx, subjectSuffix, byt, + rollupSubject, ); err != nil { return errors.Wrap(errPublish, err.Error()) }