Skip to content

Commit

Permalink
Merge pull request #2984 from zhanggbj/remove_keep_alive_handler
Browse files Browse the repository at this point in the history
🌱 Remove keep alive feature
  • Loading branch information
k8s-ci-robot authored May 14, 2024
2 parents 50976f1 + a696b5b commit e351bc2
Show file tree
Hide file tree
Showing 18 changed files with 43 additions and 266 deletions.
6 changes: 1 addition & 5 deletions controllers/vspherecluster_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,7 @@ func (r *clusterReconciler) reconcileIdentitySecret(ctx context.Context, cluster
func (r *clusterReconciler) reconcileVCenterConnectivity(ctx context.Context, clusterCtx *capvcontext.ClusterContext) (*session.Session, error) {
params := session.NewParams().
WithServer(clusterCtx.VSphereCluster.Spec.Server).
WithThumbprint(clusterCtx.VSphereCluster.Spec.Thumbprint).
WithFeatures(session.Feature{
EnableKeepAlive: r.ControllerManagerContext.EnableKeepAlive,
KeepAliveDuration: r.ControllerManagerContext.KeepAliveDuration,
})
WithThumbprint(clusterCtx.VSphereCluster.Spec.Thumbprint)

if clusterCtx.VSphereCluster.Spec.IdentityRef != nil {
creds, err := identity.GetCredentials(ctx, r.Client, clusterCtx.VSphereCluster, r.ControllerManagerContext.Namespace)
Expand Down
6 changes: 1 addition & 5 deletions controllers/vspheredeploymentzone_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,7 @@ func (r vsphereDeploymentZoneReconciler) getVCenterSession(ctx context.Context,
params := session.NewParams().
WithServer(deploymentZoneCtx.VSphereDeploymentZone.Spec.Server).
WithDatacenter(datacenter).
WithUserInfo(r.ControllerManagerContext.Username, r.ControllerManagerContext.Password).
WithFeatures(session.Feature{
EnableKeepAlive: r.EnableKeepAlive,
KeepAliveDuration: r.KeepAliveDuration,
})
WithUserInfo(r.ControllerManagerContext.Username, r.ControllerManagerContext.Password)

clusterList := &infrav1.VSphereClusterList{}
if err := r.Client.List(ctx, clusterList); err != nil {
Expand Down
7 changes: 2 additions & 5 deletions controllers/vspherevm_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,11 +587,8 @@ func (r vmReconciler) retrieveVcenterSession(ctx context.Context, vsphereVM *inf
WithServer(vsphereVM.Spec.Server).
WithDatacenter(vsphereVM.Spec.Datacenter).
WithUserInfo(r.ControllerManagerContext.Username, r.ControllerManagerContext.Password).
WithThumbprint(vsphereVM.Spec.Thumbprint).
WithFeatures(session.Feature{
EnableKeepAlive: r.ControllerManagerContext.EnableKeepAlive,
KeepAliveDuration: r.ControllerManagerContext.KeepAliveDuration,
})
WithThumbprint(vsphereVM.Spec.Thumbprint)

cluster, err := clusterutilv1.GetClusterFromMetadata(ctx, r.Client, vsphereVM.ObjectMeta)
if err != nil {
log.V(4).Info("Using credentials provided to the manager to create the authenticated session, VSphereVM is missing cluster label or cluster does not exist")
Expand Down
29 changes: 6 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
"sigs.k8s.io/cluster-api-provider-vsphere/controllers"
"sigs.k8s.io/cluster-api-provider-vsphere/feature"
"sigs.k8s.io/cluster-api-provider-vsphere/internal/webhooks"
"sigs.k8s.io/cluster-api-provider-vsphere/pkg/constants"
capvcontext "sigs.k8s.io/cluster-api-provider-vsphere/pkg/context"
"sigs.k8s.io/cluster-api-provider-vsphere/pkg/manager"
"sigs.k8s.io/cluster-api-provider-vsphere/pkg/session"
Expand Down Expand Up @@ -88,13 +87,11 @@ var (
tlsOptions = capiflags.TLSOptions{}
diagnosticsOptions = capiflags.DiagnosticsOptions{}

defaultProfilerAddr = os.Getenv("PROFILER_ADDR")
defaultSyncPeriod = manager.DefaultSyncPeriod
defaultLeaderElectionID = manager.DefaultLeaderElectionID
defaultPodName = manager.DefaultPodName
defaultWebhookPort = manager.DefaultWebhookServiceContainerPort
defaultEnableKeepAlive = constants.DefaultEnableKeepAlive
defaultKeepAliveDuration = constants.DefaultKeepAliveDuration
defaultProfilerAddr = os.Getenv("PROFILER_ADDR")
defaultSyncPeriod = manager.DefaultSyncPeriod
defaultLeaderElectionID = manager.DefaultLeaderElectionID
defaultPodName = manager.DefaultPodName
defaultWebhookPort = manager.DefaultWebhookServiceContainerPort
)

// InitFlags initializes the flags.
Expand Down Expand Up @@ -143,21 +140,7 @@ func InitFlags(fs *pflag.FlagSet) {
"/etc/capv/credentials.yaml",
"path to CAPV's credentials file",
)
fs.BoolVar(
&managerOpts.EnableKeepAlive,
"enable-keep-alive",
defaultEnableKeepAlive,
"feature to enable keep alive handler in vsphere sessions. This functionality is disabled by default.")
_ = fs.MarkDeprecated("enable-keep-alive", "This flag has been deprecated and will be removed in a "+
"future release. Note: This feature has been disabled per default because we determined that we already keep alive "+
"sessions just by our regular reconciles. So we don't need an additional keep alive handler. Enabling "+
"this feature may lead to a deadlock in controllers communicating with vCenter.")
fs.DurationVar(
&managerOpts.KeepAliveDuration,
"keep-alive-duration",
defaultKeepAliveDuration,
"idle time interval(minutes) in between send() requests in keepalive handler",
)

fs.StringVar(
&managerOpts.NetworkProvider,
"network-provider",
Expand Down
6 changes: 1 addition & 5 deletions pkg/clustermodule/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ func (s *service) fetchSessionForObject(ctx context.Context, clusterCtx *capvcon
func (s *service) newParams(clusterCtx capvcontext.ClusterContext) *session.Params {
return session.NewParams().
WithServer(clusterCtx.VSphereCluster.Spec.Server).
WithThumbprint(clusterCtx.VSphereCluster.Spec.Thumbprint).
WithFeatures(session.Feature{
EnableKeepAlive: s.ControllerManagerContext.EnableKeepAlive,
KeepAliveDuration: s.ControllerManagerContext.KeepAliveDuration,
})
WithThumbprint(clusterCtx.VSphereCluster.Spec.Thumbprint)
}

func (s *service) fetchSession(ctx context.Context, clusterCtx *capvcontext.ClusterContext, params *session.Params) (*session.Session, error) {
Expand Down
8 changes: 0 additions & 8 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ limitations under the License.
package constants

import (
"time"

infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
)

Expand Down Expand Up @@ -52,12 +50,6 @@ const (
// cluster are in maintenance mode.
MaintenanceAnnotationLabel = "capv." + infrav1.GroupName + "/maintenance"

// DefaultEnableKeepAlive is false by default.
DefaultEnableKeepAlive = false

// DefaultKeepAliveDuration is the default keep-alive interval: five minutes.
DefaultKeepAliveDuration = time.Minute * 5

// NodeLabelPrefix is the prefix for node labels.
NodeLabelPrefix = "node.cluster.x-k8s.io"

Expand Down
9 changes: 0 additions & 9 deletions pkg/context/controller_manager_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package context

import (
"sync"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -67,14 +66,6 @@ type ControllerManagerContext struct {
// endpoints.
Password string

// EnableKeepAlive is a session feature to enable keep alive handler
// for better load management on vSphere api server
EnableKeepAlive bool

// KeepAliveDuration is the idle time interval in between send() requests
// in keepalive handler
KeepAliveDuration time.Duration

// NetworkProvider is the network provider used by Supervisor based clusters
NetworkProvider string

Expand Down
2 changes: 0 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func New(ctx context.Context, opts Options) (Manager, error) {
Scheme: opts.Scheme,
Username: opts.Username,
Password: opts.Password,
EnableKeepAlive: opts.EnableKeepAlive,
KeepAliveDuration: opts.KeepAliveDuration,
NetworkProvider: opts.NetworkProvider,
WatchFilterValue: opts.WatchFilterValue,
}
Expand Down
9 changes: 0 additions & 9 deletions pkg/manager/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"os"
"strings"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
Expand All @@ -41,10 +40,6 @@ type AddToManagerFunc func(context.Context, *capvcontext.ControllerManagerContex
type Options struct {
ctrlmgr.Options

// EnableKeepAlive is a session feature to enable keep alive handler
// for better load management on vSphere api server
EnableKeepAlive bool

// PodNamespace is the namespace in which the pod running the controller
// maintains a leader election lock.
//
Expand All @@ -64,10 +59,6 @@ type Options struct {
// endpoints.
Password string

// KeepAliveDuration is the idle time interval in between send() requests
// in keepalive handler
KeepAliveDuration time.Duration

// CredentialsFile is the file that contains credentials of CAPV
CredentialsFile string

Expand Down
58 changes: 6 additions & 52 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,20 @@ import (
"net/netip"
"net/url"
"sync"
"time"

"github.com/blang/semver"
"github.com/pkg/errors"
"github.com/vmware/govmomi"
"github.com/vmware/govmomi/find"
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/session"
"github.com/vmware/govmomi/session/keepalive"
"github.com/vmware/govmomi/vapi/rest"
"github.com/vmware/govmomi/vapi/tags"
"github.com/vmware/govmomi/vim25"
"github.com/vmware/govmomi/vim25/methods"
"github.com/vmware/govmomi/vim25/soap"
ctrl "sigs.k8s.io/controller-runtime"

infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
"sigs.k8s.io/cluster-api-provider-vsphere/pkg/constants"
)

var (
Expand All @@ -62,16 +58,11 @@ type Session struct {
}

// Feature is a set of Features of the session.
type Feature struct {
EnableKeepAlive bool
KeepAliveDuration time.Duration
}
type Feature struct{}

// DefaultFeature sets the default values for features.
func DefaultFeature() Feature {
return Feature{
EnableKeepAlive: constants.DefaultEnableKeepAlive,
}
return Feature{}
}

// Params are the parameters of a VCenter session.
Expand Down Expand Up @@ -191,7 +182,7 @@ func GetOrCreate(ctx context.Context, params *Params) (*Session, error) {
}

soapURL.User = params.userinfo
client, err := newClient(ctx, sessionKey, soapURL, params.thumbprint, params.feature)
client, err := newClient(ctx, soapURL, params.thumbprint, params.feature)
if err != nil {
return nil, errors.Wrapf(err, "failed to create vCenter session")
}
Expand All @@ -202,7 +193,7 @@ func GetOrCreate(ctx context.Context, params *Params) (*Session, error) {
// Assign the finder to the session.
session.Finder = find.NewFinder(session.Client.Client, false)
// Assign tag manager to the session.
manager, err := newManager(ctx, sessionKey, client.Client, soapURL.User, params.feature)
manager, err := newManager(ctx, client.Client, soapURL.User, params.feature)
if err != nil {
log.Error(err, "Failed to create tags manager, will logout")
// Logout of previously logged session to not leak
Expand Down Expand Up @@ -238,9 +229,7 @@ func GetOrCreate(ctx context.Context, params *Params) (*Session, error) {
return &session, nil
}

func newClient(ctx context.Context, sessionKey string, url *url.URL, thumbprint string, feature Feature) (*govmomi.Client, error) {
log := ctrl.LoggerFrom(ctx)

func newClient(ctx context.Context, url *url.URL, thumbprint string, _ Feature) (*govmomi.Client, error) {
insecure := thumbprint == ""
soapClient := soap.NewClient(url, insecure)
if !insecure {
Expand All @@ -258,20 +247,6 @@ func newClient(ctx context.Context, sessionKey string, url *url.URL, thumbprint
SessionManager: session.NewManager(vimClient),
}

if feature.EnableKeepAlive {
vimClient.RoundTripper = session.KeepAliveHandler(vimClient.RoundTripper, feature.KeepAliveDuration, func(tripper soap.RoundTripper) error {
_, err := methods.GetCurrentTime(ctx, tripper)
if err != nil {
log.Error(err, "Failed to keep alive govmomi client, Clearing the session now")
if errLogout := c.Logout(ctx); errLogout != nil {
log.Error(err, "Failed to logout keepalive failed session")
}
sessionCache.Delete(sessionKey)
}
return err
})
}

if err := c.Login(ctx, url.User); err != nil {
return nil, errors.Wrapf(err, "failed to create client: failed to login")
}
Expand All @@ -280,29 +255,8 @@ func newClient(ctx context.Context, sessionKey string, url *url.URL, thumbprint
}

// newManager creates a Manager that encompasses the REST Client for the VSphere tagging API.
func newManager(ctx context.Context, sessionKey string, client *vim25.Client, user *url.Userinfo, feature Feature) (*tags.Manager, error) {
log := ctrl.LoggerFrom(ctx)

func newManager(ctx context.Context, client *vim25.Client, user *url.Userinfo, _ Feature) (*tags.Manager, error) {
rc := rest.NewClient(client)
if feature.EnableKeepAlive {
rc.Transport = keepalive.NewHandlerREST(rc, feature.KeepAliveDuration, func() error {
s, err := rc.Session(ctx)
if s != nil && err == nil {
return nil
}

if err != nil {
log.Error(err, "Failed to keep alive REST client")
}

log.Info("REST client session expired, clearing session")
if errLogout := rc.Logout(ctx); errLogout != nil {
log.Error(err, "Failed to logout keepalive failed REST session")
}
sessionCache.Delete(sessionKey)
return errors.New("REST client session expired")
})
}
if err := rc.Login(ctx, user); err != nil {
return nil, errors.Wrapf(err, "failed to create tags manager: failed to login REST client")
}
Expand Down
60 changes: 0 additions & 60 deletions pkg/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,63 +112,3 @@ func assertSessionCountEqualTo(g *WithT, simr *vcsim.Simulator, count int) {
return sessions == count
}, 30*time.Second).Should(BeTrue())
}

// TestGetSessionWithKeepAlive checks the session caching behavior of
// GetOrCreate against a vCenter simulator: a repeated call with the same
// params returns the same vCenter session, and a call made after Logout
// returns a different session, while the simulator's session count stays at 1.
//
// NOTE(review): params is built without WithFeatures, so keep alive is only
// active in this test if NewParams enables it by default — confirm.
func TestGetSessionWithKeepAlive(t *testing.T) {
g := NewWithT(t)
ctrl.SetLogger(klog.Background())

// Start from the simulator's VPX model with its Cluster field set to 2.
model := simulator.VPX()
model.Cluster = 2

simr, err := vcsim.NewBuilder().
WithModel(model).Build()
if err != nil {
t.Fatalf("failed to create VC simulator")
}
// Tear the simulator down when the test returns.
defer simr.Destroy()

// Session parameters point at the simulator and use its own credentials.
params := NewParams().
WithServer(simr.ServerURL().Host).
WithUserInfo(simr.Username(), simr.Password()).
WithDatacenter("*")

// Get the first session; the simulator should then report exactly one.
ctx := context.Background()
s, err := GetOrCreate(ctx, params)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(s).ToNot(BeNil())
assertSessionCountEqualTo(g, simr, 1)

// Record the key of the first session so later sessions can be compared to it.
sessionInfo, err := s.SessionManager.UserSession(ctx)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(sessionInfo).ToNot(BeNil())
firstSession := sessionInfo.Key

// Get the session again with the same params.
// The first session has not been logged out, so the same cached
// session (same key) is expected and the count must still be 1.
s, err = GetOrCreate(ctx, params)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(s).ToNot(BeNil())
sessionInfo, err = s.SessionManager.UserSession(ctx)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(sessionInfo).ToNot(BeNil())
g.Expect(sessionInfo.Key).To(BeEquivalentTo(firstSession))
assertSessionCountEqualTo(g, simr, 1)

// Log out of the current session.
g.Expect(s.Logout(ctx)).To(Succeed())

// After logging out, the next GetOrCreate must return a session
// with a different key than the first one, and the total session
// count must remain 1.
s, err = GetOrCreate(ctx, params)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(s).ToNot(BeNil())
sessionInfo, err = s.SessionManager.UserSession(ctx)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(sessionInfo).ToNot(BeNil())
g.Expect(sessionInfo.Key).ToNot(BeEquivalentTo(firstSession))
assertSessionCountEqualTo(g, simr, 1)
}
Loading

0 comments on commit e351bc2

Please sign in to comment.