diff --git a/Makefile b/Makefile index 1eb0554f37..cdad471414 100644 --- a/Makefile +++ b/Makefile @@ -84,7 +84,7 @@ GINKGO_NODES ?= 1 GINKGO_TIMEOUT ?= 3h E2E_CONF_FILE ?= $(abspath test/e2e/config/vsphere.yaml) E2E_CONF_OVERRIDE_FILE ?= $(abspath test/e2e/config/config-overrides.yaml) -E2E_IPAM_KUBECONFIG ?= +E2E_VSPHERE_IP_POOL ?= E2E_TEMPLATE_DIR := $(abspath test/e2e/data/) E2E_GOVMOMI_TEMPLATE_DIR := $(E2E_TEMPLATE_DIR)/infrastructure-vsphere-govmomi E2E_SUPERVISOR_TEMPLATE_DIR := $(E2E_TEMPLATE_DIR)/infrastructure-vsphere-supervisor @@ -193,6 +193,9 @@ IMPORT_BOSS_PKG := k8s.io/code-generator/cmd/import-boss CAPI_HACK_TOOLS_VER := ef04465b2ba76214eea570e27e8146c96412e32a # Note: this is the commit ID of CAPI v1.7.1 +BOSKOSCTL_BIN := boskosctl +BOSKOSCTL := $(abspath $(TOOLS_BIN_DIR)/$(BOSKOSCTL_BIN)) + CONVERSION_VERIFIER_VER := $(CAPI_HACK_TOOLS_VER) CONVERSION_VERIFIER_BIN := conversion-verifier CONVERSION_VERIFIER := $(abspath $(TOOLS_BIN_DIR)/$(CONVERSION_VERIFIER_BIN)-$(CONVERSION_VERIFIER_VER)) @@ -627,7 +630,7 @@ e2e: $(GINKGO) $(KUSTOMIZE) $(KIND) $(GOVC) ## Run e2e tests --e2e.artifacts-folder="$(ARTIFACTS)" \ --e2e.skip-resource-cleanup=$(SKIP_RESOURCE_CLEANUP) \ --e2e.use-existing-cluster="$(USE_EXISTING_CLUSTER)" \ - --e2e.ipam-kubeconfig="$(E2E_IPAM_KUBECONFIG)" + --e2e.ip-pool='$(E2E_VSPHERE_IP_POOL)' ## -------------------------------------- ## Release @@ -958,6 +961,9 @@ $(CONVERSION_GEN_BIN): $(CONVERSION_GEN) ## Build a local copy of conversion-gen .PHONY: $(PROWJOB_GEN_BIN) $(PROWJOB_GEN_BIN): $(PROWJOB_GEN) ## Build a local copy of prowjob-gen. +.PHONY: $(BOSKOSCTL_BIN) +$(BOSKOSCTL_BIN): $(BOSKOSCTL) ## Build a local copy of boskosctl. + .PHONY: $(CONVERSION_VERIFIER_BIN) $(CONVERSION_VERIFIER_BIN): $(CONVERSION_VERIFIER) ## Build a local copy of conversion-verifier. @@ -1012,6 +1018,9 @@ $(CONTROLLER_GEN): # Build controller-gen. $(CONVERSION_GEN): # Build conversion-gen. GOBIN=$(TOOLS_BIN_DIR) $(GO_INSTALL) $(CONVERSION_GEN_PKG) $(CONVERSION_GEN_BIN) $(CONVERSION_GEN_VER) +$(BOSKOSCTL): # Build boskosctl from tools folder. + go build -o $(TOOLS_BIN_DIR)/$(BOSKOSCTL_BIN) ./hack/tools/boskosctl + $(CONVERSION_VERIFIER): # Build conversion-verifier. GOBIN=$(TOOLS_BIN_DIR) $(GO_TOOLS_BUILD) $(CONVERSION_VERIFIER_PKG) $(CONVERSION_VERIFIER_BIN) $(CONVERSION_VERIFIER_VER) diff --git a/hack/clean-ci.sh b/hack/clean-ci.sh index ad2cc62e6d..39a9b5f18f 100755 --- a/hack/clean-ci.sh +++ b/hack/clean-ci.sh @@ -61,5 +61,8 @@ wait_for_ipam_reachable # Set kubeconfig for IPAM cleanup export KUBECONFIG="${E2E_IPAM_KUBECONFIG}" +# FIXME(sbueringer) before merge this should be moved to the ProwJob config in test-infra (so we can move jobs incrementally to the new environment / boskos) +export BOSKOS_HOST=http://192.168.6.138:32222 + # Run e2e tests make clean-ci diff --git a/hack/e2e.sh b/hack/e2e.sh index 5b7273fb8a..22baa3643a 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -36,7 +36,23 @@ fi # shellcheck source=./hack/ensure-kubectl.sh source "${REPO_ROOT}/hack/ensure-kubectl.sh" +make boskosctl +export BOSKOS_RESOURCE_OWNER=cluster-api-provider-vsphere +if [[ "${JOB_NAME}" != "" ]]; then + export BOSKOS_RESOURCE_OWNER="${JOB_NAME}/${BUILD_ID}" +fi +export BOSKOS_RESOURCE_TYPE=vsphere-project-cluster-api-provider + on_exit() { + # Only handle Boskos when we have to (not for vcsim) + if [[ ! 
"${GINKGO_FOCUS:-}" =~ $RE_VCSIM ]]; then + # Stop boskos heartbeat + [[ -z ${HEART_BEAT_PID:-} ]] || kill -9 "${HEART_BEAT_PID}" + + # If Boskos is being used then release the vsphere project. + [ -z "${BOSKOS_HOST:-}" ] || boskosctl release --resource-owner="${BOSKOS_RESOURCE_OWNER}" --resource-name="${BOSKOS_RESOURCE_NAME}" --resource-folder="${BOSKOS_RESOURCE_FOLDER}" --resource-pool="${BOSKOS_RESOURCE_POOL}" + fi + # kill the VPN only when we started it (not vcsim) if [[ ! "${GINKGO_FOCUS:-}" =~ $RE_VCSIM ]]; then docker kill vpn @@ -90,11 +106,10 @@ export GC_KIND="false" # Make tests run in-parallel export GINKGO_NODES=5 -# Set the kubeconfig to the IPAM cluster so the e2e tests can claim ip addresses -# for kube-vip. -export E2E_IPAM_KUBECONFIG="/root/ipam-conf/capv-services.conf" +# FIXME(sbueringer) before merge this should be moved to the ProwJob config in test-infra (so we can move jobs incrementally to the new environment / boskos) +export BOSKOS_HOST=http://192.168.6.138:32222 -# Only run the vpn/check for IPAM when we need them (not vcsim) +# Only run the vpn/check for IPAM when we need them (not for vcsim) if [[ ! "${GINKGO_FOCUS:-}" =~ $RE_VCSIM ]]; then # Run the vpn client in container docker run --rm -d --name vpn -v "${HOME}/.openvpn/:${HOME}/.openvpn/" \ @@ -104,11 +119,11 @@ if [[ ! "${GINKGO_FOCUS:-}" =~ $RE_VCSIM ]]; then # Tail the vpn logs docker logs vpn - # Wait until the VPN connection is active and we are able to reach the ipam cluster - function wait_for_ipam_reachable() { + # Wait until the VPN connection is active. + function wait_for_vpn_up() { local n=0 until [ $n -ge 30 ]; do - kubectl --kubeconfig="${E2E_IPAM_KUBECONFIG}" --request-timeout=2s get inclusterippools.ipam.cluster.x-k8s.io && RET=$? || RET=$? + curl "https://${VSPHERE_SERVER}" --connect-timeout 2 -k -v && RET=$? || RET=$? if [[ "$RET" -eq 0 ]]; then break fi @@ -117,7 +132,39 @@ if [[ ! "${GINKGO_FOCUS:-}" =~ $RE_VCSIM ]]; then done return "$RET" } - wait_for_ipam_reachable + wait_for_vpn_up + + # If BOSKOS_HOST is set then acquire a vsphere-project from Boskos. + if [ -n "${BOSKOS_HOST:-}" ]; then + # Check out the account from Boskos and store the produced environment + # variables in a temporary file. + account_env_var_file="$(mktemp)" + boskosctl acquire --resource-owner="${BOSKOS_RESOURCE_OWNER}" --resource-type=${BOSKOS_RESOURCE_TYPE} 1>"${account_env_var_file}" + checkout_account_status="${?}" + + # If the checkout process was a success then load the account's + # environment variables into this process. + # shellcheck disable=SC1090 + [ "${checkout_account_status}" = "0" ] && . "${account_env_var_file}" + export BOSKOS_RESOURCE_NAME=${BOSKOS_RESOURCE_NAME} + export VSPHERE_FOLDER=${BOSKOS_RESOURCE_FOLDER} + export VSPHERE_RESOURCE_POOL=${BOSKOS_RESOURCE_POOL} + export E2E_VSPHERE_IP_POOL="${BOSKOS_RESOURCE_IP_POOL}" + + # Always remove the account environment variable file. It contains + # sensitive information. + rm -f "${account_env_var_file}" + + if [ ! "${checkout_account_status}" = "0" ]; then + echo "error getting vsphere project from Boskos" 1>&2 + exit "${checkout_account_status}" + fi + + # Run the heartbeat to tell boskos periodically that we are still + # using the checked out account. + boskosctl heartbeat --resource-owner="${BOSKOS_RESOURCE_OWNER}" --resource-name="${BOSKOS_RESOURCE_NAME}" >>"${ARTIFACTS}/boskos-heartbeat.log" 2>&1 & + HEART_BEAT_PID=$! 
+  fi
 fi
 
 make envsubst
diff --git a/hack/tools/boskosctl/main.go b/hack/tools/boskosctl/main.go
new file mode 100644
index 0000000000..317e9a1430
--- /dev/null
+++ b/hack/tools/boskosctl/main.go
@@ -0,0 +1,283 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package main is the main package for boskosctl.
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/spf13/cobra"
+	kerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
+
+	"sigs.k8s.io/cluster-api-provider-vsphere/hack/tools/pkg/boskos"
+	"sigs.k8s.io/cluster-api-provider-vsphere/hack/tools/pkg/janitor"
+)
+
+var (
+	boskosHost     string
+	resourceOwner  string
+	resourceType   string
+	resourceName   string
+	resourceFolder string
+	resourcePool   string
+)
+
+func main() {
+	ctx := context.Background()
+	log := klog.Background()
+	ctrl.LoggerInto(ctx, log)
+	// Just setting this to avoid that CR is complaining about a missing logger.
+	ctrl.SetLogger(log)
+
+	rootCmd := setupCommands(ctx)
+
+	if err := rootCmd.Execute(); err != nil {
+		log.Error(err, "Failed running boskosctl")
+		os.Exit(1)
+	}
+}
+
+func setupCommands(ctx context.Context) *cobra.Command {
+	// Root command
+	rootCmd := &cobra.Command{
+		Use:          "boskosctl",
+		SilenceUsage: true,
+		Short:        "boskosctl can be used to consume Boskos vsphere resources",
+	}
+	// Note: http://boskos.test-pods.svc.cluster.local is the URL of the service usually used in k8s.io clusters.
+	rootCmd.PersistentFlags().StringVar(&boskosHost, "boskos-host", getOrDefault(os.Getenv("BOSKOS_HOST"), "http://boskos.test-pods.svc.cluster.local"), "Boskos server URL.")
+	rootCmd.PersistentFlags().StringVar(&resourceOwner, "resource-owner", "", "Owner for the resource.")
+
+	// acquire command
+	acquireCmd := &cobra.Command{
+		Use:  "acquire",
+		Args: cobra.NoArgs,
+		RunE: runCmd(ctx),
+	}
+	acquireCmd.PersistentFlags().StringVar(&resourceType, "resource-type", "", "Type of the resource. 
Should be one of: vsphere-project-cluster-api-provider, vsphere-project-cloud-provider, vsphere-project-image-builder") + rootCmd.AddCommand(acquireCmd) + + // heartbeat command + heartbeatCmd := &cobra.Command{ + Use: "heartbeat", + Args: cobra.NoArgs, + RunE: runCmd(ctx), + } + heartbeatCmd.PersistentFlags().StringVar(&resourceName, "resource-name", "", "Name of the resource.") + rootCmd.AddCommand(heartbeatCmd) + + // release command + releaseCmd := &cobra.Command{ + Use: "release", + Args: cobra.NoArgs, + RunE: runCmd(ctx), + } + releaseCmd.PersistentFlags().StringVar(&resourceName, "resource-name", "", "Name of the resource.") + releaseCmd.PersistentFlags().StringVar(&resourceFolder, "resource-folder", "", "vSphere folder of the resource, required for cleanup before release") + releaseCmd.PersistentFlags().StringVar(&resourcePool, "resource-pool", "", "vSphere resource pool of the resource, required for cleanup before release") + rootCmd.AddCommand(releaseCmd) + + return rootCmd +} + +func getOrDefault(value, defaultValue string) string { + if value != "" { + return value + } + return defaultValue +} + +func runCmd(ctx context.Context) func(cmd *cobra.Command, _ []string) error { + return func(cmd *cobra.Command, _ []string) error { + log := ctrl.LoggerFrom(ctx) + + if boskosHost == "" { + return fmt.Errorf("--boskos-host must be set") + } + if resourceOwner == "" { + return fmt.Errorf("--resource-owner must be set") + } + log = log.WithValues("boskosHost", boskosHost, "resourceOwner", resourceOwner) + ctrl.LoggerInto(ctx, log) + + log.Info("Creating new Boskos client") + client, err := boskos.NewClient(resourceOwner, boskosHost) + if err != nil { + return err + } + + switch cmd.Use { + case "acquire": + if resourceType == "" { + return fmt.Errorf("--resource-type must be set") + } + log := log.WithValues("resourceType", resourceType) + ctrl.LoggerInto(ctx, log) + + return acquire(ctx, client, resourceType) + case "heartbeat": + if resourceName == "" { + return fmt.Errorf("--resource-name must be set") + } + log := log.WithValues("resourceName", resourceName) + ctrl.LoggerInto(ctx, log) + + return heartbeat(ctx, client, resourceName) + case "release": + if resourceName == "" { + return fmt.Errorf("--resource-name must be set") + } + if resourceFolder == "" { + return fmt.Errorf("--resource-folder must be set") + } + if resourcePool == "" { + return fmt.Errorf("--resource-pool must be set") + } + log := log.WithValues("resourceName", resourceName, "resourceFolder", resourceFolder, "resourcePool", resourcePool) + ctrl.LoggerInto(ctx, log) + + return release(ctx, client, resourceName, resourceFolder, resourcePool) + } + + return nil + } +} + +func acquire(ctx context.Context, client *boskos.Client, resourceType string) error { + log := ctrl.LoggerFrom(ctx) + log.Info("Acquiring resource") + res, err := client.Acquire(resourceType, boskos.Free, boskos.Busy) + if err != nil { + return err + } + + if res.UserData == nil { + return errors.Errorf("failed to get user data, resource %q is missing user data", res.Name) + } + + folder, hasFolder := res.UserData.Load("folder") + if !hasFolder { + return errors.Errorf("failed to get user data, resource %q is missing \"folder\" key", res.Name) + } + resourcePool, hasResourcePool := res.UserData.Load("resourcePool") + if !hasResourcePool { + return errors.Errorf("failed to get user data, resource %q is missing \"resourcePool\" key", res.Name) + } + ipPool, hasIPPool := res.UserData.Load("ipPool") + if !hasIPPool { + fmt.Printf(` +export 
BOSKOS_RESOURCE_NAME=%s
+export BOSKOS_RESOURCE_FOLDER=%s
+export BOSKOS_RESOURCE_POOL=%s
+`, res.Name, folder, resourcePool)
+		return nil
+	}
+
+	fmt.Printf(`
+export BOSKOS_RESOURCE_NAME=%s
+export BOSKOS_RESOURCE_FOLDER=%s
+export BOSKOS_RESOURCE_POOL=%s
+export BOSKOS_RESOURCE_IP_POOL='%s'
+`, res.Name, folder, resourcePool, ipPool)
+
+	return nil
+}
+
+func heartbeat(ctx context.Context, client *boskos.Client, resourceName string) error {
+	log := ctrl.LoggerFrom(ctx)
+	for {
+		log.Info("Sending heartbeat")
+
+		if err := client.Update(resourceName, boskos.Busy, nil); err != nil {
+			log.Error(err, "Sending heartbeat failed")
+		} else {
+			log.Info("Sending heartbeat succeeded")
+		}
+
+		time.Sleep(1 * time.Minute)
+	}
+}
+
+func release(ctx context.Context, client *boskos.Client, resourceName, resourceFolder, resourcePool string) error {
+	log := ctrl.LoggerFrom(ctx)
+
+	var username, password, server, thumbprint string
+	if username = os.Getenv("VSPHERE_USERNAME"); username == "" {
+		return fmt.Errorf("env var VSPHERE_USERNAME must be set")
+	}
+	if password = os.Getenv("VSPHERE_PASSWORD"); password == "" {
+		return fmt.Errorf("env var VSPHERE_PASSWORD must be set")
+	}
+	if server = os.Getenv("VSPHERE_SERVER"); server == "" {
+		return fmt.Errorf("env var VSPHERE_SERVER must be set")
+	}
+	if thumbprint = os.Getenv("VSPHERE_TLS_THUMBPRINT"); thumbprint == "" {
+		return fmt.Errorf("env var VSPHERE_TLS_THUMBPRINT must be set")
+	}
+	log = log.WithValues("vSphereServer", server)
+	ctrl.LoggerInto(ctx, log)
+
+	log.Info("Releasing resource")
+
+	// Create clients for vSphere.
+	vSphereClients, err := janitor.NewVSphereClients(ctx, janitor.NewVSphereClientsInput{
+		Username:   username,
+		Password:   password,
+		Server:     server,
+		Thumbprint: thumbprint,
+		UserAgent:  "boskosctl",
+	})
+	if err != nil {
+		return errors.Wrap(err, "failed to create vSphere clients")
+	}
+	defer vSphereClients.Logout(ctx)
+
+	// Delete all VMs created up until now.
+	maxCreationDate := time.Now()
+	j := janitor.NewJanitor(vSphereClients, nil, maxCreationDate, "", false)
+
+	log.Info("Cleaning up vSphere")
+	// Note: We intentionally want to skip clusterModule cleanup. If we run this too often we might hit race conditions
+	// when other tests are creating cluster modules in parallel.
+	if err := j.CleanupVSphere(ctx, []string{resourceFolder}, []string{resourcePool}, []string{resourceFolder}, true); err != nil {
+		log.Info("Cleaning up vSphere failed")
+
+		// Try to release resource as dirty.
+		log.Info("Releasing resource as dirty")
+		if releaseErr := client.Release(resourceName, boskos.Dirty); releaseErr != nil {
+			return errors.Wrapf(kerrors.NewAggregate([]error{err, releaseErr}), "cleaning up vSphere and releasing resource as dirty failed, resource will now become stale")
+		}
+		log.Info("Releasing resource as dirty succeeded")
+
+		return errors.Wrapf(err, "cleaning up vSphere failed, resource was released as dirty")
+	}
+	log.Info("Cleaning up vSphere succeeded")
+
+	// Try to release resource as free.
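+	// Note: "free" makes the project usable for the next acquire, while the failure path above releases it as "dirty" so the janitor can clean it up again.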
+ log.Info("Releasing resource as free") + if releaseErr := client.Release(resourceName, boskos.Free); releaseErr != nil { + return errors.Wrapf(releaseErr, "cleaning up vSphere succeeded and releasing resource as free failed, resource will now become stale") + } + log.Info("Releasing resource as free succeeded") + + return nil +} diff --git a/hack/tools/janitor/main.go b/hack/tools/janitor/main.go index 01740b07e2..f4e8d640dc 100644 --- a/hack/tools/janitor/main.go +++ b/hack/tools/janitor/main.go @@ -26,10 +26,14 @@ import ( "github.com/pkg/errors" "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/runtime" + kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" ipamv1 "sigs.k8s.io/cluster-api/exp/ipam/api/v1beta1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/cluster-api-provider-vsphere/hack/tools/pkg/boskos" + "sigs.k8s.io/cluster-api-provider-vsphere/hack/tools/pkg/janitor" ) var ipamScheme *runtime.Scheme @@ -40,15 +44,24 @@ func init() { } var ( - dryRun bool - ipamNamespace string - maxAge time.Duration + dryRun bool + ipamNamespace string + maxAge time.Duration + // Flags to get folders and resource pools from Boskos. + boskosHost string + resourceOwner string + resourceTypes []string + // Flags to directly specify folders and resource pools. vsphereVMFolders []string vsphereFolders []string vsphereResourcePools []string ) func initFlags(fs *pflag.FlagSet) { + // Note: Intentionally not adding a fallback value, so it is still possible to not use Boskos. + fs.StringVar(&boskosHost, "boskos-host", os.Getenv("BOSKOS_HOST"), "Boskos server URL. Boskos is only used to retrieve resources if this flag is set.") + fs.StringVar(&resourceOwner, "resource-owner", "vsphere-janitor", "Owner for the resource.") + fs.StringArrayVar(&resourceTypes, "resource-type", []string{"vsphere-project-cluster-api-provider", "vsphere-project-cloud-provider", "vsphere-project-image-builder"}, "Types of the resources") fs.StringArrayVar(&vsphereVMFolders, "vm-folder", []string{}, "Path to folders in vCenter to cleanup virtual machines.") fs.StringArrayVar(&vsphereFolders, "folder", []string{}, "Path to a folder in vCenter to recursively cleanup empty subfolders.") fs.StringArrayVar(&vsphereResourcePools, "resource-pool", []string{}, "Path to a resource pool in vCenter to recursively cleanup empty child resource pools.") @@ -63,6 +76,8 @@ func main() { pflag.Parse() log := klog.Background() + // Just setting this to avoid that CR is complaining about a missing logger. + ctrl.SetLogger(log) ctx := ctrl.LoggerInto(context.Background(), log) if err := run(ctx); err != nil { @@ -76,15 +91,8 @@ func main() { func run(ctx context.Context) error { log := ctrl.LoggerFrom(ctx) - log.Info("Configured settings", "dry-run", dryRun) - log.Info("Configured settings", "folders", vsphereFolders) - log.Info("Configured settings", "vm-folders", vsphereVMFolders) - log.Info("Configured settings", "resource-pools", vsphereResourcePools) - log.Info("Configured settings", "ipam-namespace", ipamNamespace) - log.Info("Configured settings", "max-age", maxAge) - // Create clients for vSphere. 
- vSphereClients, err := newVSphereClients(ctx, getVSphereClientInput{ + vSphereClients, err := janitor.NewVSphereClients(ctx, janitor.NewVSphereClientsInput{ Username: os.Getenv("GOVC_USERNAME"), Password: os.Getenv("GOVC_PASSWORD"), Server: os.Getenv("GOVC_URL"), @@ -94,7 +102,7 @@ func run(ctx context.Context) error { if err != nil { return errors.Wrap(err, "creating vSphere clients") } - defer vSphereClients.logout(ctx) + defer vSphereClients.Logout(ctx) // Create controller-runtime client for IPAM. restConfig, err := ctrl.GetConfig() @@ -106,17 +114,100 @@ func run(ctx context.Context) error { return errors.Wrap(err, "creating IPAM client") } - janitor := newJanitor(vSphereClients, ipamClient, maxAge, ipamNamespace, dryRun) + if boskosHost != "" { + log = log.WithValues("boskosHost", boskosHost, "resourceOwner", resourceOwner) + log.Info("Getting resources to cleanup from Boskos") + client, err := boskos.NewClient(resourceOwner, boskosHost) + if err != nil { + return err + } + + var allErrs []error + for _, resourceType := range resourceTypes { + // For all resource in state dirty that are currently not owned: + // * acquire the resource (and set it to state "cleaning") + // * try to clean up vSphere + // * if cleanup succeeds, release the resource as free + // * if cleanup fails, resource will stay in cleaning and become stale (reaper will move it to dirty) + for { + log.Info("Acquiring resource") + res, err := client.Acquire(resourceType, boskos.Dirty, boskos.Cleaning) + if err != nil { + // If we get an error on acquire we're done looping through all dirty resources + if errors.Is(err, boskos.ErrNotFound) { + // Note: ErrNotFound means there are no more dirty resources that are not owned. + log.Info("No more resources to cleanup") + break + } + allErrs = append(allErrs, errors.Wrapf(err, "failed to acquire resource")) + break + } + log := log.WithValues("resourceName", res.Name) + + if res.UserData == nil { + allErrs = append(allErrs, errors.Errorf("failed to get user data, resource %q is missing user data", res.Name)) + continue + } + + folder, hasFolder := res.UserData.Load("folder") + if !hasFolder { + allErrs = append(allErrs, errors.Errorf("failed to get user data, resource %q is missing \"folder\" key", res.Name)) + continue + } + resourcePool, hasResourcePool := res.UserData.Load("resourcePool") + if !hasResourcePool { + allErrs = append(allErrs, errors.Errorf("failed to get user data, resource %q is missing \"resourcePool\" key", res.Name)) + continue + } + + // Delete all VMs created up until now. + maxCreationDate := time.Now() + j := janitor.NewJanitor(vSphereClients, nil, maxCreationDate, "", false) + + log.Info("Cleaning up vSphere") + if err := j.CleanupVSphere(ctx, []string{folder.(string)}, []string{resourcePool.(string)}, []string{folder.(string)}, false); err != nil { + log.Info("Cleaning up vSphere failed") + + // Intentionally keep this resource in cleaning state. The reaper will move it from cleaning to dirty + // and we'll retry the cleanup. + // If we move it to dirty here, the for loop will pick it up again, and we get stuck in an infinite loop. + allErrs = append(allErrs, errors.Wrapf(err, "cleaning up vSphere failed, resource %q will now become stale", res.Name)) + continue + } + log.Info("Cleaning up vSphere succeeded") + + // Try to release resource as free. 
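+				// Note: if releasing fails, the resource stays in "cleaning", becomes stale and the Boskos reaper will move it back to "dirty".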
+ log.Info("Releasing resource as free") + if releaseErr := client.Release(res.Name, boskos.Free); releaseErr != nil { + allErrs = append(allErrs, errors.Wrapf(releaseErr, "cleaning up vSphere succeeded and releasing resource as free failed, resource %q will now become stale", res.Name)) + } + log.Info("Releasing resource as free succeeded") + } + } + if len(allErrs) > 0 { + return errors.Wrap(kerrors.NewAggregate(allErrs), "cleaning up Boskos resources") + } + } + + // Note: The following will be deleted once we migrated all repos to Boskos. + maxCreationDate := time.Now().Add(-maxAge) + janitor := janitor.NewJanitor(vSphereClients, ipamClient, maxCreationDate, ipamNamespace, dryRun) - log.Info("Configured settings", "janitor.maxCreationDate", janitor.maxCreationDate) + log.Info("Configured settings", "dry-run", dryRun) + log.Info("Configured settings", "folders", vsphereFolders) + log.Info("Configured settings", "vm-folders", vsphereVMFolders) + log.Info("Configured settings", "resource-pools", vsphereResourcePools) + log.Info("Configured settings", "ipam-namespace", ipamNamespace) + log.Info("Configured settings", "max-age", maxAge) + log.Info("Configured settings", "janitor.maxCreationDate", maxCreationDate) // First cleanup old vms and other vSphere resources to free up IPAddressClaims or cluster modules which are still in-use. - if err := janitor.cleanupVSphere(ctx, vsphereFolders, vsphereResourcePools, vsphereVMFolders); err != nil { + if err := janitor.CleanupVSphere(ctx, vsphereFolders, vsphereResourcePools, vsphereVMFolders, false); err != nil { return errors.Wrap(err, "cleaning up vSphere") } // Second cleanup IPAddressClaims. - if err := janitor.deleteIPAddressClaims(ctx); err != nil { + if err := janitor.DeleteIPAddressClaims(ctx); err != nil { return errors.Wrap(err, "cleaning up IPAddressClaims") } diff --git a/hack/tools/pkg/boskos/client.go b/hack/tools/pkg/boskos/client.go new file mode 100644 index 0000000000..8c5882dc8a --- /dev/null +++ b/hack/tools/pkg/boskos/client.go @@ -0,0 +1,711 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package boskos implements a boskos client. +package boskos + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "os" + "strings" + "sync" + "syscall" + "time" + + "github.com/google/uuid" + kerrors "k8s.io/apimachinery/pkg/util/errors" +) + +// This implementation is based on https://github.com/kubernetes-sigs/boskos/blob/59dbd6c27f19fbd469b62b22177f22dc0a5d52dd/client/client.go +// We didn't want to import it directly to avoid dependencies on old controller-runtime / client-go versions. + +var ( + // ErrNotFound is returned by Acquire() when no resources are available. + ErrNotFound = errors.New("resources not found") + // ErrAlreadyInUse is returned by Acquire when resources are already being requested. 
+ ErrAlreadyInUse = errors.New("resources already used by another user") + // ErrContextRequired is returned by AcquireWait and AcquireByStateWait when + // they are invoked with a nil context. + ErrContextRequired = errors.New("context required") + // ErrTypeNotFound is returned when the requested resource type (rtype) does not exist. + // For this error to be returned, you must set DistinguishNotFoundVsTypeNotFound to true. + ErrTypeNotFound = errors.New("resource type not found") +) + +// Client defines the public Boskos client object. +type Client struct { + // Dialer is the net.Dialer used to establish connections to the remote + // boskos endpoint. + Dialer DialerWithRetry + // DistinguishNotFoundVsTypeNotFound, if set, will make it possible to distinguish between + // ErrNotFound and ErrTypeNotFound. For backwards-compatibility, this flag is off by + // default. + DistinguishNotFoundVsTypeNotFound bool + + // http is the http.Client used to interact with the boskos REST API + http http.Client + + owner string + url string + lock sync.Mutex + + storage PersistenceLayer +} + +// NewClient creates a Boskos client for the specified URL and resource owner. +// +// Clients created with this function default to retrying failed connection +// attempts three times with a ten second pause between each attempt. +// Note: username & passwordFile was dropped to avoid a dependency on "k8s.io/test-infra/prow/config/secret". +func NewClient(owner, urlString string) (*Client, error) { + client := &Client{ + url: urlString, + owner: owner, + storage: NewMemoryStorage(), + DistinguishNotFoundVsTypeNotFound: true, + } + + // Configure the dialer to attempt three additional times to establish + // a connection after a failed dial attempt. The dialer should wait 10 + // seconds between each attempt. + client.Dialer.RetryCount = 3 + client.Dialer.RetrySleep = time.Second * 10 + + // Configure the dialer and HTTP client transport to mimic the configuration + // of the http.DefaultTransport with the exception that the Dialer's Dial + // and DialContext functions are assigned to the client transport. + // + // See https://golang.org/pkg/net/http/#RoundTripper for the + // values used for the http.DefaultTransport. + client.Dialer.Timeout = 30 * time.Second + client.Dialer.KeepAlive = 30 * time.Second + client.Dialer.DualStack = true + client.http.Transport = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: client.Dialer.Dial, + DialContext: client.Dialer.DialContext, + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + + return client, nil +} + +// public method. + +// Acquire asks boskos for a resource of certain type in certain state, and set the resource to dest state. +// Returns the resource on success. +func (c *Client) Acquire(rtype, state, dest string) (*Resource, error) { + return c.AcquireWithPriority(rtype, state, dest, "") +} + +// AcquireWithPriority asks boskos for a resource of certain type in certain state, and set the resource to dest state. +// Returns the resource on success. +// Boskos Priority are FIFO. 
+func (c *Client) AcquireWithPriority(rtype, state, dest, requestID string) (*Resource, error) { + r, err := c.acquire(rtype, state, dest, requestID) + if err != nil { + return nil, err + } + c.lock.Lock() + defer c.lock.Unlock() + if r != nil { + if err := c.storage.Add(*r); err != nil { + return nil, err + } + } + + return r, nil +} + +// AcquireWait blocks until Acquire returns the specified resource or the +// provided context is cancelled or its deadline exceeded. +func (c *Client) AcquireWait(ctx context.Context, rtype, state, dest string) (*Resource, error) { + // request with FIFO priority + requestID := uuid.New().String() + return c.AcquireWaitWithPriority(ctx, rtype, state, dest, requestID) +} + +// AcquireWaitWithPriority blocks until Acquire returns the specified resource or the +// provided context is cancelled or its deadline exceeded. This allows you to pass in a request priority. +// Boskos Priority are FIFO. +func (c *Client) AcquireWaitWithPriority(ctx context.Context, rtype, state, dest, requestID string) (*Resource, error) { + if ctx == nil { + return nil, ErrContextRequired + } + // Try to acquire the resource until available or the context is + // cancelled or its deadline exceeded. + for { + r, err := c.AcquireWithPriority(rtype, state, dest, requestID) + if err != nil { + if err == ErrAlreadyInUse || err == ErrNotFound { + select { + case <-ctx.Done(): + return nil, err + case <-time.After(3 * time.Second): + continue + } + } + return nil, err + } + return r, nil + } +} + +// AcquireByState asks boskos for a resources of certain type, and set the resource to dest state. +// Returns a list of resources on success. +func (c *Client) AcquireByState(state, dest string, names []string) ([]Resource, error) { + resources, err := c.acquireByState(state, dest, names) + if err != nil { + return nil, err + } + c.lock.Lock() + defer c.lock.Unlock() + for _, r := range resources { + if err := c.storage.Add(r); err != nil { + return nil, err + } + } + return resources, nil +} + +// AcquireByStateWait blocks until AcquireByState returns the specified +// resource(s) or the provided context is cancelled or its deadline +// exceeded. +func (c *Client) AcquireByStateWait(ctx context.Context, state, dest string, names []string) ([]Resource, error) { + if ctx == nil { + return nil, ErrContextRequired + } + // Try to acquire the resource(s) until available or the context is + // cancelled or its deadline exceeded. + for { + r, err := c.AcquireByState(state, dest, names) + if err != nil { + if err == ErrAlreadyInUse || err == ErrNotFound { + select { + case <-ctx.Done(): + return nil, err + case <-time.After(3 * time.Second): + continue + } + } + return nil, err + } + return r, nil + } +} + +// ReleaseAll returns all resources hold by the client back to boskos and set them to dest state. +func (c *Client) ReleaseAll(dest string) error { + c.lock.Lock() + defer c.lock.Unlock() + resources, err := c.storage.List() + if err != nil { + return err + } + if len(resources) == 0 { + return fmt.Errorf("no holding resource") + } + var allErrors []error + for _, r := range resources { + if err := c.storage.Delete(r.Name); err != nil { + allErrors = append(allErrors, err) + } + + err := c.Release(r.Name, dest) + if err != nil { + allErrors = append(allErrors, err) + } + } + return kerrors.NewAggregate(allErrors) +} + +// ReleaseOne returns one of owned resources back to boskos and set it to dest state. 
+func (c *Client) ReleaseOne(name, dest string) error { + c.lock.Lock() + defer c.lock.Unlock() + + if _, err := c.storage.Get(name); err != nil { + return fmt.Errorf("no resource name %v", name) + } + + if err := c.storage.Delete(name); err != nil { + return err + } + + return c.Release(name, dest) +} + +// UpdateAll signals update for all resources hold by the client. +func (c *Client) UpdateAll(state string) error { + c.lock.Lock() + defer c.lock.Unlock() + + resources, err := c.storage.List() + if err != nil { + return err + } + if len(resources) == 0 { + return fmt.Errorf("no holding resource") + } + var allErrors []error + for _, r := range resources { + if err := c.Update(r.Name, state, nil); err != nil { + allErrors = append(allErrors, err) + continue + } + if err := c.updateLocalResource(r, state, nil); err != nil { + allErrors = append(allErrors, err) + } + } + return kerrors.NewAggregate(allErrors) +} + +// SyncAll signals update for all resources hold by the client. +func (c *Client) SyncAll() error { + c.lock.Lock() + defer c.lock.Unlock() + + resources, err := c.storage.List() + if err != nil { + return err + } + if len(resources) == 0 { + fmt.Println("no resource to sync") + return nil + } + var allErrors []error + for _, r := range resources { + if err := c.Update(r.Name, r.State, nil); err != nil { + allErrors = append(allErrors, err) + continue + } + if _, err := c.storage.Update(r); err != nil { + allErrors = append(allErrors, err) + } + } + return kerrors.NewAggregate(allErrors) +} + +// UpdateOne signals update for one of the resources hold by the client. +func (c *Client) UpdateOne(name, state string, userData *UserData) error { + c.lock.Lock() + defer c.lock.Unlock() + + r, err := c.storage.Get(name) + if err != nil { + return fmt.Errorf("no resource name %v", name) + } + if err := c.Update(r.Name, state, userData); err != nil { + return err + } + return c.updateLocalResource(r, state, userData) +} + +// Reset will scan all boskos resources of type, in state, last updated before expire, and set them to dest state. +// Returns a map of {resourceName:owner} for further actions. +func (c *Client) Reset(rtype, state string, expire time.Duration, dest string) (map[string]string, error) { + return c.reset(rtype, state, expire, dest) +} + +// Metric will query current metric for target resource type. +// Return a common.Metric object on success. +func (c *Client) Metric(rtype string) (Metric, error) { + return c.metric(rtype) +} + +// HasResource tells if current client holds any resources. +func (c *Client) HasResource() bool { + resources, _ := c.storage.List() + return len(resources) > 0 +} + +// private methods. 
+ +func (c *Client) updateLocalResource(res Resource, state string, data *UserData) error { + res.State = state + if res.UserData == nil { + res.UserData = data + } else { + res.UserData.Update(data) + } + _, err := c.storage.Update(res) + return err +} + +func (c *Client) acquire(rtype, state, dest, requestID string) (*Resource, error) { + values := url.Values{} + values.Set("type", rtype) + values.Set("state", state) + values.Set("owner", c.owner) + values.Set("dest", dest) + if requestID != "" { + values.Set("request_id", requestID) + } + + res := Resource{} + + work := func(retriedErrs *[]error) (bool, error) { + resp, err := c.httpPost("/acquire", values, "", nil) + if err != nil { + // Swallow the error so we can retry + *retriedErrs = append(*retriedErrs, err) + return false, nil + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK: + body, err := io.ReadAll(resp.Body) + if err != nil { + return false, err + } + + err = json.Unmarshal(body, &res) + if err != nil { + return false, err + } + if res.Name == "" { + return false, fmt.Errorf("unable to parse resource") + } + return true, nil + case http.StatusUnauthorized: + return false, ErrAlreadyInUse + case http.StatusNotFound: + // The only way to distinguish between all reasources being busy and a request for a non-existent + // resource type is to check the text of the accompanying error message. + if c.DistinguishNotFoundVsTypeNotFound { + if bytes, err := io.ReadAll(resp.Body); err == nil { + errorMsg := string(bytes) + if strings.Contains(errorMsg, ResourceTypeNotFoundMessage(rtype)) { + return false, ErrTypeNotFound + } + } + } + return false, ErrNotFound + default: + *retriedErrs = append(*retriedErrs, fmt.Errorf("status %s, status code %v", resp.Status, resp.StatusCode)) + // Swallow it so we can retry + return false, nil + } + } + + return &res, retry(work) +} + +func (c *Client) acquireByState(state, dest string, names []string) ([]Resource, error) { + values := url.Values{} + values.Set("state", state) + values.Set("dest", dest) + values.Set("names", strings.Join(names, ",")) + values.Set("owner", c.owner) + var resources []Resource + + work := func(retriedErrs *[]error) (bool, error) { + resp, err := c.httpPost("/acquirebystate", values, "", nil) + if err != nil { + *retriedErrs = append(*retriedErrs, err) + return false, nil + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK: + if err := json.NewDecoder(resp.Body).Decode(&resources); err != nil { + return false, err + } + return true, nil + case http.StatusUnauthorized: + return false, ErrAlreadyInUse + case http.StatusNotFound: + return false, ErrNotFound + default: + *retriedErrs = append(*retriedErrs, fmt.Errorf("status %s, status code %v", resp.Status, resp.StatusCode)) + return false, nil + } + } + + return resources, retry(work) +} + +// Release a lease for a resource and set its state to the destination state. 
+func (c *Client) Release(name, dest string) error { + values := url.Values{} + values.Set("name", name) + values.Set("dest", dest) + values.Set("owner", c.owner) + + work := func(retriedErrs *[]error) (bool, error) { + resp, err := c.httpPost("/release", values, "", nil) + if err != nil { + *retriedErrs = append(*retriedErrs, err) + return false, nil + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + *retriedErrs = append(*retriedErrs, fmt.Errorf("status %s, statusCode %v releasing %s", resp.Status, resp.StatusCode, name)) + return false, nil + } + return true, nil + } + + return retry(work) +} + +// Update a resource on the server, setting the state and user data. +func (c *Client) Update(name, state string, userData *UserData) error { + var bodyData *bytes.Buffer + if userData != nil { + bodyData = new(bytes.Buffer) + err := json.NewEncoder(bodyData).Encode(userData) + if err != nil { + return err + } + } + values := url.Values{} + values.Set("name", name) + values.Set("owner", c.owner) + values.Set("state", state) + + work := func(retriedErrs *[]error) (bool, error) { + // As the body is an io.Reader and hence its content + // can only be read once, we have to copy it for every request we make + var body io.Reader + if bodyData != nil { + body = bytes.NewReader(bodyData.Bytes()) + } + resp, err := c.httpPost("/update", values, "application/json", body) + if err != nil { + *retriedErrs = append(*retriedErrs, err) + return false, nil + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + *retriedErrs = append(*retriedErrs, fmt.Errorf("status %s, status code %v updating %s", resp.Status, resp.StatusCode, name)) + return false, nil + } + return true, nil + } + + return retry(work) +} + +func (c *Client) reset(rtype, state string, expire time.Duration, dest string) (map[string]string, error) { + rmap := make(map[string]string) + values := url.Values{} + values.Set("type", rtype) + values.Set("state", state) + values.Set("expire", expire.String()) + values.Set("dest", dest) + + work := func(retriedErrs *[]error) (bool, error) { + resp, err := c.httpPost("/reset", values, "", nil) + if err != nil { + *retriedErrs = append(*retriedErrs, err) + return false, nil + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + body, err := io.ReadAll(resp.Body) + if err != nil { + return false, err + } + + err = json.Unmarshal(body, &rmap) + return true, err + } + *retriedErrs = append(*retriedErrs, fmt.Errorf("status %s, status code %v", resp.Status, resp.StatusCode)) + return false, nil + } + + return rmap, retry(work) +} + +func (c *Client) metric(rtype string) (Metric, error) { + var metric Metric + values := url.Values{} + values.Set("type", rtype) + + work := func(retriedErrs *[]error) (bool, error) { + resp, err := c.httpGet("/metric", values) + if err != nil { + *retriedErrs = append(*retriedErrs, err) + return false, nil + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + *retriedErrs = append(*retriedErrs, fmt.Errorf("status %s, status code %v", resp.Status, resp.StatusCode)) + return false, nil + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return false, err + } + + return true, json.Unmarshal(body, &metric) + } + + return metric, retry(work) +} + +func (c *Client) httpGet(action string, values url.Values) (*http.Response, error) { + u, err := url.ParseRequestURI(c.url) + if err != nil { + return nil, err + } + u.Path = action + u.RawQuery = values.Encode() + req, err := 
http.NewRequestWithContext(context.TODO(), http.MethodGet, u.String(), http.NoBody) + if err != nil { + return nil, err + } + return c.http.Do(req) +} + +func (c *Client) httpPost(action string, values url.Values, contentType string, body io.Reader) (*http.Response, error) { + u, err := url.ParseRequestURI(c.url) + if err != nil { + return nil, err + } + u.Path = action + u.RawQuery = values.Encode() + req, err := http.NewRequestWithContext(context.TODO(), http.MethodPost, u.String(), body) + if err != nil { + return nil, err + } + if contentType != "" { + req.Header.Set("Content-Type", contentType) + } + return c.http.Do(req) +} + +// DialerWithRetry is a composite version of the net.Dialer that retries +// connection attempts. +type DialerWithRetry struct { + net.Dialer + + // RetryCount is the number of times to retry a connection attempt. + RetryCount uint + + // RetrySleep is the length of time to pause between retry attempts. + RetrySleep time.Duration +} + +// Dial connects to the address on the named network. +func (d *DialerWithRetry) Dial(network, address string) (net.Conn, error) { + return d.DialContext(context.Background(), network, address) +} + +// DialContext connects to the address on the named network using the provided context. +func (d *DialerWithRetry) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + // Always bump the retry count by 1 in order to equal the actual number of + // attempts. For example, if a retry count of 2 is specified, the intent + // is for three attempts -- the initial attempt with two retries in case + // the initial attempt times out. + count := d.RetryCount + 1 + sleep := d.RetrySleep + i := uint(0) + for { + conn, err := d.Dialer.DialContext(ctx, network, address) + if err != nil { + if isDialErrorRetriable(err) { + if i < count-1 { + select { + case <-time.After(sleep): + i++ + continue + case <-ctx.Done(): + return nil, err + } + } + } + return nil, err + } + return conn, nil + } +} + +// isDialErrorRetriable determines whether or not a dialer should retry +// a failed connection attempt by examining the connection error to see +// if it is one of the following error types: +// - Timeout +// - Temporary +// - ECONNREFUSED +// - ECONNRESET +func isDialErrorRetriable(err error) bool { + opErr, isOpErr := err.(*net.OpError) + if !isOpErr { + return false + } + if opErr.Timeout() || opErr.Temporary() { + return true + } + sysErr, isSysErr := opErr.Err.(*os.SyscallError) + if !isSysErr { + return false + } + switch sysErr.Err { + case syscall.ECONNREFUSED, syscall.ECONNRESET: + return true + } + return false +} + +// workFunc describes retrieable work. It should +// * Return an error for non-recoverable errors +// * Write retriable errors into `retriedErrs` and return with false, nil +// * Return with true, nil on success. +type workFunc func(retriedErrs *[]error) (bool, error) + +// SleepFunc is called when requests are retried. This may be replaced in tests. 
+var SleepFunc = time.Sleep + +func retry(work workFunc) error { + var retriedErrs []error + + maxAttempts := 4 + for i := 1; i <= maxAttempts; i++ { + success, err := work(&retriedErrs) + if err != nil { + return err + } + if success { + return nil + } + if i == maxAttempts { + break + } + + SleepFunc(time.Duration(i*i) * time.Second) + } + + return kerrors.NewAggregate(retriedErrs) +} diff --git a/hack/tools/pkg/boskos/common.go b/hack/tools/pkg/boskos/common.go new file mode 100644 index 0000000000..59b1c3c7f4 --- /dev/null +++ b/hack/tools/pkg/boskos/common.go @@ -0,0 +1,152 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package boskos + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "sigs.k8s.io/yaml" +) + +// This implementation is based on https://github.com/kubernetes-sigs/boskos/blob/59dbd6c27f19fbd469b62b22177f22dc0a5d52dd/common/common.go#L34 +// We didn't want to import it directly to avoid dependencies on old controller-runtime / client-go versions. + +const ( + // Busy state defines a resource being used. + Busy = "busy" + // Cleaning state defines a resource being cleaned. + Cleaning = "cleaning" + // Dirty state defines a resource that needs cleaning. + Dirty = "dirty" + // Free state defines a resource that is usable. + Free = "free" + // Leased state defines a resource being leased in order to make a new resource. + Leased = "leased" + // ToBeDeleted is used for resources about to be deleted, they will be verified by a cleaner which mark them as tombstone. + ToBeDeleted = "toBeDeleted" + // Tombstone is the state in which a resource can safely be deleted. + Tombstone = "tombstone" + // Other is used to agglomerate unspecified states for metrics reporting. + Other = "other" +) + +// UserData is a map of Name to user defined interface, serialized into a string. +type UserData struct { + sync.Map +} + +// UserDataMap is the standard Map version of UserMap, it is used to ease UserMap creation. +type UserDataMap map[string]string + +// Resource abstracts any resource type that can be tracked by boskos. +type Resource struct { + Type string `json:"type"` + Name string `json:"name"` + State string `json:"state"` + Owner string `json:"owner"` + LastUpdate time.Time `json:"lastupdate"` + // Customized UserData + UserData *UserData `json:"userdata"` + // Used to clean up dynamic resources + ExpirationDate *time.Time `json:"expiration-date,omitempty"` +} + +// Metric contains analytics about a specific resource type. +type Metric struct { + Type string `json:"type"` + Current map[string]int `json:"current"` + Owners map[string]int `json:"owner"` + // TODO: implements state transition metrics +} + +// UserDataNotFound will be returned if requested resource does not exist. +type UserDataNotFound struct { + ID string +} + +func (ud *UserDataNotFound) Error() string { + return fmt.Sprintf("user data ID %s does not exist", ud.ID) +} + +// UnmarshalJSON implements JSON Unmarshaler interface. 
+func (ud *UserData) UnmarshalJSON(data []byte) error { + tmpMap := UserDataMap{} + if err := json.Unmarshal(data, &tmpMap); err != nil { + return err + } + ud.FromMap(tmpMap) + return nil +} + +// MarshalJSON implements JSON Marshaler interface. +func (ud *UserData) MarshalJSON() ([]byte, error) { + return json.Marshal(ud.ToMap()) +} + +// Extract unmarshalls a string a given struct if it exists. +func (ud *UserData) Extract(id string, out interface{}) error { + content, ok := ud.Load(id) + if !ok { + return &UserDataNotFound{id} + } + return yaml.Unmarshal([]byte(content.(string)), out) +} + +// Update updates existing UserData with new UserData. +// If a key as an empty string, the key will be deleted. +func (ud *UserData) Update(newUserData *UserData) *UserData { + if newUserData == nil { + return ud + } + newUserData.Range(func(key, value interface{}) bool { + if value.(string) != "" { + ud.Store(key, value) + } else { + ud.Delete(key) + } + return true + }) + return ud +} + +// ToMap converts a UserData to UserDataMap. +func (ud *UserData) ToMap() UserDataMap { + if ud == nil { + return nil + } + m := UserDataMap{} + ud.Range(func(key, value interface{}) bool { + m[key.(string)] = value.(string) + return true + }) + return m +} + +// FromMap feels updates user data from a map. +func (ud *UserData) FromMap(m UserDataMap) { + for key, value := range m { + ud.Store(key, value) + } +} + +// ResourceTypeNotFoundMessage returns a resource type not found message. +func ResourceTypeNotFoundMessage(rType string) string { + return fmt.Sprintf("resource type %q does not exist", rType) +} diff --git a/hack/tools/pkg/boskos/storage.go b/hack/tools/pkg/boskos/storage.go new file mode 100644 index 0000000000..fa2efe471c --- /dev/null +++ b/hack/tools/pkg/boskos/storage.go @@ -0,0 +1,99 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package boskos + +import ( + "fmt" + "sync" +) + +// This implementation is based on https://github.com/kubernetes-sigs/boskos/blob/59dbd6c27f19fbd469b62b22177f22dc0a5d52dd/storage/storage.go +// We didn't want to import it directly to avoid dependencies on old controller-runtime / client-go versions. + +// PersistenceLayer defines a simple interface to persists Boskos Information. +type PersistenceLayer interface { + Add(r Resource) error + Delete(name string) error + Update(r Resource) (Resource, error) + Get(name string) (Resource, error) + List() ([]Resource, error) +} + +type inMemoryStore struct { + resources map[string]Resource + lock sync.RWMutex +} + +// NewMemoryStorage creates an in memory persistence layer. 
+func NewMemoryStorage() PersistenceLayer { + return &inMemoryStore{ + resources: map[string]Resource{}, + } +} + +func (im *inMemoryStore) Add(r Resource) error { + im.lock.Lock() + defer im.lock.Unlock() + _, ok := im.resources[r.Name] + if ok { + return fmt.Errorf("resource %s already exists", r.Name) + } + im.resources[r.Name] = r + return nil +} + +func (im *inMemoryStore) Delete(name string) error { + im.lock.Lock() + defer im.lock.Unlock() + _, ok := im.resources[name] + if !ok { + return fmt.Errorf("cannot find item %s", name) + } + delete(im.resources, name) + return nil +} + +func (im *inMemoryStore) Update(r Resource) (Resource, error) { + im.lock.Lock() + defer im.lock.Unlock() + _, ok := im.resources[r.Name] + if !ok { + return Resource{}, fmt.Errorf("cannot find item %s", r.Name) + } + im.resources[r.Name] = r + return r, nil +} + +func (im *inMemoryStore) Get(name string) (Resource, error) { + im.lock.RLock() + defer im.lock.RUnlock() + r, ok := im.resources[name] + if !ok { + return Resource{}, fmt.Errorf("cannot find item %s", name) + } + return r, nil +} + +func (im *inMemoryStore) List() ([]Resource, error) { + im.lock.RLock() + defer im.lock.RUnlock() + resources := []Resource{} + for _, r := range im.resources { + resources = append(resources, r) + } + return resources, nil +} diff --git a/hack/tools/janitor/janitor.go b/hack/tools/pkg/janitor/janitor.go similarity index 93% rename from hack/tools/janitor/janitor.go rename to hack/tools/pkg/janitor/janitor.go index 3427590c2a..e089e93be7 100644 --- a/hack/tools/janitor/janitor.go +++ b/hack/tools/pkg/janitor/janitor.go @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -package main +// Package janitor implements a janitor for vSphere. +package janitor import ( "context" @@ -36,22 +37,24 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func newJanitor(vSphereClients *vSphereClients, ipamClient client.Client, maxAge time.Duration, ipamNamespace string, dryRun bool) *janitor { - return &janitor{ +// NewJanitor creates a new Janitor. +func NewJanitor(vSphereClients *VSphereClients, ipamClient client.Client, maxCreationDate time.Time, ipamNamespace string, dryRun bool) *Janitor { + return &Janitor{ dryRun: dryRun, ipamClient: ipamClient, ipamNamespace: ipamNamespace, - maxCreationDate: time.Now().Add(-maxAge), + maxCreationDate: maxCreationDate, vSphereClients: vSphereClients, } } -type janitor struct { +// Janitor implements a janitor for vSphere. +type Janitor struct { dryRun bool ipamClient client.Client ipamNamespace string maxCreationDate time.Time - vSphereClients *vSphereClients + vSphereClients *VSphereClients } type virtualMachine struct { @@ -59,7 +62,8 @@ type virtualMachine struct { object *object.VirtualMachine } -func (s *janitor) cleanupVSphere(ctx context.Context, folders, resourcePools, vmFolders []string) error { +// CleanupVSphere cleans up vSphere VMs, folders and resource pools. +func (s *Janitor) CleanupVSphere(ctx context.Context, folders, resourcePools, vmFolders []string, skipClusterModule bool) error { errList := []error{} // Delete vms to cleanup folders and resource pools. @@ -92,6 +96,10 @@ func (s *janitor) cleanupVSphere(ctx context.Context, folders, resourcePools, vm return errors.Wrap(err, "cleaning up folders") } + if skipClusterModule { + return nil + } + // Delete empty cluster modules. 
if err := s.deleteVSphereClusterModules(ctx); err != nil { return errors.Wrap(err, "cleaning up vSphere cluster modules") @@ -102,7 +110,7 @@ func (s *janitor) cleanupVSphere(ctx context.Context, folders, resourcePools, vm // deleteVSphereVMs deletes all VSphereVMs in a given folder in vSphere if their creation // timestamp is before the janitor's configured maxCreationDate. -func (s *janitor) deleteVSphereVMs(ctx context.Context, folder string) error { +func (s *Janitor) deleteVSphereVMs(ctx context.Context, folder string) error { log := ctrl.LoggerFrom(ctx).WithName("vSphereVMs").WithValues("folder", folder) ctx = ctrl.LoggerInto(ctx, log) @@ -210,7 +218,7 @@ func (s *janitor) deleteVSphereVMs(ctx context.Context, folder string) error { // * it does not have any children of a different type // * the timestamp field's value is before s.maxCreationDate // If an object does not yet have a field, the janitor will add the field to it with the current timestamp as value. -func (s *janitor) deleteObjectChildren(ctx context.Context, inventoryPath string, objectType string) error { +func (s *Janitor) deleteObjectChildren(ctx context.Context, inventoryPath string, objectType string) error { if !slices.Contains([]string{"ResourcePool", "Folder"}, objectType) { return fmt.Errorf("deleteObjectChildren is not implemented for objectType %s", objectType) } @@ -350,7 +358,8 @@ func (s *janitor) deleteObjectChildren(ctx context.Context, inventoryPath string return nil } -func (s *janitor) deleteIPAddressClaims(ctx context.Context) error { +// DeleteIPAddressClaims deletes IPAddressClaims. +func (s *Janitor) DeleteIPAddressClaims(ctx context.Context) error { log := ctrl.LoggerFrom(ctx).WithName("IPAddressClaims") ctrl.LoggerInto(ctx, log) log.Info("Deleting IPAddressClaims") @@ -387,7 +396,7 @@ func (s *janitor) deleteIPAddressClaims(ctx context.Context) error { return kerrors.NewAggregate(errList) } -func (s *janitor) deleteVSphereClusterModules(ctx context.Context) error { +func (s *Janitor) deleteVSphereClusterModules(ctx context.Context) error { log := ctrl.LoggerFrom(ctx).WithName("vSphere cluster modules") ctrl.LoggerInto(ctx, log) log.Info("Deleting vSphere cluster modules") diff --git a/hack/tools/janitor/janitor_test.go b/hack/tools/pkg/janitor/janitor_test.go similarity index 97% rename from hack/tools/janitor/janitor_test.go rename to hack/tools/pkg/janitor/janitor_test.go index c5f9c5ff41..a604127142 100644 --- a/hack/tools/janitor/janitor_test.go +++ b/hack/tools/pkg/janitor/janitor_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package main +package janitor import ( "context" @@ -39,7 +39,7 @@ import ( "sigs.k8s.io/cluster-api-provider-vsphere/internal/test/helpers/vcsim" ) -func setup(ctx context.Context, t *testing.T) (*vSphereClients, *vcsim.Simulator) { +func setup(ctx context.Context, t *testing.T) (*VSphereClients, *vcsim.Simulator) { t.Helper() model := &simulator.Model{ ServiceContent: vpx.ServiceContent, @@ -63,7 +63,7 @@ func setup(ctx context.Context, t *testing.T) (*vSphereClients, *vcsim.Simulator fmt.Printf(" export GOVC_PASSWORD=%s\n", vcsim.Password()) fmt.Printf(" export GOVC_INSECURE=true\n") - clients, err := newVSphereClients(ctx, getVSphereClientInput{ + clients, err := NewVSphereClients(ctx, NewVSphereClientsInput{ Username: vcsim.Username(), Password: vcsim.Password(), Server: vcsim.ServerURL().String(), @@ -175,7 +175,7 @@ func Test_janitor_deleteVSphereVMs(t *testing.T) { relativePath, _ := setupTestCase(g, sim, tt.objects) - s := &janitor{ + s := &Janitor{ dryRun: false, maxCreationDate: tt.maxCreationDate, vSphereClients: clients, @@ -313,7 +313,7 @@ func Test_janitor_deleteObjectChildren(t *testing.T) { inventoryPath := path.Join(tt.basePath, relativePath) - s := &janitor{ + s := &Janitor{ dryRun: false, maxCreationDate: time.Now().Add(time.Hour * 1), vSphereClients: clients, @@ -426,7 +426,7 @@ func Test_janitor_CleanupVSphere(t *testing.T) { relativePath, _ := setupTestCase(g, sim, tt.objects) - s := &janitor{ + s := &Janitor{ dryRun: tt.dryRun, maxCreationDate: tt.maxCreationDate, vSphereClients: clients, @@ -438,12 +438,12 @@ func Test_janitor_CleanupVSphere(t *testing.T) { folders := []string{folder} resourcePools := []string{resourcePool} - g.Expect(s.cleanupVSphere(ctx, folders, resourcePools, folders)).To(gomega.Succeed()) + g.Expect(s.CleanupVSphere(ctx, folders, resourcePools, folders, false)).To(gomega.Succeed()) existingObjects, err := recursiveListFoldersAndResourcePools(ctx, relativePath, clients.Govmomi, clients.Finder, clients.ViewManager) g.Expect(err).ToNot(gomega.HaveOccurred()) g.Expect(existingObjects).To(gomega.BeEquivalentTo(tt.wantAfterFirstRun)) - g.Expect(s.cleanupVSphere(ctx, folders, resourcePools, folders)).To(gomega.Succeed()) + g.Expect(s.CleanupVSphere(ctx, folders, resourcePools, folders, false)).To(gomega.Succeed()) existingObjects, err = recursiveListFoldersAndResourcePools(ctx, relativePath, clients.Govmomi, clients.Finder, clients.ViewManager) g.Expect(err).ToNot(gomega.HaveOccurred()) g.Expect(existingObjects).To(gomega.BeEquivalentTo(tt.wantAfterSecondRun)) diff --git a/hack/tools/janitor/vsphere.go b/hack/tools/pkg/janitor/vsphere.go similarity index 91% rename from hack/tools/janitor/vsphere.go rename to hack/tools/pkg/janitor/vsphere.go index b73f45aa39..3e64e720d7 100644 --- a/hack/tools/janitor/vsphere.go +++ b/hack/tools/pkg/janitor/vsphere.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package main +package janitor import ( "context" @@ -38,7 +38,8 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -type getVSphereClientInput struct { +// NewVSphereClientsInput defines inputs for NewVSphereClients. +type NewVSphereClientsInput struct { Password string Server string Thumbprint string @@ -46,8 +47,8 @@ type getVSphereClientInput struct { Username string } -// vSphereClients is a collection of different clients for vSphere. -type vSphereClients struct { +// VSphereClients is a collection of different clients for vSphere. 
+type VSphereClients struct { Vim *vim25.Client Govmomi *govmomi.Client Rest *rest.Client @@ -56,8 +57,8 @@ type vSphereClients struct { ViewManager *view.Manager } -// logout logs out all clients. It logs errors if the context contains a logger. -func (v *vSphereClients) logout(ctx context.Context) { +// Logout logs out all clients. It logs errors if the context contains a logger. +func (v *VSphereClients) Logout(ctx context.Context) { log := ctrl.LoggerFrom(ctx) if err := v.Govmomi.Logout(ctx); err != nil { log.Error(err, "logging out govmomi client") @@ -68,8 +69,8 @@ func (v *vSphereClients) logout(ctx context.Context) { } } -// newVSphereClients creates a vSphereClients object from the given input. -func newVSphereClients(ctx context.Context, input getVSphereClientInput) (*vSphereClients, error) { +// NewVSphereClients creates a VSphereClients object from the given input. +func NewVSphereClients(ctx context.Context, input NewVSphereClientsInput) (*VSphereClients, error) { urlCredentials := url.UserPassword(input.Username, input.Password) serverURL, err := soap.ParseURL(input.Server) @@ -113,7 +114,7 @@ func newVSphereClients(ctx context.Context, input getVSphereClientInput) (*vSphe viewManager := view.NewManager(vimClient) finder := find.NewFinder(vimClient, false) - return &vSphereClients{ + return &VSphereClients{ Vim: vimClient, Govmomi: govmomiClient, Rest: restClient, diff --git a/test/e2e/README.md b/test/e2e/README.md index ed897de0db..4adedf40d7 100644 --- a/test/e2e/README.md +++ b/test/e2e/README.md @@ -33,18 +33,18 @@ The first step to running the e2e tests is setting up the required environment v | `VSPHERE_SSH_PRIVATE_KEY` | The file path of the private key used to ssh into the CAPV VMs | `/home/foo/bar-ssh.key` | | `VSPHERE_SSH_AUTHORIZED_KEY` | The public key that is added to the CAPV VMs | `ssh-rsa ABCDEF...XYZ=` | | `VSPHERE_TLS_THUMBPRINT` | The TLS thumbprint of the vSphere server's certificate which should be trusted | `2A:3F:BC:CA:C0:96:35:D4:B7:A2:AA:3C:C1:33:D9:D7:BE:EC:31:55` | -| `CONTROL_PLANE_ENDPOINT_IP` | The IP that kube-vip should use as a control plane endpoint. It will not be used if `E2E_IPAM_KUBECONFIG` is set. | `10.10.123.100` | +| `CONTROL_PLANE_ENDPOINT_IP` | The IP that kube-vip should use as a control plane endpoint. It will not be used if `E2E_VSPHERE_IP_POOL` is set. | `10.10.123.100` | | `VSPHERE_STORAGE_POLICY` | The name of an existing vSphere storage policy to be assigned to created VMs | `my-test-sp` | ### Flags -| Flag | Description | Default Value | -|-------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------| -| `SKIP_RESOURCE_CLEANUP` | This flags skips cleanup of the resources created during the tests as well as the kind/bootstrap cluster | `false` | -| `USE_EXISTING_CLUSTER` | This flag enables the usage of an existing K8S cluster as the management cluster to run tests against. | `false` | -| `GINKGO_TEST_TIMEOUT` | This sets the timeout for the E2E test suite. | `2h` | -| `GINKGO_FOCUS` | This populates the `-focus` flag of the `ginkgo` run command. | `""` | -| `E2E_IPAM_KUBECONFIG` | This flag points to a kubeconfig where the in-cluster IPAM provider is running to dynamically claim IP addresses for tests. If this is set, the environment variable `CONTROL_PLANE_ENDPOINT_IP` gets ignored. 
| `""` | +| Flag | Description | Default Value | +|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------| +| `SKIP_RESOURCE_CLEANUP` | This flags skips cleanup of the resources created during the tests as well as the kind/bootstrap cluster | `false` | +| `USE_EXISTING_CLUSTER` | This flag enables the usage of an existing K8S cluster as the management cluster to run tests against. | `false` | +| `GINKGO_TEST_TIMEOUT` | This sets the timeout for the E2E test suite. | `2h` | +| `GINKGO_FOCUS` | This populates the `-focus` flag of the `ginkgo` run command. | `""` | +| `E2E_VSPHERE_IP_POOL` | This allows to configure the IPPool to use for the e2e test. Supports the addresses, gateway and prefix fields from the InClusterIPPool CRD https://github.com/kubernetes-sigs/cluster-api-ipam-provider-in-cluster/blob/main/api/v1alpha2/inclusterippool_types.go. If this is set, the environment variable `CONTROL_PLANE_ENDPOINT_IP` gets ignored. | `""` | ### Running the e2e tests diff --git a/test/e2e/config/vsphere.yaml b/test/e2e/config/vsphere.yaml index 151453dab8..2aeb087684 100644 --- a/test/e2e/config/vsphere.yaml +++ b/test/e2e/config/vsphere.yaml @@ -268,8 +268,8 @@ variables: VSPHERE_CONTENT_LIBRARY: "capv" VSPHERE_CONTENT_LIBRARY_ITEMS: "ubuntu-2204-kube-v1.28.0,ubuntu-2204-kube-v1.29.0,ubuntu-2204-kube-v1.30.0" VSPHERE_IMAGE_NAME: "ubuntu-2204-kube-v1.30.0" - VSPHERE_NETWORK: "sddc-cgw-network-6" - VSPHERE_DISTRIBUTED_PORT_GROUP: "/SDDC-Datacenter/network/sddc-cgw-network-6" + VSPHERE_NETWORK: "sddc-cgw-network-10" + VSPHERE_DISTRIBUTED_PORT_GROUP: "/SDDC-Datacenter/network/sddc-cgw-network-10" VSPHERE_TEMPLATE: "ubuntu-2204-kube-v1.30.0" FLATCAR_VSPHERE_TEMPLATE: "flatcar-stable-3815.2.2-kube-v1.30.0" VSPHERE_INSECURE_CSI: "true" diff --git a/test/e2e/e2e_setup_test.go b/test/e2e/e2e_setup_test.go index 0d5f0dc012..82b126ed6d 100644 --- a/test/e2e/e2e_setup_test.go +++ b/test/e2e/e2e_setup_test.go @@ -45,6 +45,7 @@ import ( type setupOptions struct { additionalIPVariableNames []string gatewayIPVariableName string + prefixVariableName string } // SetupOption is a configuration option supplied to Setup. @@ -65,6 +66,13 @@ func WithGateway(variableName string) SetupOption { } } +// WithPrefix instructs Setup to store the prefix from IPAM into the provided variableName. 
+func WithPrefix(variableName string) SetupOption { + return func(o *setupOptions) { + o.prefixVariableName = variableName + } +} + type testSettings struct { ClusterctlConfigPath string PostNamespaceCreatedFunc func(managementClusterProxy framework.ClusterProxy, workloadClusterNamespace string) @@ -90,7 +98,7 @@ func Setup(specName string, f func(testSpecificSettings func() testSettings), op case VCenterTestTarget: Byf("Getting IP for %s", strings.Join(append([]string{vsphereip.ControlPlaneEndpointIPVariable}, options.additionalIPVariableNames...), ",")) // get IPs from the in cluster address manager - testSpecificIPAddressClaims, testSpecificVariables = inClusterAddressManager.ClaimIPs(ctx, vsphereip.WithGateway(options.gatewayIPVariableName), vsphereip.WithIP(options.additionalIPVariableNames...)) + testSpecificIPAddressClaims, testSpecificVariables = inClusterAddressManager.ClaimIPs(ctx, vsphereip.WithGateway(options.gatewayIPVariableName), vsphereip.WithPrefix(options.prefixVariableName), vsphereip.WithIP(options.additionalIPVariableNames...)) case VCSimTestTarget: c := bootstrapClusterProxy.GetClient() diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index e7c0dfb54b..1e8f623682 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -33,6 +33,7 @@ import ( storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" + ipamv1 "sigs.k8s.io/cluster-api/exp/ipam/api/v1beta1" "sigs.k8s.io/cluster-api/test/framework" "sigs.k8s.io/cluster-api/test/framework/bootstrap" "sigs.k8s.io/cluster-api/test/framework/clusterctl" @@ -119,9 +120,8 @@ var ( namespaces map[*corev1.Namespace]context.CancelFunc - // e2eIPAMKubeconfig is a kubeconfig to a cluster which provides IP address management via an in-cluster - // IPAM provider to claim IPs for the control plane IPs of created clusters. - e2eIPAMKubeconfig string + // e2eIPPool to be used for the e2e test. + e2eIPPool string // inClusterAddressManager is used to claim and cleanup IP addresses used for kubernetes control plane API Servers. inClusterAddressManager vsphereip.AddressManager @@ -137,7 +137,7 @@ func init() { flag.BoolVar(&alsoLogToFile, "e2e.also-log-to-file", true, "if true, ginkgo logs are additionally written to the `ginkgo-log.txt` file in the artifacts folder (including timestamps)") flag.BoolVar(&skipCleanup, "e2e.skip-resource-cleanup", false, "if true, the resource cleanup after tests will be skipped") flag.BoolVar(&useExistingCluster, "e2e.use-existing-cluster", false, "if true, the test uses the current cluster instead of creating a new one (default discovery rules apply)") - flag.StringVar(&e2eIPAMKubeconfig, "e2e.ipam-kubeconfig", "", "path to the kubeconfig for the IPAM cluster") + flag.StringVar(&e2eIPPool, "e2e.ip-pool", "", "IPPool to use for the e2e test. 
Supports the addresses, gateway and prefix fields from the InClusterIPPool CRD https://github.com/kubernetes-sigs/cluster-api-ipam-provider-in-cluster/blob/main/api/v1alpha2/inclusterippool_types.go") } func TestE2E(t *testing.T) { @@ -285,7 +285,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { switch testTarget { case VCenterTestTarget: // Create the in cluster address manager - inClusterAddressManager, err = vsphereip.InClusterAddressManager(e2eIPAMKubeconfig, ipClaimLabels, skipCleanup) + inClusterAddressManager, err = vsphereip.InClusterAddressManager(ctx, bootstrapClusterProxy.GetClient(), e2eIPPool, ipClaimLabels, skipCleanup) Expect(err).ToNot(HaveOccurred()) case VCSimTestTarget: @@ -335,6 +335,8 @@ var _ = SynchronizedAfterSuite(func() { func initScheme() *runtime.Scheme { sc := runtime.NewScheme() framework.TryAddDefaultSchemes(sc) + // TODO: should probably be added to TryAddDefaultSchemes in core CAPI. + _ = ipamv1.AddToScheme(sc) if testTarget == VCSimTestTarget { _ = vcsimv1.AddToScheme(sc) diff --git a/test/e2e/ipam_test.go b/test/e2e/ipam_test.go index 85372cfccd..e98b43efd1 100644 --- a/test/e2e/ipam_test.go +++ b/test/e2e/ipam_test.go @@ -42,6 +42,9 @@ var _ = Describe("ClusterClass Creation using Cluster API quick-start test and I // Set the WithGateway option to write the gateway ip address to the variable. // This variable is required for creating the InClusterIPPool for the ipam provider. WithGateway("IPAM_GATEWAY"), + // Set the WithPrefix option to set the prefix to the variable. + // This variable is required for creating the InClusterIPPool for the ipam provider. + WithPrefix("IPAM_PREFIX"), // Claim two IPs from the CI's IPAM provider to use in the InClusterIPPool of // the ipam provider. The IPs then get claimed during provisioning to configure // static IPs for the control-plane and worker node. diff --git a/test/framework/ip/addressmanager.go b/test/framework/ip/addressmanager.go index 15cbf1e1c2..88ffa61fb0 100644 --- a/test/framework/ip/addressmanager.go +++ b/test/framework/ip/addressmanager.go @@ -44,6 +44,7 @@ type AddressClaims []AddressClaim type claimOptions struct { additionalIPVariableNames []string gatewayIPVariableName string + prefixVariableName string } type ClaimOption func(*claimOptions) @@ -63,6 +64,13 @@ func WithGateway(variableName string) ClaimOption { } } +// WithPrefix instructs Setup to store the prefix from IPAM into the provided variableName. +func WithPrefix(variableName string) ClaimOption { + return func(o *claimOptions) { + o.prefixVariableName = variableName + } +} + type teardownOptions struct { folderName string vSphereClient *govmomi.Client diff --git a/test/framework/ip/incluster.go b/test/framework/ip/incluster.go index 48db2e2d39..2cc993b7ef 100644 --- a/test/framework/ip/incluster.go +++ b/test/framework/ip/incluster.go @@ -18,9 +18,10 @@ package ip import ( "context" + "encoding/json" "fmt" "os" - "path/filepath" + "strconv" "time" . 
"github.com/onsi/ginkgo/v2" @@ -30,12 +31,13 @@ import ( "github.com/vmware/govmomi/find" "github.com/vmware/govmomi/vim25/mo" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" "k8s.io/utils/ptr" ipamv1 "sigs.k8s.io/cluster-api/exp/ipam/api/v1beta1" @@ -43,42 +45,104 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func init() { - ipamScheme = runtime.NewScheme() - _ = ipamv1.AddToScheme(ipamScheme) -} - var _ AddressManager = &inCluster{} +var ipPoolName = "capv-e2e-ippool" + type inCluster struct { client client.Client labels map[string]string skipCleanup bool + ipPool *unstructured.Unstructured +} + +// inClusterIPPoolSpec defines the desired state of InClusterIPPool. +// Note: This is a copy of the relevant fields from: https://github.com/kubernetes-sigs/cluster-api-ipam-provider-in-cluster/blob/main/api/v1alpha2/inclusterippool_types.go +// This was copied to avoid a go dependency on this provider. +type inClusterIPPoolSpec struct { + // Addresses is a list of IP addresses that can be assigned. This set of + // addresses can be non-contiguous. + Addresses []string `json:"addresses"` + + // Prefix is the network prefix to use. + // +kubebuilder:validation:Maximum=128 + Prefix int `json:"prefix"` + + // Gateway + // +optional + Gateway string `json:"gateway,omitempty"` } // InClusterAddressManager returns an ip.AddressManager implementation that leverage on the IPAM provider installed into the management cluster. // If e2eIPAMKubeconfig is an empty string it will return a noop AddressManager which does nothing so we can fallback on setting environment variables. -func InClusterAddressManager(e2eIPAMKubeconfig string, labels map[string]string, skipCleanup bool) (AddressManager, error) { +func InClusterAddressManager(ctx context.Context, client client.Client, e2eIPPool string, labels map[string]string, skipCleanup bool) (AddressManager, error) { if len(labels) == 0 { return nil, fmt.Errorf("expecting labels to be set to prevent deletion of other IPAddressClaims") } - if e2eIPAMKubeconfig == "" { + if e2eIPPool == "" { return &noop{}, nil } - ipamClient, err := getClient(e2eIPAMKubeconfig) + ipPool, err := createIPPool(ctx, client, e2eIPPool) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to create IPPool") } return &inCluster{ labels: labels, - client: ipamClient, + client: client, + ipPool: ipPool, skipCleanup: skipCleanup, }, nil } +func createIPPool(ctx context.Context, c client.Client, e2eIPPool string) (*unstructured.Unstructured, error) { + ipPoolSpec := inClusterIPPoolSpec{} + if err := json.Unmarshal([]byte(e2eIPPool), &ipPoolSpec); err != nil { + return nil, fmt.Errorf("failed to unmarshal IP Pool configuration") + } + + ipPool := &unstructured.Unstructured{} + ipPool.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "ipam.cluster.x-k8s.io", + Version: "v1alpha2", + Kind: "InClusterIPPool", + }) + ipPool.SetNamespace(metav1.NamespaceDefault) + ipPool.SetName(ipPoolName) + // Note: We have to convert ipPoolSpec to a map[string]interface{}, otherwise SetNestedField panics in DeepCopyJSONValue. 
+	addresses := []interface{}{}
+	for _, a := range ipPoolSpec.Addresses {
+		addresses = append(addresses, a)
+	}
+	spec := map[string]interface{}{
+		"addresses": addresses,
+		"prefix":    int64(ipPoolSpec.Prefix), // DeepCopyJSONValue only supports int64.
+		"gateway":   ipPoolSpec.Gateway,
+	}
+	if err := unstructured.SetNestedField(ipPool.Object, spec, "spec"); err != nil {
+		return nil, fmt.Errorf("failed to set InClusterIPPool spec")
+	}
+
+	// InClusterAddressManager is called on multiple ginkgo workers at the same time.
+	// So some of them will hit AlreadyExists errors.
+	// In this case we are just retrieving the already existing InClusterIPPool.
+	// Note: The InClusterIPPool is intentionally not deleted on TearDown, because at
+	// this time the IPAddressClaims are still being deleted (so we would get an error when triggering deletion).
+	if err := c.Create(ctx, ipPool); err != nil {
+		if !apierrors.IsAlreadyExists(err) {
+			return nil, err
+		}
+
+		if err := c.Get(ctx, client.ObjectKey{Namespace: metav1.NamespaceDefault, Name: ipPoolName}, ipPool); err != nil {
+			return nil, err
+		}
+	}
+
+	return ipPool, nil
+}
+
 func (h *inCluster) ClaimIPs(ctx context.Context, opts ...ClaimOption) (AddressClaims, map[string]string) {
 	options := &claimOptions{}
 	for _, o := range opts {
@@ -98,12 +162,19 @@ func (h *inCluster) ClaimIPs(ctx context.Context, opts ...ClaimOption) (AddressC
 		})
 		Byf("Setting clusterctl variable %s to %s", variable, ip.Spec.Address)
 		variables[variable] = ip.Spec.Address
-		if variable == ControlPlaneEndpointIPVariable && options.gatewayIPVariableName != "" {
-			// Set the gateway variable if requested to the gateway of the control plane IP.
-			// This is required in ipam scenarios, otherwise the VMs will not be able to
-			// connect to the public internet to pull images.
-			Byf("Setting clusterctl variable %s to %s", variable, ip.Spec.Gateway)
-			variables[options.gatewayIPVariableName] = ip.Spec.Gateway
+		if variable == ControlPlaneEndpointIPVariable {
+			if options.gatewayIPVariableName != "" {
+				// Set the gateway variable if requested to the gateway of the control plane IP.
+				// This is required in ipam scenarios, otherwise the VMs will not be able to
+				// connect to the public internet to pull images.
+				Byf("Setting clusterctl variable %s to %s", options.gatewayIPVariableName, ip.Spec.Gateway)
+				variables[options.gatewayIPVariableName] = ip.Spec.Gateway
+			}
+			if options.prefixVariableName != "" {
+				// Set the prefix variable if requested to the prefix of the control plane IP.
+				Byf("Setting clusterctl variable %s to %d", options.prefixVariableName, ip.Spec.Prefix)
+				variables[options.prefixVariableName] = strconv.Itoa(ip.Spec.Prefix)
+			}
 		}
 	}
@@ -225,20 +296,6 @@ func (h *inCluster) Teardown(ctx context.Context, opts ...TearDownOption) error
 	return nil
 }
 
-func getClient(e2eIPAMKubeconfig string) (client.Client, error) {
-	kubeConfig, err := os.ReadFile(filepath.Clean(e2eIPAMKubeconfig))
-	if err != nil {
-		return nil, err
-	}
-
-	restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeConfig)
-	if err != nil {
-		return nil, err
-	}
-
-	return client.New(restConfig, client.Options{Scheme: ipamScheme})
-}
-
 // getVirtualMachineIPAddresses lists all VirtualMachines in the given folder and
 // returns a map which contains the IP addresses of all machines.
 // If the given folder is not found it will return an error.
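For illustration, a minimal sketch of the JSON value expected in `E2E_VSPHERE_IP_POOL` / `--e2e.ip-pool` and of how `createIPPool` above turns it into an unstructured `InClusterIPPool`. The addresses, prefix and gateway below are placeholders rather than values from this change, and the standalone program only mirrors the logic of `createIPPool` under those assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// ipPoolSpec mirrors the inClusterIPPoolSpec copy used by createIPPool.
type ipPoolSpec struct {
	Addresses []string `json:"addresses"`
	Prefix    int      `json:"prefix"`
	Gateway   string   `json:"gateway,omitempty"`
}

func main() {
	// Example value for E2E_VSPHERE_IP_POOL / --e2e.ip-pool; the concrete
	// addresses and gateway are placeholders.
	e2eIPPool := `{"addresses":["192.168.100.10-192.168.100.30"],"prefix":24,"gateway":"192.168.100.1"}`

	spec := ipPoolSpec{}
	if err := json.Unmarshal([]byte(e2eIPPool), &spec); err != nil {
		panic(err)
	}

	// Build the InClusterIPPool the same way createIPPool does: only plain JSON
	// types ([]interface{}, int64, string), because SetNestedField deep-copies the value.
	addresses := make([]interface{}, 0, len(spec.Addresses))
	for _, a := range spec.Addresses {
		addresses = append(addresses, a)
	}

	ipPool := &unstructured.Unstructured{Object: map[string]interface{}{}}
	ipPool.SetGroupVersionKind(schema.GroupVersionKind{
		Group:   "ipam.cluster.x-k8s.io",
		Version: "v1alpha2",
		Kind:    "InClusterIPPool",
	})
	ipPool.SetNamespace(metav1.NamespaceDefault)
	ipPool.SetName("capv-e2e-ippool")
	if err := unstructured.SetNestedField(ipPool.Object, map[string]interface{}{
		"addresses": addresses,
		"prefix":    int64(spec.Prefix),
		"gateway":   spec.Gateway,
	}, "spec"); err != nil {
		panic(err)
	}

	// Print the resulting object; in the e2e suite this is created in the
	// bootstrap cluster instead.
	out, _ := json.MarshalIndent(ipPool.Object, "", "  ")
	fmt.Println(string(out))
}
```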
@@ -292,9 +349,9 @@ func (h *inCluster) claimIPAddress(ctx context.Context) (_ *ipamv1.IPAddress, _ }, Spec: ipamv1.IPAddressClaimSpec{ PoolRef: corev1.TypedLocalObjectReference{ - APIGroup: ptr.To("ipam.cluster.x-k8s.io"), - Kind: "InClusterIPPool", - Name: "capv-e2e-ippool", + APIGroup: ptr.To(h.ipPool.GroupVersionKind().Group), + Kind: h.ipPool.GroupVersionKind().Kind, + Name: h.ipPool.GetName(), }, }, }
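Since the janitor now exposes an exported API, a rough call-site sketch follows. It assumes the package resolves as `sigs.k8s.io/cluster-api-provider-vsphere/hack/tools/pkg/janitor`; the credentials, folder and resource-pool paths, namespace, and `maxAge` value are placeholders. Note that the `maxAge` to `maxCreationDate` conversion now happens at the call site, because `NewJanitor` takes an absolute `time.Time`.

```go
package main

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	ipamv1 "sigs.k8s.io/cluster-api/exp/ipam/api/v1beta1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	// Assumed import path after the move to hack/tools/pkg/janitor.
	"sigs.k8s.io/cluster-api-provider-vsphere/hack/tools/pkg/janitor"
)

func main() {
	ctx := context.Background()

	// Connect to vSphere; all values here are placeholders.
	clients, err := janitor.NewVSphereClients(ctx, janitor.NewVSphereClientsInput{
		Username:   "administrator@vsphere.local",
		Password:   "changeme",
		Server:     "https://vcenter.example.com",
		Thumbprint: "",
	})
	if err != nil {
		panic(err)
	}
	defer clients.Logout(ctx)

	// Client against the cluster that hosts the IPAddressClaims; the scheme
	// needs the ipam.cluster.x-k8s.io types registered.
	restConfig, err := ctrl.GetConfig()
	if err != nil {
		panic(err)
	}
	scheme := runtime.NewScheme()
	_ = ipamv1.AddToScheme(scheme)
	ipamClient, err := client.New(restConfig, client.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}

	// The maxAge flag is converted to an absolute cut-off time here.
	maxAge := 12 * time.Hour
	j := janitor.NewJanitor(clients, ipamClient, time.Now().Add(-maxAge), "capv-e2e-ipam" /* assumed namespace */, false)

	// Clean up leaked VMs, folders and resource pools below the given (illustrative)
	// paths, then the IPAddressClaims.
	folders := []string{"/SDDC-Datacenter/vm/e2e"}
	resourcePools := []string{"/SDDC-Datacenter/host/Cluster-1/Resources/e2e"}
	if err := j.CleanupVSphere(ctx, folders, resourcePools, folders, false); err != nil {
		panic(err)
	}
	if err := j.DeleteIPAddressClaims(ctx); err != nil {
		panic(err)
	}
}
```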
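The in-memory `PersistenceLayer` at the start of this section can be exercised with a small in-package test. This is only a sketch: the package name and the assumption that `Resource` needs nothing beyond `Name` for this round trip are illustrative.

```go
package storage // assumed: same package as the in-memory store above

import "testing"

func TestInMemoryStoreRoundTrip(t *testing.T) {
	s := NewMemoryStorage()

	// Add stores a resource once; a duplicate Add is rejected.
	if err := s.Add(Resource{Name: "vsphere-project-1"}); err != nil {
		t.Fatal(err)
	}
	if err := s.Add(Resource{Name: "vsphere-project-1"}); err == nil {
		t.Fatal("expected duplicate Add to fail")
	}

	// Get returns the stored resource by name.
	r, err := s.Get("vsphere-project-1")
	if err != nil || r.Name != "vsphere-project-1" {
		t.Fatalf("unexpected Get result: %v, %v", r, err)
	}

	// List returns all stored resources.
	all, err := s.List()
	if err != nil || len(all) != 1 {
		t.Fatalf("unexpected List result: %v, %v", all, err)
	}

	// Delete removes the resource; deleting a missing resource fails.
	if err := s.Delete("vsphere-project-1"); err != nil {
		t.Fatal(err)
	}
	if err := s.Delete("vsphere-project-1"); err == nil {
		t.Fatal("expected Delete of missing resource to fail")
	}
}
```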