Skip to content

Commit

Permalink
feat(sync) report resource errors in DB mode (#5785)
Browse files Browse the repository at this point in the history
* feat(sendconfig) parse resource errors in DB mode

Use the GDR result channel to log actions and build resource errors from
failed actions.

Add minimal envtest scaffolding to test DB mode error event generation.

* chore(deps) use GDR v1.10.0

* feat(manager) disable leadership for DB mode tests

Add an argument that forces leadership to one mode or the other. Disable
it for integration and env tests.

The leadership election mechanism can behave strangely in tests and
isn't relevant for single-instance tests.
  • Loading branch information
rainest authored Apr 18, 2024
1 parent 432814d commit 1c68106
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ Adding a new version? You'll need three changes:
[#5787](https://github.com/Kong/kubernetes-ingress-controller/pull/5787)
- Add support in `HTTPRoute`s for `URLRewrite`:
- `FullPathRewrite` [#5855](https://github.com/Kong/kubernetes-ingress-controller/pull/5855)
- DB mode now supports Event reporting for resources that failed to apply.
[#5785](https://github.com/Kong/kubernetes-ingress-controller/pull/5785)

### Fixed

Expand Down
136 changes: 118 additions & 18 deletions internal/dataplane/sendconfig/dbmode.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package sendconfig
import (
"context"
"fmt"
"reflect"
"sync"

"github.com/blang/semver/v4"
"github.com/go-logr/logr"
"github.com/kong/go-database-reconciler/pkg/diff"
"github.com/kong/go-database-reconciler/pkg/dump"
"github.com/kong/go-database-reconciler/pkg/file"
Expand All @@ -14,29 +17,37 @@ import (

"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane/deckerrors"
"github.com/kong/kubernetes-ingress-controller/v3/internal/metrics"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util"
)

// UpdateStrategyDBMode implements the UpdateStrategy interface. It updates Kong's data-plane
// configuration using decK's syncer.
type UpdateStrategyDBMode struct {
client *kong.Client
dumpConfig dump.Config
version semver.Version
concurrency int
isKonnect bool
client *kong.Client
dumpConfig dump.Config
version semver.Version
concurrency int
isKonnect bool
logger logr.Logger
resourceErrors []ResourceError
resourceErrorLock *sync.Mutex
}

func NewUpdateStrategyDBMode(
client *kong.Client,
dumpConfig dump.Config,
version semver.Version,
concurrency int,
logger logr.Logger,
) UpdateStrategyDBMode {
return UpdateStrategyDBMode{
client: client,
dumpConfig: dumpConfig,
version: version,
concurrency: concurrency,
client: client,
dumpConfig: dumpConfig,
version: version,
concurrency: concurrency,
logger: logger,
resourceErrors: []ResourceError{},
resourceErrorLock: &sync.Mutex{},
}
}

Expand All @@ -45,8 +56,9 @@ func NewUpdateStrategyDBModeKonnect(
dumpConfig dump.Config,
version semver.Version,
concurrency int,
logger logr.Logger,
) UpdateStrategyDBMode {
s := NewUpdateStrategyDBMode(client, dumpConfig, version, concurrency)
s := NewUpdateStrategyDBMode(client, dumpConfig, version, concurrency, logger)
s.isKonnect = true
return s
}
Expand All @@ -70,23 +82,111 @@ func (s UpdateStrategyDBMode) Update(ctx context.Context, targetContent ContentW
}

syncer, err := diff.NewSyncer(diff.SyncerOpts{
CurrentState: cs,
TargetState: ts,
KongClient: s.client,
SilenceWarnings: true,
IsKonnect: s.isKonnect,
IncludeLicenses: true,
CurrentState: cs,
TargetState: ts,
KongClient: s.client,
SilenceWarnings: true,
IsKonnect: s.isKonnect,
IncludeLicenses: true,
EnableEntityActions: true,
})
if err != nil {
return fmt.Errorf("creating a new syncer for %s: %w", s.client.BaseRootURL(), err), nil, nil, nil
}

ctx, cancel := context.WithCancel(ctx)
go s.HandleEvents(ctx, syncer.GetResultChan())

_, errs, _ := syncer.Solve(ctx, s.concurrency, false, false)
cancel()
s.resourceErrorLock.Lock()
defer s.resourceErrorLock.Unlock()
if errs != nil {
return deckutils.ErrArray{Errors: errs}, nil, nil, nil
return deckutils.ErrArray{Errors: errs}, s.resourceErrors, nil, nil
}

// as of GDR 1.8 we should always get a plain error set in addition to resourceErrors, so returning resourceErrors
// here should not be necessary. Return it anyway as a future-proof because why not.
return nil, s.resourceErrors, nil, nil
}

// HandleEvents handles logging and error reporting for individual entity change events generated during a sync by
// looping over an event channel. It terminates when its context dies.
func (s *UpdateStrategyDBMode) HandleEvents(ctx context.Context, events chan diff.EntityAction) {
s.resourceErrorLock.Lock()
for {
select {
case event := <-events:
if event.Error == nil {
s.logger.V(util.DebugLevel).Info("updated gateway entity", "action", event.Action, "kind", event.Entity.Kind, "name", event.Entity.Name)
} else {
s.logger.Error(event.Error, "failed updating gateway entity", "action", event.Action, "kind", event.Entity.Kind, "name", event.Entity.Name)
parsed, err := resourceErrorFromEntityAction(event)
if err != nil {
s.logger.Error(err, "could not parse entity update error")
} else {
s.resourceErrors = append(s.resourceErrors, parsed)
}
}
case <-ctx.Done():
s.resourceErrorLock.Unlock()
return
}
}
}

func resourceErrorFromEntityAction(event diff.EntityAction) (ResourceError, error) {
var subj any
// GDR may produce an old only (delete), new only (create), or both (update) in an event. tags should be identical
// but we arbitrarily pull from new.
if event.Entity.New != nil {
subj = event.Entity.New
} else {
subj = event.Entity.Old
}
// GDR makes frequent use of "any" for its various entity handlers. It does not use interfaces that would allow us
// to guarantee that a particular entity does indeed have tags or similar and retrieve them. We're unlikely to
// refactor this any time soon, so in absence of proper interface methods, we pray that the entity probably has tags,
// which is a reasonable assumption as anything KIC can manage does. The reflect-fu here is sinister and menacing,
// but should spit out tags unless something has gone wrong.
reflected := reflect.Indirect(reflect.ValueOf(subj))
if reflected.Kind() != reflect.Struct {
// We need to fail fast here because FieldByName() will panic on non-Struct Kinds.
return ResourceError{}, fmt.Errorf("entity %s/%s is %s, not Struct",
event.Entity.Kind, event.Entity.Name, reflected.Kind())
}
tagsValue := reflected.FieldByName("Tags")
if tagsValue.IsZero() {
return ResourceError{}, fmt.Errorf("entity %s/%s of type %s lacks 'Tags' field",
event.Entity.Kind, event.Entity.Name, reflect.TypeOf(subj))
}
tags, ok := tagsValue.Interface().([]*string)
if !ok {
return ResourceError{}, fmt.Errorf("entity %s/%s Tags field is not []*string",
event.Entity.Kind, event.Entity.Name)
}

actualTags := []string{}
for _, s := range tags {
actualTags = append(actualTags, *s)
}

// This omits ID, which should be available but requires similar reflect gymnastics as Tags, and probably isn't worth
// it.
raw := rawResourceError{
Name: event.Entity.Name,
Tags: actualTags,
// /config flattened errors have a structured set of field to error reasons, whereas GDR errors are just plain
// un-parsed admin API endpoint strings. These will often mention a field within the string, e.g.
// schema violation (methods: cannot set 'methods' when 'protocols' is 'grpc' or 'grpcs')
// has "methods", but we'd need to do string parsing to extract it, and we may not catch all possible error types.
// This lazier approach just dumps the full error string as a single problem, which is probably good enough.
Problems: map[string]string{
"": fmt.Sprintf("%s", event.Error),
},
}

return nil, nil, nil, nil
return parseRawResourceError(raw)
}

func (s UpdateStrategyDBMode) MetricsProtocol() metrics.Protocol {
Expand Down
2 changes: 2 additions & 0 deletions internal/dataplane/sendconfig/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (r DefaultUpdateStrategyResolver) resolveUpdateStrategy(client UpdateClient
},
r.config.Version,
r.config.Concurrency,
r.logger,
)
}

Expand All @@ -111,6 +112,7 @@ func (r DefaultUpdateStrategyResolver) resolveUpdateStrategy(client UpdateClient
},
r.config.Version,
r.config.Concurrency,
r.logger,
)
}

Expand Down
3 changes: 3 additions & 0 deletions internal/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Config struct {
IngressClassName string
LeaderElectionNamespace string
LeaderElectionID string
LeaderElectionForce string
Concurrency int
FilterTags []string
WatchNamespaces []string
Expand Down Expand Up @@ -216,6 +217,8 @@ func (c *Config) FlagSet() *pflag.FlagSet {
flagSet.StringVar(&c.IngressClassName, "ingress-class", annotations.DefaultIngressClass, `Name of the ingress class to route through this controller.`)
flagSet.StringVar(&c.LeaderElectionID, "election-id", "5b374a9e.konghq.com", `Election id to use for status update.`)
flagSet.StringVar(&c.LeaderElectionNamespace, "election-namespace", "", `Leader election namespace to use when running outside a cluster.`)
flagSet.StringVar(&c.LeaderElectionForce, "force-leader-election", "", `Set to "enabled" or "disabled" to force a leader election behavior. Behavior is normally determined automatically from other settings.`)
_ = flagSet.MarkHidden("force-leader-election")
flagSet.StringSliceVar(&c.FilterTags, "kong-admin-filter-tag", []string{"managed-by-ingress-controller"},
"Tag(s) in comma-separated format (or specify this flag multiple times). They are used to manage and filter entities in Kong. "+
"This setting will be silently ignored if the Kong instance has no tags support.")
Expand Down
13 changes: 13 additions & 0 deletions internal/manager/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,20 @@ func setupManagerOptions(ctx context.Context, logger logr.Logger, c *Config, dbm
return managerOpts, nil
}

const (
LeaderElectionEnabled = "enabled"
LeaderElectionDisabled = "disabled"
)

func leaderElectionEnabled(logger logr.Logger, c *Config, dbmode dpconf.DBMode) bool {
if c.LeaderElectionForce == LeaderElectionEnabled {
logger.Info("leader election forcibly enabled")
return true
}
if c.LeaderElectionForce == LeaderElectionDisabled {
logger.Info("leader election forcibly disabled")
return false
}
if c.Konnect.ConfigSynchronizationEnabled {
logger.Info("Konnect config synchronisation enabled, enabling leader election")
return true
Expand Down
115 changes: 114 additions & 1 deletion test/envtest/configerrorevent_envtest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,32 @@ package envtest
import (
"bytes"
"context"
"errors"
"fmt"
"regexp"
"testing"
"text/template"
"time"

"github.com/kong/kubernetes-testing-framework/pkg/utils/kubernetes/generators"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kong/kubernetes-ingress-controller/v3/internal/annotations"
"github.com/kong/kubernetes-ingress-controller/v3/internal/dataplane"
kongv1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/configuration/v1"
"github.com/kong/kubernetes-ingress-controller/v3/test"
"github.com/kong/kubernetes-ingress-controller/v3/test/mocks"
)

func TestConfigErrorEventGeneration(t *testing.T) {
func TestConfigErrorEventGenerationInMemoryMode(t *testing.T) {
// Can't be run in parallel because we're using t.Setenv() below which doesn't allow it.

const (
Expand Down Expand Up @@ -242,3 +249,109 @@ func formatErrBody(t *testing.T, namespace string, ingress *netv1.Ingress, servi

return b.Bytes()
}

func TestConfigErrorEventGenerationDBMode(t *testing.T) {
// Can't be run in parallel because we're using t.Setenv() below which doesn't allow it.

const (
waitTime = time.Minute
tickTime = 100 * time.Millisecond
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

scheme := Scheme(t, WithKong)
restConfig := Setup(t, scheme)
ctrlClientGlobal := NewControllerClient(t, scheme, restConfig)
ns := CreateNamespace(ctx, t, ctrlClientGlobal)
ctrlClient := client.NewNamespacedClient(ctrlClientGlobal, ns.Name)

ingressClassName := "kongenvtest"
deployIngressClass(ctx, t, ingressClassName, ctrlClient)

const podName = "kong-ingress-controller-tyjh1"
t.Setenv("POD_NAMESPACE", ns.Name)
t.Setenv("POD_NAME", podName)

t.Logf("creating a static consumer in %s namespace which will be used to test global validation", ns.Name)
consumer := &kongv1.KongConsumer{
ObjectMeta: metav1.ObjectMeta{
Name: "donenbai",
Annotations: map[string]string{
annotations.IngressClassKey: ingressClassName,
},
},
Username: "donenbai",
}
require.NoError(t, ctrlClient.Create(ctx, consumer))
t.Cleanup(func() {
if err := ctrlClient.Delete(ctx, consumer); err != nil && !apierrors.IsNotFound(err) && !errors.Is(err, context.Canceled) {
assert.NoError(t, err)
}
})

RunManager(ctx, t, restConfig,
AdminAPIOptFns(
// TODO IDK where we're getting the version from normally but it shouldn't really matter for this.
mocks.WithRoot(formatDBRootResponse("999.999.999")),
),
WithPublishService(ns.Name),
WithIngressClass(ingressClassName),
WithProxySyncSeconds(0.1),
)

t.Log("checking kongconsumer event creation")
require.Eventually(t, func() bool {
var events corev1.EventList
if err := ctrlClient.List(ctx, &events, &client.ListOptions{Namespace: ns.Name}); err != nil {
t.Logf("error listing events: %v", err)
return false
}
t.Logf("got %d events", len(events.Items))

matches := make([]bool, 1)
matches[0] = lo.ContainsBy(events.Items, func(e corev1.Event) bool {
return e.Reason == dataplane.KongConfigurationApplyFailedEventReason &&
e.InvolvedObject.Kind == "KongConsumer" &&
e.InvolvedObject.Name == consumer.Name &&
e.Message == "invalid : HTTP status 400 (message: \"2 schema violations (at least one of these fields must be non-empty: 'custom_id', 'username'; fake: unknown field)\")"
})
if lo.Count(matches, true) != 1 {
t.Logf("not all events matched: %+v", matches)
return false
}
return true
}, waitTime, tickTime)

t.Log("push failure events recorded successfully")
}

func formatDBRootResponse(version string) []byte {
const defaultDBLessRootResponse = `{
"version": "%s",
"configuration": {
"database": "postgres",
"router_flavor": "traditional",
"role": "traditional",
"proxy_listeners": [
{
"ipv6only=on": false,
"ipv6only=off": false,
"ssl": false,
"so_keepalive=off": false,
"listener": "0.0.0.0:8000",
"bind": false,
"port": 8000,
"deferred": false,
"so_keepalive=on": false,
"http2": false,
"proxy_protocol": false,
"ip": "0.0.0.0",
"reuseport": false
}
]
}
}`
return []byte(fmt.Sprintf(defaultDBLessRootResponse, version))
}
Loading

0 comments on commit 1c68106

Please sign in to comment.