From aee43df0c0a71def9851f0d846aad99cda66a704 Mon Sep 17 00:00:00 2001 From: Serge Logvinov Date: Mon, 13 May 2024 17:10:16 +0300 Subject: [PATCH] support multi region cluster --- ...sing-openstack-cloud-controller-manager.md | 6 +- pkg/client/client.go | 16 ++ pkg/openstack/instancesv2.go | 176 ++++++++++++------ pkg/openstack/openstack.go | 9 +- 4 files changed, 151 insertions(+), 56 deletions(-) diff --git a/docs/openstack-cloud-controller-manager/using-openstack-cloud-controller-manager.md b/docs/openstack-cloud-controller-manager/using-openstack-cloud-controller-manager.md index 06bc54bb17..e2832f67e0 100644 --- a/docs/openstack-cloud-controller-manager/using-openstack-cloud-controller-manager.md +++ b/docs/openstack-cloud-controller-manager/using-openstack-cloud-controller-manager.md @@ -114,6 +114,8 @@ The options in `Global` section are used for openstack-cloud-controller-manager Keystone user password. If you are using [Keystone application credential](https://docs.openstack.org/keystone/latest/user/application_credentials.html), this option is not required. * `region` Required. Keystone region name. +* `regions` + Optional. Keystone region names, which specify the regions where the cloud provider looks for running instances. The value of `region` is the default region name. Can be specified multiple times. * `domain-id` Keystone user domain ID. If you are using [Keystone application credential](https://docs.openstack.org/keystone/latest/user/application_credentials.html), this option is not required. * `domain-name` @@ -207,7 +209,7 @@ Although the openstack-cloud-controller-manager was initially implemented with N * `ROUND_ROBIN` (default) * `LEAST_CONNECTIONS` * `SOURCE_IP` - + If `lb-provider` is set to "ovn" the value must be set to `SOURCE_IP_PORT`. 
* `lb-provider` @@ -300,7 +302,7 @@ Although the openstack-cloud-controller-manager was initially implemented with N call](https://docs.openstack.org/api-ref/load-balancer/v2/?expanded=create-a-load-balancer-detail#creating-a-fully-populated-load-balancer). Setting this option to true will create loadbalancers using serial API calls which first create an unpopulated loadbalancer, then populate its listeners, pools and members. This is a compatibility option at the expense of - increased load on the OpenStack API. Default: false + increased load on the OpenStack API. Default: false NOTE: diff --git a/pkg/client/client.go b/pkg/client/client.go index 721f548fd3..7df91291e1 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -54,6 +54,7 @@ type AuthOpts struct { UserDomainID string `gcfg:"user-domain-id" mapstructure:"user-domain-id" name:"os-userDomainID" value:"optional"` UserDomainName string `gcfg:"user-domain-name" mapstructure:"user-domain-name" name:"os-userDomainName" value:"optional"` Region string `name:"os-region"` + Regions []string `name:"os-regions" value:"optional"` EndpointType gophercloud.Availability `gcfg:"os-endpoint-type" mapstructure:"os-endpoint-type" name:"os-endpointType" value:"optional"` CAFile string `gcfg:"ca-file" mapstructure:"ca-file" name:"os-certAuthorityPath" value:"optional"` TLSInsecure string `gcfg:"tls-insecure" mapstructure:"tls-insecure" name:"os-TLSInsecure" value:"optional" matches:"^true|false$"` @@ -88,6 +89,7 @@ func LogCfg(authOpts AuthOpts) { klog.V(5).Infof("UserDomainID: %s", authOpts.UserDomainID) klog.V(5).Infof("UserDomainName: %s", authOpts.UserDomainName) klog.V(5).Infof("Region: %s", authOpts.Region) + klog.V(5).Infof("Regions: %s", authOpts.Regions) klog.V(5).Infof("EndpointType: %s", authOpts.EndpointType) klog.V(5).Infof("CAFile: %s", authOpts.CAFile) klog.V(5).Infof("CertFile: %s", authOpts.CertFile) @@ -233,6 +235,20 @@ func ReadClouds(authOpts *AuthOpts) error { 
authOpts.ApplicationCredentialName = replaceEmpty(authOpts.ApplicationCredentialName, cloud.AuthInfo.ApplicationCredentialName) authOpts.ApplicationCredentialSecret = replaceEmpty(authOpts.ApplicationCredentialSecret, cloud.AuthInfo.ApplicationCredentialSecret) + regions := strings.Split(authOpts.Region, ",") + if len(regions) > 1 { + authOpts.Region = regions[0] + } + + for _, r := range cloud.Regions { + // Support only single auth section in clouds.yaml + if r.Values.AuthInfo == nil && r.Name != authOpts.Region { + regions = append(regions, r.Name) + } + } + + authOpts.Regions = regions + return nil } diff --git a/pkg/openstack/instancesv2.go b/pkg/openstack/instancesv2.go index 3e6f770f7e..f0a6d0a66c 100644 --- a/pkg/openstack/instancesv2.go +++ b/pkg/openstack/instancesv2.go @@ -20,6 +20,8 @@ import ( "context" "fmt" sysos "os" + "slices" + "strings" "github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud/openstack/compute/v2/servers" @@ -33,9 +35,9 @@ import ( // InstancesV2 encapsulates an implementation of InstancesV2 for OpenStack. 
type InstancesV2 struct { - compute *gophercloud.ServiceClient - network *gophercloud.ServiceClient - region string + compute map[string]*gophercloud.ServiceClient + network map[string]*gophercloud.ServiceClient + regions []string regionProviderID bool networkingOpts NetworkingOpts } @@ -51,16 +53,25 @@ func (os *OpenStack) InstancesV2() (cloudprovider.InstancesV2, bool) { func (os *OpenStack) instancesv2() (*InstancesV2, bool) { klog.V(4).Info("openstack.Instancesv2() called") - compute, err := client.NewComputeV2(os.provider, os.epOpts) - if err != nil { - klog.Errorf("unable to access compute v2 API : %v", err) - return nil, false - } + var err error + compute := make(map[string]*gophercloud.ServiceClient, len(os.regions)) + network := make(map[string]*gophercloud.ServiceClient, len(os.regions)) - network, err := client.NewNetworkV2(os.provider, os.epOpts) - if err != nil { - klog.Errorf("unable to access network v2 API : %v", err) - return nil, false + for _, region := range os.regions { + opt := *os.epOpts // copy by value: epOpts is a pointer shared by the whole provider, so setting Region through it would leave os.epOpts pointing at the last region + opt.Region = region + + compute[region], err = client.NewComputeV2(os.provider, &opt) + if err != nil { + klog.Errorf("unable to access compute v2 API : %v", err) + return nil, false + } + + network[region], err = client.NewNetworkV2(os.provider, &opt) + if err != nil { + klog.Errorf("unable to access network v2 API : %v", err) + return nil, false + } } regionalProviderID := false @@ -71,7 +82,7 @@ func (os *OpenStack) instancesv2() (*InstancesV2, bool) { return &InstancesV2{ compute: compute, network: network, - region: os.epOpts.Region, + regions: os.regions, regionProviderID: regionalProviderID, networkingOpts: os.networkingOpts, }, true @@ -79,9 +90,15 @@ func (os *OpenStack) instancesv2() (*InstancesV2, bool) { // InstanceExists indicates whether a given node exists according to the cloud provider func (i *InstancesV2) InstanceExists(ctx context.Context, node *v1.Node) (bool, error) { - _, err := i.getInstance(ctx, node) + 
klog.V(4).InfoS("openstack.InstanceExists() called", "node", klog.KObj(node), + "providerID", node.Spec.ProviderID, + "region", node.Labels[v1.LabelTopologyRegion]) + + _, _, err := i.getInstance(ctx, node) if err == cloudprovider.InstanceNotFound { - klog.V(6).Infof("instance not found for node: %s", node.Name) + klog.V(6).InfoS("Node is not found in cloud provider", "node", klog.KObj(node), + "providerID", node.Spec.ProviderID, + "region", node.Labels[v1.LabelTopologyRegion]) return false, nil } @@ -94,7 +111,11 @@ func (i *InstancesV2) InstanceExists(ctx context.Context, node *v1.Node) (bool, // InstanceShutdown returns true if the instance is shutdown according to the cloud provider. func (i *InstancesV2) InstanceShutdown(ctx context.Context, node *v1.Node) (bool, error) { - server, err := i.getInstance(ctx, node) + klog.V(4).InfoS("openstack.InstanceShutdown() called", "node", klog.KObj(node), + "providerID", node.Spec.ProviderID, + "region", node.Labels[v1.LabelTopologyRegion]) + + server, _, err := i.getInstance(ctx, node) if err != nil { return false, err } @@ -109,7 +130,7 @@ func (i *InstancesV2) InstanceShutdown(ctx context.Context, node *v1.Node) (bool // InstanceMetadata returns the instance's metadata. 
func (i *InstancesV2) InstanceMetadata(ctx context.Context, node *v1.Node) (*cloudprovider.InstanceMetadata, error) { - srv, err := i.getInstance(ctx, node) + srv, region, err := i.getInstance(ctx, node) if err != nil { return nil, err } @@ -118,79 +139,128 @@ func (i *InstancesV2) InstanceMetadata(ctx context.Context, node *v1.Node) (*clo server = *srv } - instanceType, err := srvInstanceType(i.compute, &server.Server) + instanceType, err := srvInstanceType(i.compute[region], &server.Server) if err != nil { return nil, err } - ports, err := getAttachedPorts(i.network, server.ID) + ports, err := getAttachedPorts(i.network[region], server.ID) if err != nil { return nil, err } - addresses, err := nodeAddresses(&server.Server, ports, i.network, i.networkingOpts) + addresses, err := nodeAddresses(&server.Server, ports, i.network[region], i.networkingOpts) if err != nil { return nil, err } return &cloudprovider.InstanceMetadata{ - ProviderID: i.makeInstanceID(&server.Server), + ProviderID: i.makeInstanceID(&server.Server, region), InstanceType: instanceType, NodeAddresses: addresses, Zone: server.AvailabilityZone, - Region: i.region, + Region: region, }, nil } -func (i *InstancesV2) makeInstanceID(srv *servers.Server) string { +func (i *InstancesV2) makeInstanceID(srv *servers.Server, region string) string { if i.regionProviderID { - return fmt.Sprintf("%s://%s/%s", ProviderName, i.region, srv.ID) + return fmt.Sprintf("%s://%s/%s", ProviderName, region, srv.ID) } return fmt.Sprintf("%s:///%s", ProviderName, srv.ID) } -func (i *InstancesV2) getInstance(ctx context.Context, node *v1.Node) (*ServerAttributesExt, error) { - if node.Spec.ProviderID == "" { - opt := servers.ListOpts{ - Name: fmt.Sprintf("^%s$", node.Name), - } - mc := metrics.NewMetricContext("server", "list") - allPages, err := servers.List(i.compute, opt).AllPages() - if mc.ObserveRequest(err) != nil { - return nil, fmt.Errorf("error listing servers %v: %v", opt, err) - } +func (i *InstancesV2) 
getInstance(ctx context.Context, node *v1.Node) (*ServerAttributesExt, string, error) { + klog.V(4).InfoS("openstack.getInstance() called", "node", klog.KObj(node), + "providerID", node.Spec.ProviderID, + "region", node.Labels[v1.LabelTopologyRegion]) - serverList := []ServerAttributesExt{} - err = servers.ExtractServersInto(allPages, &serverList) - if err != nil { - return nil, fmt.Errorf("error extracting servers from pages: %v", err) - } - if len(serverList) == 0 { - return nil, cloudprovider.InstanceNotFound - } - if len(serverList) > 1 { - return nil, fmt.Errorf("getInstance: multiple instances found") - } - return &serverList[0], nil + if node.Spec.ProviderID == "" { + return i.getInstanceByName(node) } instanceID, instanceRegion, err := instanceIDFromProviderID(node.Spec.ProviderID) if err != nil { - return nil, err + return nil, "", err + } + + if instanceRegion == "" { + return i.getInstanceByID(instanceID, node.Labels[v1.LabelTopologyRegion]) } - if instanceRegion != "" && instanceRegion != i.region { - return nil, fmt.Errorf("ProviderID \"%s\" didn't match supported region \"%s\"", node.Spec.ProviderID, i.region) + if !slices.Contains(i.regions, instanceRegion) { + return nil, "", fmt.Errorf("getInstance: ProviderID \"%s\" didn't match supported regions \"%s\"", node.Spec.ProviderID, strings.Join(i.regions, ",")) } server := ServerAttributesExt{} mc := metrics.NewMetricContext("server", "get") - err = servers.Get(i.compute, instanceID).ExtractInto(&server) + err = servers.Get(i.compute[instanceRegion], instanceID).ExtractInto(&server) if mc.ObserveRequest(err) != nil { if errors.IsNotFound(err) { - return nil, cloudprovider.InstanceNotFound + return nil, "", cloudprovider.InstanceNotFound } - return nil, err + return nil, "", err } - return &server, nil + + return &server, instanceRegion, nil +} + +func (i *InstancesV2) getInstanceByID(instanceID, preferedRegion string) (*ServerAttributesExt, string, error) { + server := ServerAttributesExt{} + regions 
:= i.regions + + if preferedRegion != "" && slices.Contains(i.regions, preferedRegion) { + regions = []string{preferedRegion} + for _, r := range i.regions { + if r != preferedRegion { + regions = append(regions, r) + } + } + } + + for _, r := range regions { + mc := metrics.NewMetricContext("server", "get") // one context per request, as the pre-patch code did, so each region's call is observed on its own + err := servers.Get(i.compute[r], instanceID).ExtractInto(&server) + if mc.ObserveRequest(err) != nil { + if errors.IsNotFound(err) { + continue + } + + return nil, "", err + } + + return &server, r, nil + } + + return nil, "", cloudprovider.InstanceNotFound +} + +func (i *InstancesV2) getInstanceByName(node *v1.Node) (*ServerAttributesExt, string, error) { + opt := servers.ListOpts{ + Name: fmt.Sprintf("^%s$", node.Name), + } + + for _, r := range i.regions { + mc := metrics.NewMetricContext("server", "list") // one context per request so a later region's observation does not include earlier requests + serverList := []ServerAttributesExt{} // fresh per region: no results carried over from a previous region + allPages, err := servers.List(i.compute[r], opt).AllPages() + if mc.ObserveRequest(err) != nil { + return nil, "", fmt.Errorf("error listing servers %v: %v", opt, err) + } + + err = servers.ExtractServersInto(allPages, &serverList) + if err != nil { + return nil, "", fmt.Errorf("error extracting servers from pages: %v", err) + } + if len(serverList) == 0 { + continue + } + if len(serverList) > 1 { + return nil, "", fmt.Errorf("getInstanceByName: multiple instances found") + } + + return &serverList[0], r, nil + } + + return nil, "", cloudprovider.InstanceNotFound } diff --git a/pkg/openstack/openstack.go b/pkg/openstack/openstack.go index aee84846cd..7a80c8d7b5 100644 --- a/pkg/openstack/openstack.go +++ b/pkg/openstack/openstack.go @@ -37,7 +37,7 @@ import ( cloudprovider "k8s.io/cloud-provider" "k8s.io/klog/v2" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/scheme" @@ -159,6 +159,7 @@ type ServerAttributesExt struct { // OpenStack is an implementation of cloud provider Interface for OpenStack. 
type OpenStack struct { + regions []string provider *gophercloud.ProviderClient epOpts *gophercloud.EndpointOpts lbOpts LoadBalancerOpts @@ -257,6 +258,11 @@ func ReadConfig(config io.Reader) (Config, error) { klog.V(5).Infof("Config, loaded from the %s:", cfg.Global.CloudsFile) client.LogCfg(cfg.Global) } + + if len(cfg.Global.Regions) == 0 { + cfg.Global.Regions = []string{cfg.Global.Region} + } + // Set the default values for search order if not set if cfg.Metadata.SearchOrder == "" { cfg.Metadata.SearchOrder = fmt.Sprintf("%s,%s", metadata.ConfigDriveID, metadata.MetadataID) @@ -309,6 +315,7 @@ func NewOpenStack(cfg Config) (*OpenStack, error) { } os := OpenStack{ + regions: cfg.Global.Regions, provider: provider, epOpts: &gophercloud.EndpointOpts{ Region: cfg.Global.Region,