Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: propagate EventPolicy filter to underlying Channels EventPolicy #8163

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"testing"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"

Expand Down Expand Up @@ -554,6 +556,9 @@ func TestReconcile(t *testing.T) {
NewEventPolicy(readyEventPolicyName, testNS,
WithReadyEventPolicyCondition,
WithEventPolicyToRef(channelV1GVK, channelName),
WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
}),
),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Expand Down Expand Up @@ -587,6 +592,9 @@ func TestReconcile(t *testing.T) {
"messaging.knative.dev/channel-kind": "InMemoryChannel",
"messaging.knative.dev/channel-name": channelName,
}),
WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
}),
),
},
}}
Expand Down
3 changes: 2 additions & 1 deletion pkg/reconciler/channel/resources/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func MakeEventPolicyForBackingChannel(backingChannel *eventingduckv1.Channelable
},
},
},
From: parentPolicy.Spec.From,
From: parentPolicy.Spec.From,
Filters: parentPolicy.Spec.Filters,
},
}
}
Expand Down
77 changes: 73 additions & 4 deletions pkg/reconciler/parallel/parallel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake"
"knative.dev/pkg/tracker"
Expand Down Expand Up @@ -1311,6 +1313,61 @@ func TestAllBranches(t *testing.T) {
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
}, {
Name: "Propagates Filter of Parallels EventPolicy to ingress channels EventPolicy",
Key: pKey,
Objects: []runtime.Object{
NewFlowsParallel(parallelName, testNS,
WithInitFlowsParallelConditions,
WithFlowsParallelChannelTemplateSpec(imc),
WithFlowsParallelBranches([]v1.ParallelBranch{
{Filter: createFilter(0), Subscriber: createSubscriber(0)},
})),
NewEventPolicy(readyEventPolicyName, testNS,
WithReadyEventPolicyCondition,
WithEventPolicyToRef(parallelGVK, parallelName),
WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
}),
),
},
WantErr: false,
WantCreates: []runtime.Object{
createChannel(parallelName),
createBranchChannel(parallelName, 0),
resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{
{Filter: createFilter(0), Subscriber: createSubscriber(0)},
}))),
resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, WithFlowsParallelChannelTemplateSpec(imc), WithFlowsParallelBranches([]v1.ParallelBranch{
{Filter: createFilter(0), Subscriber: createSubscriber(0)},
}))),
makeEventPolicy(parallelName, resources.ParallelBranchChannelName(parallelName, 0), 0),
makeIngressChannelEventPolicy(parallelName, resources.ParallelChannelName(parallelName), readyEventPolicyName,
WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
})),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewFlowsParallel(parallelName, testNS,
WithInitFlowsParallelConditions,
WithFlowsParallelChannelTemplateSpec(imc),
WithFlowsParallelBranches([]v1.ParallelBranch{{Filter: createFilter(0), Subscriber: createSubscriber(0)}}),
WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"),
WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"),
WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"),
WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)),
WithFlowsParallelEventPoliciesReady(),
WithFlowsParallelEventPoliciesListed(readyEventPolicyName),
WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{
FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse),
FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse),
SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse),
}})),
}},
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
},
}

Expand Down Expand Up @@ -1494,8 +1551,8 @@ func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventi
}
}

func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS,
func makeEventPolicy(parallelName, channelName string, branch int, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy {
ep := NewEventPolicy(resources.ParallelEventPolicyName(parallelName, channelName), testNS,
WithEventPolicyToRef(channelV1GVK, channelName),
// from a subscription
WithEventPolicyFrom(subscriberGVK, resources.ParallelFilterSubscriptionName(parallelName, branch), testNS),
Expand All @@ -1508,10 +1565,16 @@ func makeEventPolicy(parallelName, channelName string, branch int) *eventingv1al
}...),
WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)),
)

for _, opt := range opts {
opt(ep)
}

return ep
}

func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolicyName string) *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.ParallelEventPolicyName(parallelName, parallelEventPolicyName), testNS,
func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolicyName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy {
ep := NewEventPolicy(resources.ParallelEventPolicyName(parallelName, parallelEventPolicyName), testNS,
WithEventPolicyToRef(channelV1GVK, channelName),
// from a subscription
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
Expand All @@ -1527,4 +1590,10 @@ func makeIngressChannelEventPolicy(parallelName, channelName, parallelEventPolic
}...),
WithEventPolicyLabels(resources.LabelsForParallelChannelsEventPolicy(parallelName)),
)

for _, opt := range opts {
opt(ep)
}

return ep
}
3 changes: 2 additions & 1 deletion pkg/reconciler/parallel/resources/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func MakeEventPolicyForParallelIngressChannel(p *flowsv1.Parallel, ingressChanne
},
},
},
From: parallelPolicy.Spec.From,
From: parallelPolicy.Spec.From,
Filters: parallelPolicy.Spec.Filters,
},
}
}
3 changes: 2 additions & 1 deletion pkg/reconciler/sequence/resources/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func MakeEventPolicyForSequenceInputChannel(s *flowsv1.Sequence, inputChannel *e
},
},
},
From: sequencePolicy.Spec.From,
From: sequencePolicy.Spec.From,
Filters: sequencePolicy.Spec.Filters,
},
}
}
135 changes: 131 additions & 4 deletions pkg/reconciler/sequence/sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

"knative.dev/eventing/pkg/apis/feature"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -3561,6 +3563,119 @@ func TestAllCases(t *testing.T) {
},
})),
}},
}, {
Name: "Propagates Filter of Sequence EventPolicy to ingress channels EventPolicy",
Key: pKey,
Objects: []runtime.Object{
NewSequence(sequenceName, testNS,
WithInitSequenceConditions,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)},
})),
createChannel(sequenceName, 0),
createChannel(sequenceName, 1),
resources.NewSubscription(0, NewSequence(sequenceName, testNS,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)},
}))),
resources.NewSubscription(1, NewSequence(sequenceName, testNS,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)},
}))),
makeSequenceEventPolicy(sequenceName, WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
})),
makeEventPolicy(sequenceName, resources.SequenceChannelName(sequenceName, 1), 1),
},
WantErr: false,
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
feature.AuthorizationDefaultMode: feature.AuthorizationAllowSameNamespace,
}),
WantCreates: []runtime.Object{
makeInputChannelEventPolicy(sequenceName, resources.SequenceChannelName(sequenceName, 0), resources.SequenceEventPolicyName(sequenceName, ""), WithEventPolicyFilter(eventingv1.SubscriptionsAPIFilter{
CESQL: "true",
})),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSequence(sequenceName, testNS,
WithInitSequenceConditions,
WithSequenceChannelTemplateSpec(imc),
WithSequenceSteps([]v1.SequenceStep{
{Destination: createDestination(0)},
{Destination: createDestination(1)},
}),
WithSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"),
WithSequenceAddressableNotReady("emptyAddress", "addressable is nil"),
WithSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"),
WithSequenceEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", sequenceName)),
WithSequenceChannelStatuses([]v1.SequenceChannelStatus{
{
Channel: corev1.ObjectReference{
APIVersion: "messaging.knative.dev/v1",
Kind: "InMemoryChannel",
Name: resources.SequenceChannelName(sequenceName, 0),
Namespace: testNS,
},
ReadyCondition: apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
Reason: "NoReady",
Message: "Channel does not have Ready condition",
},
},
{
Channel: corev1.ObjectReference{
APIVersion: "messaging.knative.dev/v1",
Kind: "InMemoryChannel",
Name: resources.SequenceChannelName(sequenceName, 1),
Namespace: testNS,
},
ReadyCondition: apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
Reason: "NoReady",
Message: "Channel does not have Ready condition",
},
},
}),
WithSequenceSubscriptionStatuses([]v1.SequenceSubscriptionStatus{
{
Subscription: corev1.ObjectReference{
APIVersion: "messaging.knative.dev/v1",
Kind: "Subscription",
Name: resources.SequenceSubscriptionName(sequenceName, 0),
Namespace: testNS,
},
ReadyCondition: apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
Reason: "NoReady",
Message: "Subscription does not have Ready condition",
},
},
{
Subscription: corev1.ObjectReference{
APIVersion: "messaging.knative.dev/v1",
Kind: "Subscription",
Name: resources.SequenceSubscriptionName(sequenceName, 1),
Namespace: testNS,
},
ReadyCondition: apis.Condition{
Type: apis.ConditionReady,
Status: corev1.ConditionUnknown,
Reason: "NoReady",
Message: "Subscription does not have Ready condition",
},
},
})),
}},
},
}

Expand Down Expand Up @@ -3598,8 +3713,8 @@ func makeEventPolicy(sequenceName, channelName string, step int) *eventingv1alph
}

// Write a function to make the event policy for the sequence
func makeSequenceEventPolicy(sequenceName string) *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, ""), testNS,
func makeSequenceEventPolicy(sequenceName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy {
ep := NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, ""), testNS,
// from a subscription
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
{
Expand All @@ -3609,10 +3724,16 @@ func makeSequenceEventPolicy(sequenceName string) *eventingv1alpha1.EventPolicy
},
}...),
)

for _, opt := range opts {
opt(ep)
}

return ep
}

func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEventPolicyName string) *eventingv1alpha1.EventPolicy {
return NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, sequenceEventPolicyName), testNS,
func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEventPolicyName string, opts ...EventPolicyOption) *eventingv1alpha1.EventPolicy {
ep := NewEventPolicy(resources.SequenceEventPolicyName(sequenceName, sequenceEventPolicyName), testNS,
WithEventPolicyToRef(channelV1GVK, channelName),
// from a subscription
WithEventPolicyOwnerReferences([]metav1.OwnerReference{
Expand All @@ -3624,6 +3745,12 @@ func makeInputChannelEventPolicy(sequenceName, channelName string, sequenceEvent
}...),
WithEventPolicyLabels(resources.LabelsForSequenceChannelsEventPolicy(sequenceName)),
)

for _, opt := range opts {
opt(ep)
}

return ep
}

func makeInputChannelEventPolicyWithWrongSpec(sequenceName, channelName, policyName string) *eventingv1alpha1.EventPolicy {
Expand Down
8 changes: 8 additions & 0 deletions pkg/reconciler/testing/v1/eventpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package testing
import (
"context"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/apis/eventing/v1alpha1"
Expand Down Expand Up @@ -146,6 +148,12 @@ func WithEventPolicyFromSub(sub string) EventPolicyOption {
}
}

func WithEventPolicyFilter(filter eventingv1.SubscriptionsAPIFilter) EventPolicyOption {
return func(ep *v1alpha1.EventPolicy) {
ep.Spec.Filters = append(ep.Spec.Filters, filter)
}
}

func WithEventPolicyLabels(labels map[string]string) EventPolicyOption {
return func(ep *v1alpha1.EventPolicy) {
ep.ObjectMeta.Labels = labels
Expand Down
Loading