adopt context.Context in cloud provider
MartinForReal authored and k8s-infra-cherrypick-robot committed Sep 27, 2024
1 parent b7384c0 commit a8fc6ca
Showing 10 changed files with 60 additions and 69 deletions.
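
The recurring change across these files is mechanical: every stopCh <-chan struct{} parameter becomes a context.Context, and call sites that still need a bare stop channel derive one with ctx.Done(). A minimal sketch of the before/after shape, using illustrative names that are not from this repository:

package main

import "context"

// Before: shutdown is signalled by closing a bare channel.
func runUntilStopped(stopCh <-chan struct{}) {
	<-stopCh // block until the caller closes stopCh
}

// After: the context carries the same signal via ctx.Done(),
// and can also carry deadlines and cancellation causes.
func runUntilCancelled(ctx context.Context) {
	<-ctx.Done() // block until the caller cancels ctx
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go runUntilCancelled(ctx)
	cancel() // plays the role that close(stopCh) used to play
}

In the command entry points, cobra's cmd.Context() supplies the root context, which is why wait.NeverStop and context.TODO() disappear in the hunks below.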
29 changes: 13 additions & 16 deletions cmd/cloud-controller-manager/app/controllermanager.go
@@ -125,7 +125,7 @@ func NewCloudControllerManagerCommand() *cobra.Command {
}
}

- healthHandler, err := StartHTTPServer(c.Complete(), traceProvider, wait.NeverStop)
+ healthHandler, err := StartHTTPServer(cmd.Context(), c.Complete(), traceProvider)
if err != nil {
klog.Errorf("Run: railed to start HTTP server: %v", err)
os.Exit(1)
@@ -212,15 +212,13 @@ func NewCloudControllerManagerCommand() *cobra.Command {

// RunWrapper adapts the ccm boot logic to the leader elector call back function
func RunWrapper(s *options.CloudControllerManagerOptions, c *cloudcontrollerconfig.Config, h *controllerhealthz.MutableHealthzHandler) func(ctx context.Context) {
- return func(_ context.Context) {
+ return func(ctx context.Context) {
if !c.DynamicReloadingConfig.EnableDynamicReloading {
klog.V(1).Infof("using static initialization from config file %s", c.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile)
- if err := Run(context.TODO(), c.Complete(), h); err != nil {
+ if err := Run(ctx, c.Complete(), h); err != nil {
klog.Errorf("RunWrapper: failed to start cloud controller manager: %v", err)
os.Exit(1)
}
-
- panic("unreachable")
}
var updateCh chan struct{}

@@ -311,7 +309,7 @@ func runAsync(s *options.CloudControllerManagerOptions, errCh chan error, h *con
}

// StartHTTPServer starts the controller manager HTTP server
- func StartHTTPServer(c *cloudcontrollerconfig.CompletedConfig, traceProvider *trace.Provider, stopCh <-chan struct{}) (*controllerhealthz.MutableHealthzHandler, error) {
+ func StartHTTPServer(ctx context.Context, c *cloudcontrollerconfig.CompletedConfig, traceProvider *trace.Provider) (*controllerhealthz.MutableHealthzHandler, error) {
// Setup any healthz checks we will want to use.
var checks []healthz.HealthChecker
var electionChecker *leaderelection.HealthzAdaptor
@@ -329,7 +327,7 @@ func StartHTTPServer(c *cloudcontrollerconfig.CompletedConfig, traceProvider *tr

handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
- if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
+ if _, _, err := c.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return nil, err
}

@@ -387,18 +385,18 @@ func Run(ctx context.Context, c *cloudcontrollerconfig.CompletedConfig, h *contr
klog.Fatalf("error building controller context: %v", err)
}

- if err := startControllers(ctx, controllerContext, c, ctx.Done(), cloud, newControllerInitializers(), h); err != nil {
+ if err := startControllers(ctx, controllerContext, c, cloud, newControllerInitializers(), h); err != nil {
klog.Fatalf("error running controllers: %v", err)
}

return nil
}

// startControllers starts the cloud specific controller loops.
- func startControllers(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{},
+ func startControllers(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig,
cloud cloudprovider.Interface, controllers map[string]initFunc, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
// Initialize the cloud provider with a reference to the clientBuilder
- cloud.Initialize(completedConfig.ClientBuilder, stopCh)
+ cloud.Initialize(completedConfig.ClientBuilder, ctx.Done())
// Set the informer on the user cloud object
if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
informerUserCloud.SetInformers(completedConfig.SharedInformers)
@@ -412,7 +410,7 @@ func startControllers(ctx context.Context, controllerContext genericcontrollerma
}

klog.V(1).Infof("Starting %q", controllerName)
- ctrl, started, err := initFn(ctx, controllerContext, completedConfig, cloud, stopCh)
+ ctrl, started, err := initFn(ctx, controllerContext, completedConfig, cloud)
if err != nil {
klog.Errorf("Error starting %q: %s", controllerName, err.Error())
return err
@@ -445,10 +443,9 @@ func startControllers(ctx context.Context, controllerContext genericcontrollerma
}

klog.V(2).Infof("startControllers: starting shared informers")
- completedConfig.SharedInformers.Start(stopCh)
- controllerContext.InformerFactory.Start(stopCh)
-
- <-stopCh
+ completedConfig.SharedInformers.Start(ctx.Done())
+ controllerContext.InformerFactory.Start(ctx.Done())
+ <-ctx.Done()
klog.V(1).Infof("startControllers: received stopping signal, exiting")

return nil
@@ -457,7 +454,7 @@ func startControllers(ctx context.Context, controllerContext genericcontrollerma
// initFunc is used to launch a particular controller. It may run additional "should I activate" checks.
// Any error returned will cause the controller process to `Fatal`
// The bool indicates whether the controller was enabled.
- type initFunc func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stop <-chan struct{}) (debuggingHandler http.Handler, enabled bool, err error)
+ type initFunc func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (debuggingHandler http.Handler, enabled bool, err error)
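
For orientation, a controller start function matching the new initFunc signature might look like the following sketch (imports as in this file); the example controller and its constructor are placeholders, not code from this commit:

func startExampleController(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext,
	completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (http.Handler, bool, error) {
	ctrl, err := newExampleController(completedConfig, cloud) // hypothetical constructor
	if err != nil {
		return nil, false, err // startControllers logs and aborts on error
	}
	go ctrl.Run(ctx) // the controller is expected to exit when ctx is cancelled
	return nil, true, nil // nil debugging handler, controller enabled
}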

// KnownControllers indicates the default set of controllers that are known.
func KnownControllers() []string {
16 changes: 8 additions & 8 deletions cmd/cloud-controller-manager/app/core.go
@@ -43,7 +43,7 @@ import (
"sigs.k8s.io/cloud-provider-azure/pkg/nodeipam/ipam"
)

- func startCloudNodeController(_ context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
+ func startCloudNodeController(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (http.Handler, bool, error) {
// Start the CloudNodeController
nodeController, err := nodecontroller.NewCloudNodeController(
completedConfig.SharedInformers.Core().V1().Nodes(),
@@ -58,12 +58,12 @@ func startCloudNodeController(_ context.Context, controllerContext genericcontro
return nil, false, nil
}

- go nodeController.Run(stopCh, controllerContext.ControllerManagerMetrics)
+ go nodeController.RunWithContext(ctx, controllerContext.ControllerManagerMetrics)

Check failure on line 61 in cmd/cloud-controller-manager/app/core.go (GitHub Actions / Lint, reported three times): nodeController.RunWithContext undefined (type *"k8s.io/cloud-provider/controllers/node".CloudNodeController has no field or method RunWithContext) (typecheck)
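The Lint failure above indicates that the vendored k8s.io/cloud-provider in this branch does not yet expose RunWithContext on CloudNodeController. If bumping the dependency is not an option, one possible bridge is to keep driving the channel-based Run that the removed line used, assuming it is still available:

// Sketch, not the committed code: feed the new context into the old
// channel-based API until the vendored controller gains RunWithContext.
go nodeController.Run(ctx.Done(), controllerContext.ControllerManagerMetrics)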

return nil, true, nil
}

- func startCloudNodeLifecycleController(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, _ <-chan struct{}) (http.Handler, bool, error) {
+ func startCloudNodeLifecycleController(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (http.Handler, bool, error) {
// Start the cloudNodeLifecycleController
cloudNodeLifecycleController, err := nodelifecyclecontroller.NewCloudNodeLifecycleController(
completedConfig.SharedInformers.Core().V1().Nodes(),
@@ -74,15 +74,15 @@ func startCloudNodeLifecycleController(ctx context.Context, controllerContext ge
)
if err != nil {
klog.Warningf("failed to start cloud node lifecycle controller: %s", err)
- return nil, false, nil
+ return nil, false, err
}

go cloudNodeLifecycleController.Run(ctx, controllerContext.ControllerManagerMetrics)

return nil, true, nil
}

- func startServiceController(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, _ <-chan struct{}) (http.Handler, bool, error) {
+ func startServiceController(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (http.Handler, bool, error) {
// Start the service controller
serviceController, err := servicecontroller.New(
cloud,
@@ -103,7 +103,7 @@ func startServiceController(ctx context.Context, controllerContext genericcontro
return nil, true, nil
}

- func startRouteController(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, _ <-chan struct{}) (http.Handler, bool, error) {
+ func startRouteController(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (http.Handler, bool, error) {
if !completedConfig.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes {
klog.Infof("Will not configure cloud provider routes, --configure-cloud-routes: %v.", completedConfig.ComponentConfig.KubeCloudShared.ConfigureCloudRoutes)
return nil, false, nil
@@ -144,7 +144,7 @@ func startRouteController(ctx context.Context, controllerContext genericcontroll
return nil, true, nil
}

- func startNodeIpamController(_ context.Context, _ genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
+ func startNodeIpamController(ctx context.Context, _ genericcontrollermanager.ControllerContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (http.Handler, bool, error) {
var serviceCIDR *net.IPNet
var secondaryServiceCIDR *net.IPNet

@@ -218,7 +218,7 @@ func startNodeIpamController(_ context.Context, _ genericcontrollermanager.Contr
if err != nil {
return nil, true, err
}
- go nodeIpamController.Run(stopCh)
+ go nodeIpamController.Run(ctx)
return nil, true, nil
}

29 changes: 12 additions & 17 deletions cmd/cloud-node-manager/app/nodemanager.go
@@ -19,12 +19,10 @@ package app
import (
"context"
"fmt"
"os"
"time"

"github.com/spf13/cobra"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/cli/globalflag"
@@ -52,26 +50,23 @@ func NewCloudNodeManagerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "cloud-node-manager",
Long: `The Cloud node manager is a daemon that reconciles node information for its running node.`,
- Run: func(cmd *cobra.Command, _ []string) {
+ RunE: func(cmd *cobra.Command, _ []string) error {
verflag.PrintAndExitIfRequested("Cloud Node Manager")
cliflag.PrintFlags(cmd.Flags())

c, err := s.Config()
if err != nil {
- fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
+ return err
}

if err := initForOS(c.WindowsService); err != nil {
- fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
+ return err
}

- if err := Run(c, wait.NeverStop); err != nil {
- fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
+ if err := Run(cmd.Context(), c); err != nil {
+ return err
}

+ return nil
},
}

@@ -98,7 +93,7 @@ func NewCloudNodeManagerCommand() *cobra.Command {
}

// Run runs the ExternalCMServer. This should never exit.
- func Run(c *cloudnodeconfig.Config, stopCh <-chan struct{}) error {
+ func Run(ctx context.Context, c *cloudnodeconfig.Config) error {
// To help debugging, immediately log version and nodeName
klog.Infof("Version: %+v", version.Get())
klog.Infof("NodeName: %s", c.NodeName)
@@ -110,13 +105,13 @@ func Run(c *cloudnodeconfig.Config, stopCh <-chan struct{}) error {
unsecuredMux := genericcontrollermanager.NewBaseHandler(&config.DebuggingConfiguration{}, healthzHandler)
handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
- if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
+ if _, _, err := c.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return err
}
}

run := func(ctx context.Context) {
- if err := startControllers(ctx, c, ctx.Done(), healthzHandler); err != nil {
+ if err := startControllers(ctx, c, healthzHandler); err != nil {
klog.Fatalf("error running controllers: %v", err)
}
}
@@ -126,7 +121,7 @@ func Run(c *cloudnodeconfig.Config, stopCh <-chan struct{}) error {
}

// startControllers starts the cloud specific controller loops.
- func startControllers(ctx context.Context, c *cloudnodeconfig.Config, stopCh <-chan struct{}, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
+ func startControllers(ctx context.Context, c *cloudnodeconfig.Config, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
klog.V(1).Infof("Starting cloud-node-manager...")

// Start the CloudNodeController
@@ -140,7 +135,7 @@ func startControllers(ctx context.Context, c *cloudnodeconfig.Config, stopCh <-c
c.WaitForRoutes,
c.EnableDeprecatedBetaTopologyLabels)

- go nodeController.Run(stopCh)
+ go nodeController.Run(ctx)

check := controllerhealthz.NamedPingChecker(c.NodeName)
healthzHandler.AddHealthChecker(check)
@@ -153,7 +148,7 @@ func startControllers(ctx context.Context, c *cloudnodeconfig.Config, stopCh <-c
klog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
}

- c.SharedInformers.Start(stopCh)
+ c.SharedInformers.Start(ctx.Done())

select {}
}
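
With the switch from Run to RunE and cmd.Context(), errors now propagate to the caller instead of exiting in place, and the root context must be supplied by whoever executes the command. The main package is not part of this diff, so the following wiring is an assumption, sketched with os/signal:

package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"sigs.k8s.io/cloud-provider-azure/cmd/cloud-node-manager/app" // assumed import path
)

func main() {
	// Cancel the root context on SIGINT/SIGTERM so every ctx.Done()
	// fires where a stop channel used to be closed.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	if err := app.NewCloudNodeManagerCommand().ExecuteContext(ctx); err != nil {
		os.Exit(1)
	}
}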
2 changes: 1 addition & 1 deletion pkg/nodeipam/ipam/cidr_allocator.go
@@ -70,7 +70,7 @@ type CIDRAllocator interface {
// ReleaseCIDR releases the CIDR of the removed node
ReleaseCIDR(node *corev1.Node) error
// Run starts all the working logic of the allocator.
- Run(stopCh <-chan struct{})
+ Run(ctx context.Context)
}

// CIDRAllocatorParams is parameters that's required for creating new
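Since Run now takes a context, callers own the allocator's lifetime through cancellation rather than a shared channel. A hedged usage sketch, with construction elided and alloc standing in for any CIDRAllocator implementation:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go alloc.Run(ctx) // Run returns once cancel() is called or the parent ctx ends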
13 changes: 7 additions & 6 deletions pkg/nodeipam/ipam/cloud_cidr_allocator.go
@@ -17,6 +17,7 @@ limitations under the License.
package ipam

import (
"context"
"fmt"
"net"
"sync"
@@ -262,24 +263,24 @@ func (ca *cloudCIDRAllocator) updateNodeSubnetMaskSizes(nodeName, providerID str
return nil
}

- func (ca *cloudCIDRAllocator) Run(stopCh <-chan struct{}) {
+ func (ca *cloudCIDRAllocator) Run(ctx context.Context) {
defer utilruntime.HandleCrash()

klog.Infof("Starting cloud CIDR allocator")
defer klog.Infof("Shutting down cloud CIDR allocator")

if !cache.WaitForNamedCacheSync("cidrallocator", stopCh, ca.nodesSynced) {
if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), ca.nodesSynced) {
return
}

for i := 0; i < cidrUpdateWorkers; i++ {
- go ca.worker(stopCh)
+ go ca.worker(ctx)
}

<-stopCh
<-ctx.Done()
}

- func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
+ func (ca *cloudCIDRAllocator) worker(ctx context.Context) {
for {
select {
case workItem, ok := <-ca.nodeUpdateChannel:
@@ -291,7 +292,7 @@ func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
// Requeue the failed node for update again.
ca.nodeUpdateChannel <- workItem
}
case <-stopChan:
case <-ctx.Done():
return
}
}
4 changes: 2 additions & 2 deletions pkg/nodeipam/ipam/cloud_cidr_allocator_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package ipam

import (
"context"
"fmt"
"net"
"testing"
@@ -50,7 +51,6 @@ func hasNodeInProcessing(ca *cloudCIDRAllocator, name string) bool {
func TestBoundedRetries(_ *testing.T) {
clientSet := fake.NewSimpleClientset()
updateChan := make(chan nodeReservedCIDRs, 1) // need to buffer as we are using only one goroutine
- stopChan := make(chan struct{})
sharedInfomer := informers.NewSharedInformerFactory(clientSet, 1*time.Hour)
ca := &cloudCIDRAllocator{
client: clientSet,
Expand All @@ -59,7 +59,7 @@ func TestBoundedRetries(_ *testing.T) {
nodesSynced: sharedInfomer.Core().V1().Nodes().Informer().HasSynced,
nodesInProcessing: map[string]struct{}{},
}
- go ca.worker(stopChan)
+ go ca.worker(context.Background())
nodeName := "testNode"
_ = ca.AllocateOrOccupyCIDR(&v1.Node{
ObjectMeta: metav1.ObjectMeta{
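One nuance in this test change: context.Background() can never be cancelled, so the worker goroutine now outlives the test body, whereas the removed stopChan could at least have been closed. A variant that keeps the worker stoppable would be:

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // takes over the role of close(stopChan) when the test returns
go ca.worker(ctx)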
13 changes: 7 additions & 6 deletions pkg/nodeipam/ipam/range_allocator.go
@@ -17,6 +17,7 @@ limitations under the License.
package ipam

import (
"context"
"fmt"
"net"
"strings"
@@ -171,24 +172,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No
return ra, nil
}

- func (r *rangeAllocator) Run(stopCh <-chan struct{}) {
+ func (r *rangeAllocator) Run(ctx context.Context) {
defer utilruntime.HandleCrash()

klog.Infof("Starting range CIDR allocator")
defer klog.Infof("Shutting down range CIDR allocator")

if !cache.WaitForNamedCacheSync("cidrallocator", stopCh, r.nodesSynced) {
if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), r.nodesSynced) {
return
}

for i := 0; i < cidrUpdateWorkers; i++ {
- go r.worker(stopCh)
+ go r.worker(ctx)
}

<-stopCh
<-ctx.Done()
}

- func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
+ func (r *rangeAllocator) worker(ctx context.Context) {
for {
select {
case workItem, ok := <-r.nodeCIDRUpdateChannel:
@@ -200,7 +201,7 @@ func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
// Requeue the failed node for update again.
r.nodeCIDRUpdateChannel <- workItem
}
case <-stopChan:
case <-ctx.Done():
return
}
}
