egressgw: use Resource[CiliumEndpoint]
Replace the custom watcher with Resource[CiliumEndpoint]. This also
allows getting rid of the custom retry queue, since Resource will
by default retry an event if Done is called with an error.

Signed-off-by: Lorenz Bauer <[email protected]>
lmb authored and joamaki committed Sep 25, 2023
1 parent 2ed4e8b commit b2cb840
Showing 8 changed files with 60 additions and 151 deletions.
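Before the per-file diffs, a minimal sketch of the Resource[T] consumption pattern this commit moves to. It is illustrative only, not part of the commit: the consumeEndpoints wrapper and handleUpsert are made-up names, but the Event kinds and Done semantics are those of Cilium's pkg/k8s/resource. Every event must be acknowledged with Done, and a non-nil error makes the resource redeliver the event with backoff, which is what makes the custom retry queue unnecessary.

// Sketch only: not part of this commit. consumeEndpoints and
// handleUpsert are illustrative names.
package example

import (
	"context"

	"github.com/cilium/cilium/pkg/k8s/resource"
	k8sTypes "github.com/cilium/cilium/pkg/k8s/types"
)

func consumeEndpoints(ctx context.Context, endpoints resource.Resource[*k8sTypes.CiliumEndpoint]) {
	for event := range endpoints.Events(ctx) {
		switch event.Kind {
		case resource.Sync:
			// The initial listing is complete.
			event.Done(nil)
		case resource.Upsert:
			// Completing the event with a non-nil error causes the
			// resource to requeue and retry it with backoff.
			event.Done(handleUpsert(event.Object))
		case resource.Delete:
			event.Done(nil)
		}
	}
}

func handleUpsert(ep *k8sTypes.CiliumEndpoint) error {
	// e.g. resolve the endpoint's identity to labels; may fail transiently.
	return nil
}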
6 changes: 0 additions & 6 deletions daemon/cmd/daemon.go
@@ -631,11 +631,6 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
 
 	d.cgroupManager = manager.NewCgroupManager()
 
-	var egressGatewayWatcher watchers.EgressGatewayManager
-	if d.egressGatewayManager != nil {
-		egressGatewayWatcher = d.egressGatewayManager
-	}
-
 	d.k8sWatcher = watchers.NewK8sWatcher(
 		params.Clientset,
 		d.endpointManager,
@@ -646,7 +641,6 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
 		d.datapath,
 		d.redirectPolicyManager,
 		d.bgpSpeaker,
-		egressGatewayWatcher,
 		d.l7Proxy,
 		option.Config,
 		d.ipcache,
1 change: 1 addition & 0 deletions daemon/k8s/resources.go
@@ -64,6 +64,7 @@ var (
 			k8s.CiliumClusterwideNetworkPolicyResource,
 			k8s.CiliumCIDRGroupResource,
 			k8s.CiliumNodeResource,
+			k8s.CiliumSlimEndpointResource,
 		),
 	)
 )
14 changes: 12 additions & 2 deletions pkg/egressgateway/helpers_test.go
@@ -12,6 +12,7 @@ import (
 	v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
 	"github.com/cilium/cilium/pkg/k8s/resource"
 	slimv1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
+	k8sTypes "github.com/cilium/cilium/pkg/k8s/types"
 	"github.com/cilium/cilium/pkg/policy/api"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,8 +49,10 @@ func (fr fakeResource[T]) Observe(ctx context.Context, next func(event resource.
 }
 
 func (fr fakeResource[T]) Events(ctx context.Context, opts ...resource.EventsOpt) <-chan resource.Event[T] {
-	if opts != nil {
-		panic("opts not supported")
+	if len(opts) > 1 {
+		// Ideally we'd only ignore resource.WithRateLimit here, but that
+		// isn't possible.
+		panic("more than one option is not supported")
 	}
 	return fr
 }
@@ -157,3 +160,10 @@ func newCEGP(params *policyParams) (*v2.CiliumEgressGatewayPolicy, *PolicyConfig
 
 	return cegp, policy
 }
+
+func addEndpoint(tb testing.TB, endpoints fakeResource[*k8sTypes.CiliumEndpoint], ep *k8sTypes.CiliumEndpoint) {
+	endpoints.process(tb, resource.Event[*k8sTypes.CiliumEndpoint]{
+		Kind:   resource.Upsert,
+		Object: ep,
+	})
+}
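For orientation, a hypothetical test built on this harness might look as follows. It assumes, and this is an assumption rather than something the diff shows, that process delivers the event to whatever is consuming endpoints.Events(ctx) and waits for its Done callback; the test name and the endpoint literal are made up.

// Hypothetical usage of the addEndpoint helper above; lives alongside
// the other helpers in this package's tests.
func TestEndpointUpsert(t *testing.T) {
	endpoints := make(fakeResource[*k8sTypes.CiliumEndpoint])

	// ... start the Manager under test with `endpoints` wired in as its
	// Endpoints resource, so it consumes endpoints.Events(ctx) ...

	// Feed a single upsert event through the fake resource; assumed to
	// block until the manager completes the event via Done.
	addEndpoint(t, endpoints, &k8sTypes.CiliumEndpoint{
		ObjectMeta: slimv1.ObjectMeta{
			Name:      "ep-1",
			Namespace: "default",
		},
	})

	// Assertions against the manager's state would follow here.
}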
143 changes: 35 additions & 108 deletions pkg/egressgateway/manager.go
@@ -28,7 +28,6 @@ import (
 	"github.com/cilium/cilium/pkg/hive/cell"
 	"github.com/cilium/cilium/pkg/identity"
 	identityCache "github.com/cilium/cilium/pkg/identity/cache"
-	"github.com/cilium/cilium/pkg/k8s"
 	cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
 	"github.com/cilium/cilium/pkg/k8s/resource"
 	k8sTypes "github.com/cilium/cilium/pkg/k8s/types"
@@ -116,6 +115,9 @@ type Manager struct {
 	// nodesResource allows reading node CRD from k8s.
 	ciliumNodes resource.Resource[*cilium_api_v2.CiliumNode]
 
+	// endpoints allows reading endpoint CRD from k8s.
+	endpoints resource.Resource[*k8sTypes.CiliumEndpoint]
+
 	// policyConfigs stores policy configs indexed by policyID
 	policyConfigs map[policyID]*PolicyConfig
 
@@ -126,21 +128,6 @@
 	// epDataStore stores endpointId to endpoint metadata mapping
 	epDataStore map[endpointID]*endpointMetadata
 
-	// pendingEndpointEvents stores the k8s CiliumEndpoint add/update events
-	// which still need to be processed by the manager, either because we
-	// just received the event, or because the processing failed due to the
-	// manager being unable to resolve the endpoint identity to a set of
-	// labels
-	pendingEndpointEvents map[endpointID]*k8sTypes.CiliumEndpoint
-
-	// pendingEndpointEventsLock protects the access to the
-	// pendingEndpointEvents map
-	pendingEndpointEventsLock lock.RWMutex
-
-	// endpointEventsQueue is a workqueue of CiliumEndpoint IDs that need to
-	// be processed by the manager
-	endpointEventsQueue workqueue.RateLimitingInterface
-
 	// identityAllocator is used to fetch identity labels for endpoint updates
 	identityAllocator identityCache.IdentityAllocator
 
@@ -176,11 +163,11 @@ type Params struct {
 
 	Config            Config
 	DaemonConfig      *option.DaemonConfig
-	CacheStatus       k8s.CacheStatus
 	IdentityAllocator identityCache.IdentityAllocator
 	PolicyMap         egressmap.PolicyMap
 	Policies          resource.Resource[*Policy]
 	Nodes             resource.Resource[*cilium_api_v2.CiliumNode]
+	Endpoints         resource.Resource[*k8sTypes.CiliumEndpoint]
 
 	Lifecycle hive.Lifecycle
 }
@@ -238,26 +225,18 @@ func NewEgressGatewayManager(p Params) (out struct {
 }
 
 func newEgressGatewayManager(p Params) (*Manager, error) {
-	// here we try to mimic the same exponential backoff retry logic used by
-	// the identity allocator, where the minimum retry timeout is set to 20
-	// milliseconds and the max number of attempts is 16 (so 20ms * 2^16 ==
-	// ~20 minutes)
-	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond*20, time.Minute*20)
-	endpointEventRetryQueue := workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{})
-
 	manager := &Manager{
 		nodeDataStore:                 make(map[string]nodeTypes.Node),
 		policyConfigs:                 make(map[policyID]*PolicyConfig),
 		policyConfigsBySourceIP:       make(map[string][]*PolicyConfig),
 		epDataStore:                   make(map[endpointID]*endpointMetadata),
-		pendingEndpointEvents:         make(map[endpointID]*k8sTypes.CiliumEndpoint),
-		endpointEventsQueue:           endpointEventRetryQueue,
 		identityAllocator:             p.IdentityAllocator,
 		installRoutes:                 p.Config.InstallEgressGatewayRoutes,
 		reconciliationTriggerInterval: p.Config.EgressGatewayReconciliationTriggerInterval,
 		policyMap:                     p.PolicyMap,
 		policies:                      p.Policies,
 		ciliumNodes:                   p.Nodes,
+		endpoints:                     p.Endpoints,
 	}
 
 	t, err := trigger.NewTrigger(trigger.Parameters{
@@ -288,8 +267,8 @@ func newEgressGatewayManager(p Params) (*Manager, error) {
 				return fmt.Errorf("egress gateway needs kernel 5.2 or newer")
 			}
 
-			go manager.processEvents(ctx, p.CacheStatus)
-			manager.processCiliumEndpoints(ctx, &wg)
+			go manager.processEvents(ctx)
+
 			return nil
 		},
 		OnStop: func(hc hive.HookContext) error {
@@ -337,10 +316,10 @@ func (manager *Manager) getIdentityLabels(securityIdentity uint32) (labels.Label
 
 // processEvents spawns a goroutine that waits for the agent to
 // sync with k8s and then runs the first reconciliation.
-func (manager *Manager) processEvents(ctx context.Context, cacheStatus k8s.CacheStatus) {
-	var globalSync, policySync, nodeSync bool
+func (manager *Manager) processEvents(ctx context.Context) {
+	var policySync, nodeSync, endpointSync bool
 	maybeTriggerReconcile := func() {
-		if !globalSync || !policySync || !nodeSync {
+		if !policySync || !nodeSync || !endpointSync {
 			return
 		}
 
@@ -356,18 +335,21 @@ func (manager *Manager) processEvents(ctx context.Context, cacheStatus k8s.Cache
 		manager.reconciliationTrigger.TriggerWithReason("k8s sync done")
 	}
 
+	// here we try to mimic the same exponential backoff retry logic used by
+	// the identity allocator, where the minimum retry timeout is set to 20
+	// milliseconds and the max number of attempts is 16 (so 20ms * 2^16 ==
+	// ~20 minutes)
+	endpointsRateLimit := workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond*20, time.Minute*20)
+
 	policyEvents := manager.policies.Events(ctx)
 	nodeEvents := manager.ciliumNodes.Events(ctx)
+	endpointEvents := manager.endpoints.Events(ctx, resource.WithRateLimiter(endpointsRateLimit))
 
 	for {
 		select {
 		case <-ctx.Done():
 			return
 
-		case <-cacheStatus:
-			globalSync = true
-			maybeTriggerReconcile()
-			cacheStatus = nil
-
 		case event := <-policyEvents:
 			if event.Kind == resource.Sync {
 				policySync = true
@@ -385,6 +367,15 @@
 			} else {
 				manager.handleNodeEvent(event)
 			}
+
+		case event := <-endpointEvents:
+			if event.Kind == resource.Sync {
+				endpointSync = true
+				maybeTriggerReconcile()
+				event.Done(nil)
+			} else {
+				manager.handleEndpointEvent(event)
+			}
 		}
 	}
 }
@@ -400,58 +391,6 @@ func (manager *Manager) handlePolicyEvent(event resource.Event[*Policy]) {
 	}
 }
 
-// processCiliumEndpoints spawns a goroutine that:
-//   - consumes the endpoint IDs returned by the endpointEventsQueue workqueue
-//   - processes the CiliumEndpoints stored in pendingEndpointEvents for these
-//     endpoint IDs
-//   - in case the endpoint ID -> labels resolution fails, it adds back the
-//     event to the workqueue so that it can be retried with an exponential
-//     backoff
-func (manager *Manager) processCiliumEndpoints(ctx context.Context, wg *sync.WaitGroup) {
-	wg.Add(1)
-
-	go func() {
-		defer wg.Done()
-
-		retryQueue := manager.endpointEventsQueue
-		go func() {
-			<-ctx.Done()
-			retryQueue.ShutDown()
-		}()
-
-		for {
-			item, shutdown := retryQueue.Get()
-			if shutdown {
-				break
-			}
-			endpointID := item.(types.NamespacedName)
-
-			manager.pendingEndpointEventsLock.RLock()
-			ep, ok := manager.pendingEndpointEvents[endpointID]
-			manager.pendingEndpointEventsLock.RUnlock()
-
-			var err error
-			if ok {
-				err = manager.addEndpoint(ep)
-			} else {
-				manager.deleteEndpoint(endpointID)
-			}
-
-			if err != nil {
-				// if the endpoint event is still pending it means the manager
-				// failed to resolve the endpoint ID to a set of labels, so add back
-				// the item to the queue
-				manager.endpointEventsQueue.AddRateLimited(endpointID)
-			} else {
-				// otherwise just remove it
-				manager.endpointEventsQueue.Forget(endpointID)
-			}
-
-			manager.endpointEventsQueue.Done(endpointID)
-		}
-	}()
-}
-
 // Event handlers
 
 // onAddEgressPolicy parses the given policy config, and updates internal state
@@ -560,32 +499,20 @@ func (manager *Manager) deleteEndpoint(id types.NamespacedName) {
 	manager.reconciliationTrigger.TriggerWithReason("endpoint deleted")
 }
 
-// OnUpdateEndpoint is the event handler for endpoint additions and updates.
-func (manager *Manager) OnUpdateEndpoint(endpoint *k8sTypes.CiliumEndpoint) {
-	id := types.NamespacedName{
-		Name:      endpoint.GetName(),
-		Namespace: endpoint.GetNamespace(),
-	}
-
-	manager.pendingEndpointEventsLock.Lock()
-	manager.pendingEndpointEvents[id] = endpoint
-	manager.pendingEndpointEventsLock.Unlock()
-
-	manager.endpointEventsQueue.Add(id)
-}
+func (manager *Manager) handleEndpointEvent(event resource.Event[*k8sTypes.CiliumEndpoint]) {
+	endpoint := event.Object
 
-// OnDeleteEndpoint is the event handler for endpoint deletions.
-func (manager *Manager) OnDeleteEndpoint(endpoint *k8sTypes.CiliumEndpoint) {
 	id := types.NamespacedName{
 		Name:      endpoint.GetName(),
		Namespace: endpoint.GetNamespace(),
 	}
 
-	manager.pendingEndpointEventsLock.Lock()
-	delete(manager.pendingEndpointEvents, id)
-	manager.pendingEndpointEventsLock.Unlock()
-
-	manager.endpointEventsQueue.Add(id)
+	if event.Kind == resource.Upsert {
+		event.Done(manager.addEndpoint(endpoint))
+	} else {
+		manager.deleteEndpoint(id)
+		event.Done(nil)
+	}
 }
 
 // handleNodeEvent takes care of node upserts and removals.
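One note on the backoff constants that move into processEvents above: with client-go's NewItemExponentialFailureRateLimiter, the per-item delay starts at the base, doubles on each failure, and is clamped at the cap, so the "20ms * 2^16 == ~20 minutes" comment checks out (20ms * 2^16 is about 21.8 minutes, just past the 20 minute cap). A standalone arithmetic check, not part of the commit:

// Standalone check of the backoff comment in processEvents: with a
// 20ms base and a 20m cap, the per-item delay (base * 2^failures)
// reaches the cap after roughly 16 failures.
package main

import (
	"fmt"
	"time"
)

func main() {
	base := 20 * time.Millisecond
	maxDelay := 20 * time.Minute

	delay := base
	for n := 0; delay < maxDelay; n++ {
		fmt.Printf("after failure %2d: retry in %v\n", n, delay)
		delay *= 2
	}
	// Prints 20ms, 40ms, ..., ~10.9m (failure 15); the next doubling
	// (~21.8m) exceeds the cap, so all later retries are clamped to 20m.
}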
(4 more changed files not shown.)