Skip to content

Commit

Permalink
feat: support delivery format in filter handler
Browse files Browse the repository at this point in the history
Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 committed Aug 12, 2024
1 parent 98f6939 commit c604bad
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,15 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger
Audience: trigger.Status.SubscriberAudience,
}

h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl)
sendOptions := []kncloudevents.SendOption{}
if trigger.Spec.Delivery != nil && trigger.Spec.Delivery.Format != nil {
sendOptions = append(sendOptions, kncloudevents.WithEventFormat(trigger.Spec.Delivery.Format))
}

h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl, sendOptions...)
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {
func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32, sendOpts ...kncloudevents.SendOption) {
additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

Expand Down Expand Up @@ -433,6 +438,10 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
}))
}

if len(sendOpts) > 0 {
opts = append(opts, sendOpts...)
}

dispatchInfo, err := h.eventDispatcher.SendEvent(ctx, *event, target, opts...)
if err != nil {
h.logger.Error("failed to send event", zap.Error(err))
Expand Down

0 comments on commit c604bad

Please sign in to comment.