diff --git a/pkg/reconciler/channel/channel_test.go b/pkg/reconciler/channel/channel_test.go index 7df891b7e8f..52b687baec7 100644 --- a/pkg/reconciler/channel/channel_test.go +++ b/pkg/reconciler/channel/channel_test.go @@ -19,6 +19,8 @@ import ( "fmt" "testing" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/feature" @@ -554,6 +556,9 @@ func TestReconcile(t *testing.T) { NewEventPolicy(readyEventPolicyName, testNS, WithReadyEventPolicyCondition, WithEventPolicyToRef(channelV1GVK, channelName), + WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{ + CESQL: "true", + }), ), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ @@ -587,6 +592,9 @@ func TestReconcile(t *testing.T) { "messaging.knative.dev/channel-kind": "InMemoryChannel", "messaging.knative.dev/channel-name": channelName, }), + WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{ + CESQL: "true", + }), ), }, }} diff --git a/pkg/reconciler/channel/resources/eventpolicy.go b/pkg/reconciler/channel/resources/eventpolicy.go index 23f08217e59..2f46304d4bd 100644 --- a/pkg/reconciler/channel/resources/eventpolicy.go +++ b/pkg/reconciler/channel/resources/eventpolicy.go @@ -61,7 +61,8 @@ func MakeEventPolicyForBackingChannel(backingChannel *eventingduckv1.Channelable }, }, }, - From: parentPolicy.Spec.From, + From: parentPolicy.Spec.From, + Filters: parentPolicy.Spec.Filters, }, } } diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index e467b7b6d7c..c43cd1de77b 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" "knative.dev/pkg/tracker" @@ -1311,6 +1313,61 @@ func TestAllBranches(t *testing.T) { feature.OIDCAuthentication: feature.Enabled, feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, }), + }, { + Name: "Propagates Filter of Parallels EventPolicy to ingress channels EventPolicy", + Key: pKey, + Objects: []runtime.Object{ + NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + })), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(parallelGVK, parallelName), + WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{ + CESQL: "true", + }), + ), + }, + WantErr: false, + WantCreates: []runtime.Object{ + createChannel(parallelName), + createBranchChannel(parallelName, 0), + resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{ + {Filter: createFilter(0), Subscriber: createSubscriber(0)}, + }))), + makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0), + makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName, + WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{ + CESQL: "true", + })), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewFlowsParallel(parallelName, testNS, + WithInitFlowsParallelConditions, + WithFlowsParallelChannelTemplateSpec(imc), + WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}), + WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), + WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), + WithFlowsParallelEventPoliciesReady(), + WithFlowsParallelEventPoliciesListed(readyEventPolicyName), + WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ + FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), + SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), + }})), + }}, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), }, } @@ -1494,8 +1551,8 @@ func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventi } } -func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1alpha1.EventPolicy { - return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS, +func makeEventPolicy(parallelName, channelName string, branch int, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy { + ep := NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS, WithEventPolicyToRef(channelV1GVK, channelName), // from a subscription WithEventPolicyFrom(subscriberGVK, resources.ParallelFilterSubscriptionName(parallelName, branch), testNS), @@ -1508,10 +1565,16 @@ func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1al }...), WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)), ) + + for _, opt := range opts { + opt(ep) + } + + return ep } -func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolicyName string) *eventingv1alpha1.EventPolicy { - return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, parallelEventPolicyName), testNS, +func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolicyName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy { + ep := NewEventPolicy(resources.ParallelEventPolicyName(parallelName, parallelEventPolicyName), testNS, WithEventPolicyToRef(channelV1GVK, channelName), // from a subscription WithEventPolicyOwnerReferences([]metav1.OwnerReference{ @@ -1527,4 +1590,10 @@ func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolic }...), WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)), ) + + for _, opt := range opts { + opt(ep) + } + + return ep } diff --git a/pkg/reconciler/parallel/resources/eventpolicy.go b/pkg/reconciler/parallel/resources/eventpolicy.go index b2c1b0a71f6..e8c7063439f 100644 --- a/pkg/reconciler/parallel/resources/eventpolicy.go +++ b/pkg/reconciler/parallel/resources/eventpolicy.go @@ -112,7 +112,8 @@ func MakeEventPolicyForParallelIngressChannel(p *flowsv1.Parallel, ingressChanne }, }, }, - From: parallelPolicy.Spec.From, + From: parallelPolicy.Spec.From, + Filters: parallelPolicy.Spec.Filters, }, } } diff --git a/pkg/reconciler/sequence/resources/eventpolicy.go b/pkg/reconciler/sequence/resources/eventpolicy.go index efe14d3d541..1c9de2fc6dd 100644 --- a/pkg/reconciler/sequence/resources/eventpolicy.go +++ b/pkg/reconciler/sequence/resources/eventpolicy.go @@ -112,7 +112,8 @@ func MakeEventPolicyForSequenceInputChannel(s *flowsv1.Sequence, inputChannel *e }, }, }, - From: sequencePolicy.Spec.From, + From: sequencePolicy.Spec.From, + Filters: sequencePolicy.Spec.Filters, }, } } diff --git a/pkg/reconciler/sequence/sequence_test.go b/pkg/reconciler/sequence/sequence_test.go index 02d59cecc96..43b96e60ce8 100644 --- a/pkg/reconciler/sequence/sequence_test.go +++ b/pkg/reconciler/sequence/sequence_test.go @@ -21,6 +21,8 @@ import ( "fmt" "testing" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/apis/feature" corev1 "k8s.io/api/core/v1" @@ -3561,6 +3563,119 @@ func TestAllCases(t *testing.T) { }, })), }}, + }, { + Name: "Propagates Filter of Sequence EventPolicy to ingress channels EventPolicy", + Key: pKey, + Objects: []runtime.Object{ + NewSequence(sequenceName, testNS, + WithInitSequenceConditions, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}, + })), + createChannel(sequenceName, 0), + createChannel(sequenceName, 1), + resources.NewSubscription(0, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}, + }))), + resources.NewSubscription(1, NewSequence(sequenceName, testNS, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}, + }))), + makeSequenceEventPolicy(sequenceName, WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{ + CESQL: "true", + })), + makeEventPolicy(sequenceName, resources.SequenceChannelName(sequenceName, 1), 1), + }, + WantErr: false, + Ctx: feature.ToContext(context.Background(), feature.Flags{ + feature.OIDCAuthentication: feature.Enabled, + feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace, + }), + WantCreates: []runtime.Object{ + makeInputChannelEventPolicy(sequenceName, resources.SequenceChannelName(sequenceName, 0), resources.SequenceEventPolicyName(sequenceName, ""), WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{ + CESQL: "true", + })), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSequence(sequenceName, testNS, + WithInitSequenceConditions, + WithSequenceChannelTemplateSpec(imc), + WithSequenceSteps([]v1.SequenceStep{ + {Destination: createDestination(0)}, + {Destination: createDestination(1)}, + }), + WithSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + WithSequenceAddressableNotReady("emptyAddress", "addressable is nil"), + WithSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + WithSequenceEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", sequenceName)), + WithSequenceChannelStatuses([]v1.SequenceChannelStatus{ + { + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "InMemoryChannel", + Name: resources.SequenceChannelName(sequenceName, 0), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionUnknown, + Reason: "NoReady", + Message: "Channel does not have Ready condition", + }, + }, + { + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "InMemoryChannel", + Name: resources.SequenceChannelName(sequenceName, 1), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionUnknown, + Reason: "NoReady", + Message: "Channel does not have Ready condition", + }, + }, + }), + WithSequenceSubscriptionStatuses([]v1.SequenceSubscriptionStatus{ + { + Subscription: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + Name: resources.SequenceSubscriptionName(sequenceName, 0), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionUnknown, + Reason: "NoReady", + Message: "Subscription does not have Ready condition", + }, + }, + { + Subscription: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + Name: resources.SequenceSubscriptionName(sequenceName, 1), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionUnknown, + Reason: "NoReady", + Message: "Subscription does not have Ready condition", + }, + }, + })), + }}, }, } @@ -3598,8 +3713,8 @@ func makeEventPolicy(sequenceName, channelName string, step int) *eventingv1alph } // Write a function to make the event policy for the sequence -func makeSequenceEventPolicy(sequenceName string) *eventingv1alpha1.EventPolicy { - return NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, ""), testNS, +func makeSequenceEventPolicy(sequenceName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy { + ep := NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, ""), testNS, // from a subscription WithEventPolicyOwnerReferences([]metav1.OwnerReference{ { @@ -3609,10 +3724,16 @@ func makeSequenceEventPolicy(sequenceName string) *eventingv1alpha1.EventPolicy }, }...), ) + + for _, opt := range opts { + opt(ep) + } + + return ep } -func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEventPolicyName string) *eventingv1alpha1.EventPolicy { - return NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, sequenceEventPolicyName), testNS, +func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEventPolicyName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy { + ep := NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, sequenceEventPolicyName), testNS, WithEventPolicyToRef(channelV1GVK, channelName), // from a subscription WithEventPolicyOwnerReferences([]metav1.OwnerReference{ @@ -3624,6 +3745,12 @@ func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEvent }...), WithEventPolicyLabels(resources.LabelsForSequenceChannelsEventPolicy(sequenceName)), ) + + for _, opt := range opts { + opt(ep) + } + + return ep } func makeInputChannelEventPolicyWithWrongSpec(sequenceName, channelName, policyName string) *eventingv1alpha1.EventPolicy { diff --git a/pkg/reconciler/testing/v1/eventpolicy.go b/pkg/reconciler/testing/v1/eventpolicy.go index df0dedc9700..042c842ec9a 100644 --- a/pkg/reconciler/testing/v1/eventpolicy.go +++ b/pkg/reconciler/testing/v1/eventpolicy.go @@ -19,6 +19,8 @@ package testing import ( "context" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/apis/eventing/v1alpha1" @@ -146,6 +148,12 @@ func WithEventPolicyFromSub(sub string) EventPolicyOption { } } +func WithEventPolicyFilter(filter eventingv1.SubscriptionsAPIFilter) EventPolicyOption { + return func(ep *v1alpha1.EventPolicy) { + ep.Spec.Filters = append(ep.Spec.Filters, filter) + } +} + func WithEventPolicyLabels(labels map[string]string) EventPolicyOption { return func(ep *v1alpha1.EventPolicy) { ep.ObjectMeta.Labels = labels