From 4feceb73c610553e26ca2c360b6eef2f7b25b232 Mon Sep 17 00:00:00 2001 From: Wantong Jiang Date: Tue, 19 Apr 2022 07:18:46 +0000 Subject: [PATCH] Add pip cache --- pkg/consts/consts.go | 3 ++ pkg/provider/azure.go | 8 +++++ pkg/provider/azure_backoff.go | 34 +++++++++++++----- pkg/provider/azure_backoff_test.go | 52 ++++++++++++++++++++++++--- pkg/provider/azure_fakes.go | 1 + pkg/provider/azure_loadbalancer.go | 8 ++--- pkg/provider/azure_ratelimit_test.go | 2 ++ pkg/provider/azure_standard.go | 2 +- pkg/provider/azure_test.go | 5 +++ pkg/provider/azure_wrap.go | 53 ++++++++++++++++++++++------ 10 files changed, 140 insertions(+), 28 deletions(-) diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 6bb5ebe0f0..da644edc39 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -326,6 +326,9 @@ const ( // LoadBalancerBackendPoolConfigurationTypePODIP is the lb backend pool config type pod ip // TODO (nilo19): support pod IP in the future LoadBalancerBackendPoolConfigurationTypePODIP = "podIP" + + // To get pip, we need both resource group name and pip name, key in cache has format: pip_rg:pip_name + PIPCacheKeySeparator = ":" ) // error messages diff --git a/pkg/provider/azure.go b/pkg/provider/azure.go index 3e32a65ea8..b7397fb0a5 100644 --- a/pkg/provider/azure.go +++ b/pkg/provider/azure.go @@ -228,6 +228,8 @@ type Config struct { RouteTableCacheTTLInSeconds int `json:"routeTableCacheTTLInSeconds,omitempty" yaml:"routeTableCacheTTLInSeconds,omitempty"` // AvailabilitySetsCacheTTLInSeconds sets the cache TTL for VMAS AvailabilitySetsCacheTTLInSeconds int `json:"availabilitySetsCacheTTLInSeconds,omitempty" yaml:"availabilitySetsCacheTTLInSeconds,omitempty"` + // PublicIPCacheTTLInSeconds sets the cache TTL for public ip + PublicIPCacheTTLInSeconds int `json:"publicIPCacheTTLInSeconds,omitempty" yaml:"publicIPCacheTTLInSeconds,omitempty"` // RouteUpdateWaitingInSeconds is the delay time for waiting route updates to take effect. This waiting delay is added // because the routes are not taken effect when the async route updating operation returns success. Default is 30 seconds. RouteUpdateWaitingInSeconds int `json:"routeUpdateWaitingInSeconds,omitempty" yaml:"routeUpdateWaitingInSeconds,omitempty"` @@ -336,6 +338,7 @@ type Cloud struct { lbCache *azcache.TimedCache nsgCache *azcache.TimedCache rtCache *azcache.TimedCache + pipCache *azcache.TimedCache *ManagedDiskController *controllerCommon @@ -667,6 +670,11 @@ func (az *Cloud) initCaches() (err error) { return err } + az.pipCache, err = az.newPIPCache() + if err != nil { + return err + } + return nil } diff --git a/pkg/provider/azure_backoff.go b/pkg/provider/azure_backoff.go index 90256825b0..8680335db9 100644 --- a/pkg/provider/azure_backoff.go +++ b/pkg/provider/azure_backoff.go @@ -246,7 +246,7 @@ func (az *Cloud) CreateOrUpdateLB(service *v1.Service, lb network.LoadBalancer) } pipRG, pipName := matches[1], matches[2] klog.V(3).Infof("The public IP %s referenced by load balancer %s is not in Succeeded provisioning state, will try to update it", pipName, to.String(lb.Name)) - pip, _, err := az.getPublicIPAddress(pipRG, pipName) + pip, _, err := az.getPublicIPAddress(pipRG, pipName, azcache.CacheReadTypeDefault) if err != nil { klog.Errorf("Failed to get the public IP %s in resource group %s: %v", pipName, pipRG, err) return rerr.Error() @@ -411,17 +411,33 @@ func (az *Cloud) CreateOrUpdatePIP(service *v1.Service, pipResourceGroup string, rerr := az.PublicIPAddressesClient.CreateOrUpdate(ctx, pipResourceGroup, to.String(pip.Name), pip) klog.V(10).Infof("PublicIPAddressesClient.CreateOrUpdate(%s, %s): end", pipResourceGroup, to.String(pip.Name)) - if rerr != nil { - pipJSON, _ := json.Marshal(pip) - klog.Warningf("PublicIPAddressesClient.CreateOrUpdate(%s, %s) failed: %s, PublicIP request: %s", pipResourceGroup, to.String(pip.Name), rerr.Error().Error(), string(pipJSON)) - az.Event(service, v1.EventTypeWarning, "CreateOrUpdatePublicIPAddress", rerr.Error().Error()) - return rerr.Error() + if rerr == nil { + // Invalidate the cache right after updating + _ = az.pipCache.Delete(az.getPIPCacheKey(pipResourceGroup, to.String(pip.Name))) + return nil } - return nil + pipJSON, _ := json.Marshal(pip) + klog.Warningf("PublicIPAddressesClient.CreateOrUpdate(%s, %s) failed: %s, PublicIP request: %s", pipResourceGroup, to.String(pip.Name), rerr.Error().Error(), string(pipJSON)) + az.Event(service, v1.EventTypeWarning, "CreateOrUpdatePublicIPAddress", rerr.Error().Error()) + + // Invalidate the cache because ETAG precondition mismatch. + if rerr.HTTPStatusCode == http.StatusPreconditionFailed { + klog.V(3).Infof("PublicIP cache for (%s, %s) is cleanup because of http.StatusPreconditionFailed", pipResourceGroup, to.String(pip.Name)) + _ = az.pipCache.Delete(az.getPIPCacheKey(pipResourceGroup, to.String(pip.Name))) + } + + retryErrorMessage := rerr.Error().Error() + // Invalidate the cache because another new operation has canceled the current request. + if strings.Contains(strings.ToLower(retryErrorMessage), consts.OperationCanceledErrorMessage) { + klog.V(3).Infof("PublicIP cache for (%s, %s) is cleanup because CreateOrUpdate is canceled by another operation", pipResourceGroup, to.String(pip.Name)) + _ = az.pipCache.Delete(az.getPIPCacheKey(pipResourceGroup, to.String(pip.Name))) + } + + return rerr.Error() } -// CreateOrUpdateInterface invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry +// CreateOrUpdateInterface invokes az.InterfacesClient.CreateOrUpdate with exponential backoff retry func (az *Cloud) CreateOrUpdateInterface(service *v1.Service, nic network.Interface) error { ctx, cancel := getContextWithCancel() defer cancel() @@ -454,6 +470,8 @@ func (az *Cloud) DeletePublicIP(service *v1.Service, pipResourceGroup string, pi return rerr.Error() } + // Invalidate the cache right after deleting + _ = az.pipCache.Delete(az.getPIPCacheKey(pipResourceGroup, pipName)) return nil } diff --git a/pkg/provider/azure_backoff_test.go b/pkg/provider/azure_backoff_test.go index 7c719aadf1..2e99312379 100644 --- a/pkg/provider/azure_backoff_test.go +++ b/pkg/provider/azure_backoff_test.go @@ -290,6 +290,11 @@ func TestCreateOrUpdateLB(t *testing.T) { shouldBeEmpty, err := az.lbCache.Get("lb", cache.CacheReadTypeDefault) assert.NoError(t, err) assert.Empty(t, shouldBeEmpty) + + // public ip cache should be populated since there's GetPIP + shouldNotBeEmpty, err := az.pipCache.Get(az.getPIPCacheKey(az.ResourceGroup, "pip"), cache.CacheReadTypeDefault) + assert.NoError(t, err) + assert.NotEmpty(t, shouldNotBeEmpty) } } @@ -377,12 +382,49 @@ func TestCreateOrUpdatePIP(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - az := GetTestCloud(ctrl) - mockPIPClient := az.PublicIPAddressesClient.(*mockpublicipclient.MockInterface) - mockPIPClient.EXPECT().CreateOrUpdate(gomock.Any(), az.ResourceGroup, "nic", gomock.Any()).Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError}) + tests := []struct { + clientErr *retry.Error + expectedErr error + cacheExpectedEmpty bool + }{ + { + clientErr: &retry.Error{HTTPStatusCode: http.StatusPreconditionFailed}, + expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 412, RawError: %w", error(nil)), + cacheExpectedEmpty: true, + }, + { + clientErr: &retry.Error{RawError: fmt.Errorf(consts.OperationCanceledErrorMessage)}, + expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: %w", fmt.Errorf("canceledandsupersededduetoanotheroperation")), + cacheExpectedEmpty: true, + }, + { + clientErr: &retry.Error{HTTPStatusCode: http.StatusInternalServerError}, + expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), + cacheExpectedEmpty: false, + }, + } - err := az.CreateOrUpdatePIP(&v1.Service{}, az.ResourceGroup, network.PublicIPAddress{Name: to.StringPtr("nic")}) - assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), err.Error()) + for _, test := range tests { + az := GetTestCloud(ctrl) + cacheKey := az.getPIPCacheKey(az.ResourceGroup, "nic") + az.pipCache.Set(cacheKey, "test") + mockPIPClient := az.PublicIPAddressesClient.(*mockpublicipclient.MockInterface) + mockPIPClient.EXPECT().CreateOrUpdate(gomock.Any(), az.ResourceGroup, "nic", gomock.Any()).Return(test.clientErr) + if test.cacheExpectedEmpty { + mockPIPClient.EXPECT().Get(gomock.Any(), az.ResourceGroup, "nic", gomock.Any()).Return(network.PublicIPAddress{}, nil) + } + + err := az.CreateOrUpdatePIP(&v1.Service{}, az.ResourceGroup, network.PublicIPAddress{Name: to.StringPtr("nic")}) + assert.EqualError(t, test.expectedErr, err.Error()) + + cachedPIP, err := az.pipCache.Get(az.getPIPCacheKey(az.ResourceGroup, "nic"), cache.CacheReadTypeDefault) + assert.NoError(t, err) + if test.cacheExpectedEmpty { + assert.Empty(t, cachedPIP) + } else { + assert.NotEmpty(t, cachedPIP) + } + } } func TestCreateOrUpdateInterface(t *testing.T) { diff --git a/pkg/provider/azure_fakes.go b/pkg/provider/azure_fakes.go index abe343f43a..d6e1d56963 100644 --- a/pkg/provider/azure_fakes.go +++ b/pkg/provider/azure_fakes.go @@ -108,6 +108,7 @@ func GetTestCloud(ctrl *gomock.Controller) (az *Cloud) { az.lbCache, _ = az.newLBCache() az.nsgCache, _ = az.newNSGCache() az.rtCache, _ = az.newRouteTableCache() + az.pipCache, _ = az.newPIPCache() az.LoadBalancerBackendPool = NewMockBackendPool(ctrl) _ = initDiskControllers(az) diff --git a/pkg/provider/azure_loadbalancer.go b/pkg/provider/azure_loadbalancer.go index 3cbb675345..84b6886c6e 100644 --- a/pkg/provider/azure_loadbalancer.go +++ b/pkg/provider/azure_loadbalancer.go @@ -55,7 +55,7 @@ func (az *Cloud) GetLoadBalancer(ctx context.Context, clusterName string, servic return false } pipResourceGroup := az.getPublicIPAddressResourceGroup(service) - _, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName) + _, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName, azcache.CacheReadTypeDefault) if err != nil { return false } @@ -847,7 +847,7 @@ func (az *Cloud) getServiceLoadBalancerStatus(service *v1.Service, lb *network.L if err != nil { return nil, nil, fmt.Errorf("get(%s): lb(%s) - failed to get LB PublicIPAddress Name from ID(%s)", serviceName, *lb.Name, *pipID) } - pip, existsPip, err := az.getPublicIPAddress(az.getPublicIPAddressResourceGroup(service), pipName) + pip, existsPip, err := az.getPublicIPAddress(az.getPublicIPAddressResourceGroup(service), pipName, azcache.CacheReadTypeDefault) if err != nil { return nil, nil, err } @@ -977,7 +977,7 @@ func (az *Cloud) findServiceIPAddress(ctx context.Context, clusterName string, s func (az *Cloud) ensurePublicIPExists(service *v1.Service, pipName string, domainNameLabel, clusterName string, shouldPIPExisted, foundDNSLabelAnnotation bool) (*network.PublicIPAddress, error) { pipResourceGroup := az.getPublicIPAddressResourceGroup(service) - pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName) + pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName, azcache.CacheReadTypeDefault) if err != nil { return nil, err } @@ -1363,7 +1363,7 @@ func (az *Cloud) isFrontendIPChanged(clusterName string, config network.Frontend return false, err } pipResourceGroup := az.getPublicIPAddressResourceGroup(service) - pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName) + pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName, azcache.CacheReadTypeDefault) if err != nil { return false, err } diff --git a/pkg/provider/azure_ratelimit_test.go b/pkg/provider/azure_ratelimit_test.go index 3395cb2486..f77aa25adb 100644 --- a/pkg/provider/azure_ratelimit_test.go +++ b/pkg/provider/azure_ratelimit_test.go @@ -62,6 +62,7 @@ var ( "loadBalancerCacheTTLInSeconds": 100, "nsgCacheTTLInSeconds": 100, "routeTableCacheTTLInSeconds": 100, + "publicIPCacheTTLInSeconds": 100, "location": "location", "maximumLoadBalancerRuleCount": 1, "primaryAvailabilitySetName": "primaryAvailabilitySetName", @@ -135,6 +136,7 @@ func TestParseConfig(t *testing.T) { LoadBalancerCacheTTLInSeconds: 100, NsgCacheTTLInSeconds: 100, RouteTableCacheTTLInSeconds: 100, + PublicIPCacheTTLInSeconds: 100, Location: "location", MaximumLoadBalancerRuleCount: 1, PrimaryAvailabilitySetName: "primaryAvailabilitySetName", diff --git a/pkg/provider/azure_standard.go b/pkg/provider/azure_standard.go index 08ca043feb..6a39d6b9a1 100644 --- a/pkg/provider/azure_standard.go +++ b/pkg/provider/azure_standard.go @@ -653,7 +653,7 @@ func (as *availabilitySet) GetIPByNodeName(name string) (string, string, error) if err != nil { return "", "", fmt.Errorf("failed to publicIP name for node %q with pipID %q", name, pipID) } - pip, existsPip, err := as.getPublicIPAddress(as.ResourceGroup, pipName) + pip, existsPip, err := as.getPublicIPAddress(as.ResourceGroup, pipName, azcache.CacheReadTypeDefault) if err != nil { return "", "", err } diff --git a/pkg/provider/azure_test.go b/pkg/provider/azure_test.go index 12a9691626..787bb33aba 100644 --- a/pkg/provider/azure_test.go +++ b/pkg/provider/azure_test.go @@ -2051,6 +2051,7 @@ func TestNewCloudFromJSON(t *testing.T) { "loadBalancerCacheTTLInSeconds": 100, "nsgCacheTTLInSeconds": 100, "routeTableCacheTTLInSeconds": 100, + "publicIPCacheTTLInSeconds": 100, "vmType": "vmss", "disableAvailabilitySetNodes": true }` @@ -2110,6 +2111,7 @@ vmCacheTTLInSeconds: 100 loadBalancerCacheTTLInSeconds: 100 nsgCacheTTLInSeconds: 100 routeTableCacheTTLInSeconds: 100 +publicIPCacheTTLInSeconds: 100 vmType: vmss disableAvailabilitySetNodes: true ` @@ -2209,6 +2211,9 @@ func validateConfig(t *testing.T, config string) { //nolint if azureCloud.RouteTableCacheTTLInSeconds != 100 { t.Errorf("got incorrect value for routeTableCacheTTLInSeconds") } + if azureCloud.PublicIPCacheTTLInSeconds != 100 { + t.Errorf("got incorrect value for publicIPCacheTTLInSeconds") + } if azureCloud.VMType != consts.VMTypeVMSS { t.Errorf("got incorrect value for vmType") } diff --git a/pkg/provider/azure_wrap.go b/pkg/provider/azure_wrap.go index 44a07ee419..ad73f398dd 100644 --- a/pkg/provider/azure_wrap.go +++ b/pkg/provider/azure_wrap.go @@ -41,6 +41,7 @@ var ( loadBalancerCacheTTLDefaultInSeconds = 120 nsgCacheTTLDefaultInSeconds = 120 routeTableCacheTTLDefaultInSeconds = 120 + publicIPCacheTTLDefaultInSeconds = 120 azureNodeProviderIDRE = regexp.MustCompile(`^azure:///subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Compute/(?:.*)`) azureResourceGroupNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/(?:.*)`) @@ -96,26 +97,27 @@ func (az *Cloud) getRouteTable(crt azcache.AzureCacheReadType) (routeTable netwo return *(cachedRt.(*network.RouteTable)), true, nil } -func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string) (network.PublicIPAddress, bool, error) { +func (az *Cloud) getPIPCacheKey(pipResourceGroup string, pipName string) string { resourceGroup := az.ResourceGroup if pipResourceGroup != "" { resourceGroup = pipResourceGroup } + return fmt.Sprintf("%s%s%s", resourceGroup, consts.PIPCacheKeySeparator, pipName) +} - ctx, cancel := getContextWithCancel() - defer cancel() - pip, err := az.PublicIPAddressesClient.Get(ctx, resourceGroup, pipName, "") - exists, rerr := checkResourceExistsFromError(err) - if rerr != nil { - return pip, false, rerr.Error() +func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string, crt azcache.AzureCacheReadType) (network.PublicIPAddress, bool, error) { + pip := network.PublicIPAddress{} + cacheKey := az.getPIPCacheKey(pipResourceGroup, pipName) + cachedPIP, err := az.pipCache.Get(cacheKey, crt) + if err != nil { + return pip, false, err } - if !exists { - klog.V(2).Infof("Public IP %q not found", pipName) + if cachedPIP == nil { return pip, false, nil } - return pip, exists, nil + return *(cachedPIP.(*network.PublicIPAddress)), true, nil } func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (network.Subnet, bool, error) { @@ -288,6 +290,37 @@ func (az *Cloud) newRouteTableCache() (*azcache.TimedCache, error) { return azcache.NewTimedcache(time.Duration(az.RouteTableCacheTTLInSeconds)*time.Second, getter) } +func (az *Cloud) newPIPCache() (*azcache.TimedCache, error) { + getter := func(key string) (interface{}, error) { + ctx, cancel := getContextWithCancel() + defer cancel() + + parsedKey := strings.Split(strings.TrimSpace(key), consts.PIPCacheKeySeparator) + if len(parsedKey) != 2 { + return nil, fmt.Errorf("failed to parse public ip rg and name from cache key %q", key) + } + pipResourceGroup, pipName := strings.TrimSpace(parsedKey[0]), strings.TrimSpace(parsedKey[1]) + + pip, err := az.PublicIPAddressesClient.Get(ctx, pipResourceGroup, pipName, "") + exists, rerr := checkResourceExistsFromError(err) + if rerr != nil { + return nil, rerr.Error() + } + + if !exists { + klog.V(2).Infof("Public IP %q in rg %q not found", pipName, pipResourceGroup) + return nil, nil + } + + return &pip, nil + } + + if az.PublicIPCacheTTLInSeconds == 0 { + az.PublicIPCacheTTLInSeconds = publicIPCacheTTLDefaultInSeconds + } + return azcache.NewTimedcache(time.Duration(az.PublicIPCacheTTLInSeconds)*time.Second, getter) +} + func (az *Cloud) useStandardLoadBalancer() bool { return strings.EqualFold(az.LoadBalancerSku, consts.LoadBalancerSkuStandard) }