Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: updating pubsub system #3646

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/drivers/k8scel"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/externaldata"
"github.com/open-policy-agent/gatekeeper/v3/pkg/metrics"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/operations"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness/pruner"
"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
Expand Down Expand Up @@ -435,7 +435,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness.

mutationSystem := mutation.NewSystem(mutationOpts)
expansionSystem := expansion.NewSystem(mutationSystem)
pubsubSystem := pubsub.NewSystem()
exportSystem := export.NewSystem()

c := mgr.GetCache()
dc, ok := c.(watch.RemovableCache)
Expand Down Expand Up @@ -508,7 +508,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness.
MutationSystem: mutationSystem,
ExpansionSystem: expansionSystem,
ProviderCache: providerCache,
PubsubSystem: pubsubSystem,
ExportSystem: exportSystem,
}

if err := controller.AddToManager(mgr, &opts); err != nil {
Expand Down Expand Up @@ -538,7 +538,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, tracker *readiness.
ProcessExcluder: processExcluder,
CacheLister: auditCache,
ExpansionSystem: expansionSystem,
PubSubSystem: pubsubSystem,
ExportSystem: exportSystem,
}
if err := audit.AddToManager(mgr, &auditDeps); err != nil {
setupLog.Error(err, "unable to register audit with the manager")
Expand Down
4 changes: 2 additions & 2 deletions pkg/audit/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

Expand All @@ -25,7 +25,7 @@ type Dependencies struct {
ProcessExcluder *process.Excluder
CacheLister *CacheLister
ExpansionSystem *expansion.System
PubSubSystem *pubsub.System
ExportSystem *export.System
}

// AddToManager adds audit manager to the Manager.
Expand Down
18 changes: 9 additions & 9 deletions pkg/audit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import (
constraintclient "github.com/open-policy-agent/frameworks/constraint/pkg/client"
"github.com/open-policy-agent/frameworks/constraint/pkg/client/reviews"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
pubsubController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub"
exportController "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/logging"
mutationtypes "github.com/open-policy-agent/gatekeeper/v3/pkg/mutation/types"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/target"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -91,7 +91,7 @@ type Manager struct {
auditCache *CacheLister

expansionSystem *expansion.System
pubsubSystem *pubsub.System
exportSystem *export.System
}

// StatusViolation represents each violation under status.
Expand All @@ -107,7 +107,7 @@ type StatusViolation struct {
}

// ConstraintMsg represents publish message for each constraint.
type PubsubMsg struct {
type ExportMsg struct {
ID string `json:"id,omitempty"`
Details interface{} `json:"details,omitempty"`
EventType string `json:"eventType,omitempty"`
Expand Down Expand Up @@ -269,7 +269,7 @@ func New(mgr manager.Manager, deps *Dependencies) (*Manager, error) {
gkNamespace: util.GetNamespace(),
auditCache: deps.CacheLister,
expansionSystem: deps.ExpansionSystem,
pubsubSystem: deps.PubSubSystem,
exportSystem: deps.ExportSystem,
}
return am, nil
}
Expand Down Expand Up @@ -902,10 +902,10 @@ func (am *Manager) addAuditResponsesToUpdateLists(
details := r.Metadata["details"]
labels := r.obj.GetLabels()
logViolation(am.log, constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels)
if *pubsubController.PubsubEnabled {
err := am.pubsubSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp))
if *exportController.ExportEnabled {
err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp))
if err != nil {
am.log.Error(err, "pubsub audit Publishing")
am.log.Error(err, "error exporting audit violation")
}
}
if *emitAuditEvents {
Expand Down Expand Up @@ -1162,7 +1162,7 @@ func violationMsg(constraint *unstructured.Unstructured, enforcementAction util.
userConstraintAnnotations := constraint.GetAnnotations()
delete(userConstraintAnnotations, "kubectl.kubernetes.io/last-applied-configuration")

return PubsubMsg{
return ExportMsg{
Message: message,
Details: details,
ID: timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ limitations under the License.
package controller

import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/export"
)

func init() {
Injectors = append(Injectors, &pubsub.Adder{})
Injectors = append(Injectors, &export.Adder{})
}
12 changes: 6 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync"
"github.com/open-policy-agent/gatekeeper/v3/pkg/expansion"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/fakes"
"github.com/open-policy-agent/gatekeeper/v3/pkg/mutation"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
"github.com/open-policy-agent/gatekeeper/v3/pkg/watch"
Expand All @@ -56,8 +56,8 @@ type GetPodInjector interface {
InjectGetPod(func(context.Context) (*corev1.Pod, error))
}

type PubsubInjector interface {
InjectPubsubSystem(pubsubSystem *pubsub.System)
type ExportInjector interface {
InjectExportSystem(exportSystem *export.System)
}

type DataClientInjector interface {
Expand Down Expand Up @@ -101,7 +101,7 @@ type Dependencies struct {
MutationSystem *mutation.System
ExpansionSystem *expansion.System
ProviderCache *externaldata.ProviderCache
PubsubSystem *pubsub.System
ExportSystem *export.System
SyncEventsCh chan event.GenericEvent
CacheMgr *cm.CacheManager
}
Expand Down Expand Up @@ -212,8 +212,8 @@ func AddToManager(m manager.Manager, deps *Dependencies) error {
if a2, ok := a.(GetPodInjector); ok {
a2.InjectGetPod(deps.GetPod)
}
if a2, ok := a.(PubsubInjector); ok {
a2.InjectPubsubSystem(deps.PubsubSystem)
if a2, ok := a.(ExportInjector); ok {
a2.InjectExportSystem(deps.ExportSystem)
}
if a2, ok := a.(CacheManagerInjector); ok {
// this is used by the config controller to sync
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package pubsub
package export

import (
"context"
"encoding/json"
"flag"
"fmt"

"github.com/open-policy-agent/gatekeeper/v3/pkg/export"
"github.com/open-policy-agent/gatekeeper/v3/pkg/logging"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
corev1 "k8s.io/api/core/v1"
Expand All @@ -25,36 +25,36 @@ import (
)

var (
PubsubEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages")
ExportEnabled = flag.Bool("enable-pub-sub", false, "(alpha) Enabled pubsub to publish messages")
log = logf.Log.WithName("controller").WithValues(logging.Process, "pubsub_controller")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to rename the flag to remove pub-sub word?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May as well while it's still alpha

Copy link
Member

@sozercan sozercan Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we provide a warning for the user if we change the name? we can remove it after a release?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? How would we provide warning for changing a name before release?

)

type Adder struct {
PubsubSystem *pubsub.System
ExportSystem *export.System
}

func (a *Adder) Add(mgr manager.Manager) error {
if !*PubsubEnabled {
if !*ExportEnabled {
return nil
}
log.Info("Warning: Alpha flag enable-pub-sub is set to true. This flag may change in the future.")
r := newReconciler(mgr, a.PubsubSystem)
r := newReconciler(mgr, a.ExportSystem)
return add(mgr, r)
}

func (a *Adder) InjectTracker(_ *readiness.Tracker) {}

func (a *Adder) InjectPubsubSystem(pubsubSystem *pubsub.System) {
a.PubsubSystem = pubsubSystem
func (a *Adder) InjectExportSystem(exportSystem *export.System) {
a.ExportSystem = exportSystem
}

type Reconciler struct {
client.Client
scheme *runtime.Scheme
system *pubsub.System
system *export.System
}

func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler {
func newReconciler(mgr manager.Manager, system *export.System) *Reconciler {
return &Reconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
Expand All @@ -63,7 +63,7 @@ func newReconciler(mgr manager.Manager, system *pubsub.System) *Reconciler {
}

func add(mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New("pubsub-config-controller", mgr, controller.Options{Reconciler: r})
c, err := controller.New("export-config-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}
Expand Down Expand Up @@ -111,22 +111,22 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
}

if len(cfg.Data) == 0 {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName))
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("data missing in configmap %s, unable to establish connection", request.NamespacedName))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a "connection" necessarily?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"link" may be? I just couldn't come up with better alternative at the time.

if _, ok := cfg.Data["provider"]; !ok {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing provider field in configmap %s, unable to configure respective pubsub", request.NamespacedName))
if _, ok := cfg.Data["driver"]; !ok {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing driver field in configmap %s, unable to establish connection", request.NamespacedName))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is changing from config map to CRD coming later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can raise a follow up PR, we have the design finalized as per last discussion.

}
var config interface{}
err = json.Unmarshal([]byte(cfg.Data["config"]), &config)
if err != nil {
return reconcile.Result{}, err
}

err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["provider"])
err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["driver"])
if err != nil {
return reconcile.Result{}, err
}

log.Info("Connection upsert successful", "name", request.Name, "provider", cfg.Data["provider"])
log.Info("Connection upsert successful", "name", request.Name, "driver", cfg.Data["driver"])
return reconcile.Result{}, nil
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package pubsub
package export

import (
"context"
"flag"
"fmt"
"testing"

"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/export/dapr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/util"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -48,7 +48,7 @@ func TestReconcile(t *testing.T) {
},
},
wantErr: true,
errorMsg: fmt.Sprintf("data missing in configmap %s, unable to configure respective pubsub", request.NamespacedName),
errorMsg: fmt.Sprintf("data missing in configmap %s, unable to establish connection", request.NamespacedName),
},
}
for _, tc := range testCases {
Expand Down
88 changes: 88 additions & 0 deletions pkg/export/dapr/dapr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package dapr

import (
"context"
"encoding/json"
"fmt"

daprClient "github.com/dapr/go-sdk/client"
)

type Connection struct {
// Name of the component object to use in Dapr
component string

client daprClient.Client
}

// Dapr represents driver to use Dapr.
type Dapr struct {
openConnections map[string]Connection
}

const (
Name = "dapr"
)

var Connections = &Dapr{
openConnections: make(map[string]Connection),
}

func (r *Dapr) Publish(_ context.Context, connectionName string, data interface{}, topic string) error {
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling data: %w", err)
}

conn, ok := r.openConnections[connectionName]
if !ok {
return fmt.Errorf("connection not found: %s for Dapr driver", connectionName)
}
err = conn.client.PublishEvent(context.Background(), conn.component, topic, jsonData)
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}

return nil
}

func (r *Dapr) CloseConnection(connectionName string) error {
delete(r.openConnections, connectionName)
return nil
}

func (r *Dapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error {
cfg, ok := config.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}
component, ok := cfg["component"].(string)
if !ok {
return fmt.Errorf("failed to get value of component")
}
conn := r.openConnections[connectionName]
conn.component = component
r.openConnections[connectionName] = conn
return nil
}

func (r *Dapr) CreateConnection(_ context.Context, connectionName string, config interface{}) error {
var conn Connection
cfg, ok := config.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}
conn.component, ok = cfg["component"].(string)
if !ok {
return fmt.Errorf("failed to get value of component")
}

tmp, err := daprClient.NewClient()
if err != nil {
return err
}

conn.client = tmp
r.openConnections[connectionName] = conn
return nil
}
Loading
Loading