Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#1569 from k8s-infra-cherrypick-rob…
Browse files Browse the repository at this point in the history
…ot/cherry-pick-1531-to-release-1.23

[release-1.23] Add cache for public IP
  • Loading branch information
k8s-ci-robot authored Apr 26, 2022
2 parents 9f5f265 + 4feceb7 commit 54c0e9b
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 28 deletions.
3 changes: 3 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ const (
// LoadBalancerBackendPoolConfigurationTypePODIP is the lb backend pool config type pod ip
// TODO (nilo19): support pod IP in the future
LoadBalancerBackendPoolConfigurationTypePODIP = "podIP"

// PIPCacheKeySeparator separates the resource group name from the public IP name in a PIP cache key; both are needed to look up a public IP, so keys have the format pip_rg:pip_name.
PIPCacheKeySeparator = ":"
)

// error messages
Expand Down
8 changes: 8 additions & 0 deletions pkg/provider/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ type Config struct {
RouteTableCacheTTLInSeconds int `json:"routeTableCacheTTLInSeconds,omitempty" yaml:"routeTableCacheTTLInSeconds,omitempty"`
// AvailabilitySetsCacheTTLInSeconds sets the cache TTL for VMAS
AvailabilitySetsCacheTTLInSeconds int `json:"availabilitySetsCacheTTLInSeconds,omitempty" yaml:"availabilitySetsCacheTTLInSeconds,omitempty"`
// PublicIPCacheTTLInSeconds sets the cache TTL for public ip
PublicIPCacheTTLInSeconds int `json:"publicIPCacheTTLInSeconds,omitempty" yaml:"publicIPCacheTTLInSeconds,omitempty"`
// RouteUpdateWaitingInSeconds is the delay time for waiting route updates to take effect. This waiting delay is added
// because the routes are not taken effect when the async route updating operation returns success. Default is 30 seconds.
RouteUpdateWaitingInSeconds int `json:"routeUpdateWaitingInSeconds,omitempty" yaml:"routeUpdateWaitingInSeconds,omitempty"`
Expand Down Expand Up @@ -336,6 +338,7 @@ type Cloud struct {
lbCache *azcache.TimedCache
nsgCache *azcache.TimedCache
rtCache *azcache.TimedCache
pipCache *azcache.TimedCache

*ManagedDiskController
*controllerCommon
Expand Down Expand Up @@ -667,6 +670,11 @@ func (az *Cloud) initCaches() (err error) {
return err
}

az.pipCache, err = az.newPIPCache()
if err != nil {
return err
}

return nil
}

Expand Down
34 changes: 26 additions & 8 deletions pkg/provider/azure_backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (az *Cloud) CreateOrUpdateLB(service *v1.Service, lb network.LoadBalancer)
}
pipRG, pipName := matches[1], matches[2]
klog.V(3).Infof("The public IP %s referenced by load balancer %s is not in Succeeded provisioning state, will try to update it", pipName, to.String(lb.Name))
pip, _, err := az.getPublicIPAddress(pipRG, pipName)
pip, _, err := az.getPublicIPAddress(pipRG, pipName, azcache.CacheReadTypeDefault)
if err != nil {
klog.Errorf("Failed to get the public IP %s in resource group %s: %v", pipName, pipRG, err)
return rerr.Error()
Expand Down Expand Up @@ -411,17 +411,33 @@ func (az *Cloud) CreateOrUpdatePIP(service *v1.Service, pipResourceGroup string,

rerr := az.PublicIPAddressesClient.CreateOrUpdate(ctx, pipResourceGroup, to.String(pip.Name), pip)
klog.V(10).Infof("PublicIPAddressesClient.CreateOrUpdate(%s, %s): end", pipResourceGroup, to.String(pip.Name))
if rerr != nil {
pipJSON, _ := json.Marshal(pip)
klog.Warningf("PublicIPAddressesClient.CreateOrUpdate(%s, %s) failed: %s, PublicIP request: %s", pipResourceGroup, to.String(pip.Name), rerr.Error().Error(), string(pipJSON))
az.Event(service, v1.EventTypeWarning, "CreateOrUpdatePublicIPAddress", rerr.Error().Error())
return rerr.Error()
if rerr == nil {
// Invalidate the cache right after updating
_ = az.pipCache.Delete(az.getPIPCacheKey(pipResourceGroup, to.String(pip.Name)))
return nil
}

return nil
pipJSON, _ := json.Marshal(pip)
klog.Warningf("PublicIPAddressesClient.CreateOrUpdate(%s, %s) failed: %s, PublicIP request: %s", pipResourceGroup, to.String(pip.Name), rerr.Error().Error(), string(pipJSON))
az.Event(service, v1.EventTypeWarning, "CreateOrUpdatePublicIPAddress", rerr.Error().Error())

// Invalidate the cache because of an ETag precondition mismatch.
if rerr.HTTPStatusCode == http.StatusPreconditionFailed {
klog.V(3).Infof("PublicIP cache for (%s, %s) is cleanup because of http.StatusPreconditionFailed", pipResourceGroup, to.String(pip.Name))
_ = az.pipCache.Delete(az.getPIPCacheKey(pipResourceGroup, to.String(pip.Name)))
}

retryErrorMessage := rerr.Error().Error()
// Invalidate the cache because another new operation has canceled the current request.
if strings.Contains(strings.ToLower(retryErrorMessage), consts.OperationCanceledErrorMessage) {
klog.V(3).Infof("PublicIP cache for (%s, %s) is cleanup because CreateOrUpdate is canceled by another operation", pipResourceGroup, to.String(pip.Name))
_ = az.pipCache.Delete(az.getPIPCacheKey(pipResourceGroup, to.String(pip.Name)))
}

return rerr.Error()
}

// CreateOrUpdateInterface invokes az.PublicIPAddressesClient.CreateOrUpdate with exponential backoff retry
// CreateOrUpdateInterface invokes az.InterfacesClient.CreateOrUpdate with exponential backoff retry
func (az *Cloud) CreateOrUpdateInterface(service *v1.Service, nic network.Interface) error {
ctx, cancel := getContextWithCancel()
defer cancel()
Expand Down Expand Up @@ -454,6 +470,8 @@ func (az *Cloud) DeletePublicIP(service *v1.Service, pipResourceGroup string, pi
return rerr.Error()
}

// Invalidate the cache right after deleting
_ = az.pipCache.Delete(az.getPIPCacheKey(pipResourceGroup, pipName))
return nil
}

Expand Down
52 changes: 47 additions & 5 deletions pkg/provider/azure_backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ func TestCreateOrUpdateLB(t *testing.T) {
shouldBeEmpty, err := az.lbCache.Get("lb", cache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.Empty(t, shouldBeEmpty)

// The public IP cache should be populated because CreateOrUpdateLB fetched the public IP.
shouldNotBeEmpty, err := az.pipCache.Get(az.getPIPCacheKey(az.ResourceGroup, "pip"), cache.CacheReadTypeDefault)
assert.NoError(t, err)
assert.NotEmpty(t, shouldNotBeEmpty)
}
}

Expand Down Expand Up @@ -377,12 +382,49 @@ func TestCreateOrUpdatePIP(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

az := GetTestCloud(ctrl)
mockPIPClient := az.PublicIPAddressesClient.(*mockpublicipclient.MockInterface)
mockPIPClient.EXPECT().CreateOrUpdate(gomock.Any(), az.ResourceGroup, "nic", gomock.Any()).Return(&retry.Error{HTTPStatusCode: http.StatusInternalServerError})
tests := []struct {
clientErr *retry.Error
expectedErr error
cacheExpectedEmpty bool
}{
{
clientErr: &retry.Error{HTTPStatusCode: http.StatusPreconditionFailed},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 412, RawError: %w", error(nil)),
cacheExpectedEmpty: true,
},
{
clientErr: &retry.Error{RawError: fmt.Errorf(consts.OperationCanceledErrorMessage)},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: %w", fmt.Errorf("canceledandsupersededduetoanotheroperation")),
cacheExpectedEmpty: true,
},
{
clientErr: &retry.Error{HTTPStatusCode: http.StatusInternalServerError},
expectedErr: fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)),
cacheExpectedEmpty: false,
},
}

err := az.CreateOrUpdatePIP(&v1.Service{}, az.ResourceGroup, network.PublicIPAddress{Name: to.StringPtr("nic")})
assert.EqualError(t, fmt.Errorf("Retriable: false, RetryAfter: 0s, HTTPStatusCode: 500, RawError: %w", error(nil)), err.Error())
for _, test := range tests {
az := GetTestCloud(ctrl)
cacheKey := az.getPIPCacheKey(az.ResourceGroup, "nic")
az.pipCache.Set(cacheKey, "test")
mockPIPClient := az.PublicIPAddressesClient.(*mockpublicipclient.MockInterface)
mockPIPClient.EXPECT().CreateOrUpdate(gomock.Any(), az.ResourceGroup, "nic", gomock.Any()).Return(test.clientErr)
if test.cacheExpectedEmpty {
mockPIPClient.EXPECT().Get(gomock.Any(), az.ResourceGroup, "nic", gomock.Any()).Return(network.PublicIPAddress{}, nil)
}

err := az.CreateOrUpdatePIP(&v1.Service{}, az.ResourceGroup, network.PublicIPAddress{Name: to.StringPtr("nic")})
assert.EqualError(t, test.expectedErr, err.Error())

cachedPIP, err := az.pipCache.Get(az.getPIPCacheKey(az.ResourceGroup, "nic"), cache.CacheReadTypeDefault)
assert.NoError(t, err)
if test.cacheExpectedEmpty {
assert.Empty(t, cachedPIP)
} else {
assert.NotEmpty(t, cachedPIP)
}
}
}

func TestCreateOrUpdateInterface(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/provider/azure_fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func GetTestCloud(ctrl *gomock.Controller) (az *Cloud) {
az.lbCache, _ = az.newLBCache()
az.nsgCache, _ = az.newNSGCache()
az.rtCache, _ = az.newRouteTableCache()
az.pipCache, _ = az.newPIPCache()
az.LoadBalancerBackendPool = NewMockBackendPool(ctrl)

_ = initDiskControllers(az)
Expand Down
8 changes: 4 additions & 4 deletions pkg/provider/azure_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (az *Cloud) GetLoadBalancer(ctx context.Context, clusterName string, servic
return false
}
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
_, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName)
_, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName, azcache.CacheReadTypeDefault)
if err != nil {
return false
}
Expand Down Expand Up @@ -847,7 +847,7 @@ func (az *Cloud) getServiceLoadBalancerStatus(service *v1.Service, lb *network.L
if err != nil {
return nil, nil, fmt.Errorf("get(%s): lb(%s) - failed to get LB PublicIPAddress Name from ID(%s)", serviceName, *lb.Name, *pipID)
}
pip, existsPip, err := az.getPublicIPAddress(az.getPublicIPAddressResourceGroup(service), pipName)
pip, existsPip, err := az.getPublicIPAddress(az.getPublicIPAddressResourceGroup(service), pipName, azcache.CacheReadTypeDefault)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -977,7 +977,7 @@ func (az *Cloud) findServiceIPAddress(ctx context.Context, clusterName string, s

func (az *Cloud) ensurePublicIPExists(service *v1.Service, pipName string, domainNameLabel, clusterName string, shouldPIPExisted, foundDNSLabelAnnotation bool) (*network.PublicIPAddress, error) {
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName)
pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName, azcache.CacheReadTypeDefault)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1363,7 +1363,7 @@ func (az *Cloud) isFrontendIPChanged(clusterName string, config network.Frontend
return false, err
}
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName)
pip, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName, azcache.CacheReadTypeDefault)
if err != nil {
return false, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/provider/azure_ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (
"loadBalancerCacheTTLInSeconds": 100,
"nsgCacheTTLInSeconds": 100,
"routeTableCacheTTLInSeconds": 100,
"publicIPCacheTTLInSeconds": 100,
"location": "location",
"maximumLoadBalancerRuleCount": 1,
"primaryAvailabilitySetName": "primaryAvailabilitySetName",
Expand Down Expand Up @@ -135,6 +136,7 @@ func TestParseConfig(t *testing.T) {
LoadBalancerCacheTTLInSeconds: 100,
NsgCacheTTLInSeconds: 100,
RouteTableCacheTTLInSeconds: 100,
PublicIPCacheTTLInSeconds: 100,
Location: "location",
MaximumLoadBalancerRuleCount: 1,
PrimaryAvailabilitySetName: "primaryAvailabilitySetName",
Expand Down
2 changes: 1 addition & 1 deletion pkg/provider/azure_standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ func (as *availabilitySet) GetIPByNodeName(name string) (string, string, error)
if err != nil {
return "", "", fmt.Errorf("failed to publicIP name for node %q with pipID %q", name, pipID)
}
pip, existsPip, err := as.getPublicIPAddress(as.ResourceGroup, pipName)
pip, existsPip, err := as.getPublicIPAddress(as.ResourceGroup, pipName, azcache.CacheReadTypeDefault)
if err != nil {
return "", "", err
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/provider/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,7 @@ func TestNewCloudFromJSON(t *testing.T) {
"loadBalancerCacheTTLInSeconds": 100,
"nsgCacheTTLInSeconds": 100,
"routeTableCacheTTLInSeconds": 100,
"publicIPCacheTTLInSeconds": 100,
"vmType": "vmss",
"disableAvailabilitySetNodes": true
}`
Expand Down Expand Up @@ -2110,6 +2111,7 @@ vmCacheTTLInSeconds: 100
loadBalancerCacheTTLInSeconds: 100
nsgCacheTTLInSeconds: 100
routeTableCacheTTLInSeconds: 100
publicIPCacheTTLInSeconds: 100
vmType: vmss
disableAvailabilitySetNodes: true
`
Expand Down Expand Up @@ -2209,6 +2211,9 @@ func validateConfig(t *testing.T, config string) { //nolint
if azureCloud.RouteTableCacheTTLInSeconds != 100 {
t.Errorf("got incorrect value for routeTableCacheTTLInSeconds")
}
if azureCloud.PublicIPCacheTTLInSeconds != 100 {
t.Errorf("got incorrect value for publicIPCacheTTLInSeconds")
}
if azureCloud.VMType != consts.VMTypeVMSS {
t.Errorf("got incorrect value for vmType")
}
Expand Down
53 changes: 43 additions & 10 deletions pkg/provider/azure_wrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
loadBalancerCacheTTLDefaultInSeconds = 120
nsgCacheTTLDefaultInSeconds = 120
routeTableCacheTTLDefaultInSeconds = 120
publicIPCacheTTLDefaultInSeconds = 120

azureNodeProviderIDRE = regexp.MustCompile(`^azure:///subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Compute/(?:.*)`)
azureResourceGroupNameRE = regexp.MustCompile(`.*/subscriptions/(?:.*)/resourceGroups/(.+)/providers/(?:.*)`)
Expand Down Expand Up @@ -96,26 +97,27 @@ func (az *Cloud) getRouteTable(crt azcache.AzureCacheReadType) (routeTable netwo
return *(cachedRt.(*network.RouteTable)), true, nil
}

func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string) (network.PublicIPAddress, bool, error) {
// getPIPCacheKey returns the public IP cache key for the given resource group
// and public IP name, joined by consts.PIPCacheKeySeparator. An empty
// pipResourceGroup falls back to the cloud's own ResourceGroup.
func (az *Cloud) getPIPCacheKey(pipResourceGroup string, pipName string) string {
	if pipResourceGroup == "" {
		pipResourceGroup = az.ResourceGroup
	}
	return fmt.Sprintf("%s%s%s", pipResourceGroup, consts.PIPCacheKeySeparator, pipName)
}

ctx, cancel := getContextWithCancel()
defer cancel()
pip, err := az.PublicIPAddressesClient.Get(ctx, resourceGroup, pipName, "")
exists, rerr := checkResourceExistsFromError(err)
if rerr != nil {
return pip, false, rerr.Error()
func (az *Cloud) getPublicIPAddress(pipResourceGroup string, pipName string, crt azcache.AzureCacheReadType) (network.PublicIPAddress, bool, error) {
pip := network.PublicIPAddress{}
cacheKey := az.getPIPCacheKey(pipResourceGroup, pipName)
cachedPIP, err := az.pipCache.Get(cacheKey, crt)
if err != nil {
return pip, false, err
}

if !exists {
klog.V(2).Infof("Public IP %q not found", pipName)
if cachedPIP == nil {
return pip, false, nil
}

return pip, exists, nil
return *(cachedPIP.(*network.PublicIPAddress)), true, nil
}

func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (network.Subnet, bool, error) {
Expand Down Expand Up @@ -288,6 +290,37 @@ func (az *Cloud) newRouteTableCache() (*azcache.TimedCache, error) {
return azcache.NewTimedcache(time.Duration(az.RouteTableCacheTTLInSeconds)*time.Second, getter)
}

// newPIPCache builds the timed cache used for public IP lookups.
//
// Cache keys are expected to consist of a resource group and a public IP name
// joined by consts.PIPCacheKeySeparator. The getter splits the key and fetches
// the public IP through PublicIPAddressesClient. It returns (nil, nil) when
// checkResourceExistsFromError reports the resource as missing, and an error
// when the key cannot be split into exactly two parts or the lookup fails.
//
// When PublicIPCacheTTLInSeconds is zero it is set to
// publicIPCacheTTLDefaultInSeconds before the cache is created.
func (az *Cloud) newPIPCache() (*azcache.TimedCache, error) {
	if az.PublicIPCacheTTLInSeconds == 0 {
		az.PublicIPCacheTTLInSeconds = publicIPCacheTTLDefaultInSeconds
	}
	ttl := time.Duration(az.PublicIPCacheTTLInSeconds) * time.Second

	fetchPIP := func(key string) (interface{}, error) {
		ctx, cancel := getContextWithCancel()
		defer cancel()

		parts := strings.Split(strings.TrimSpace(key), consts.PIPCacheKeySeparator)
		if len(parts) != 2 {
			return nil, fmt.Errorf("failed to parse public ip rg and name from cache key %q", key)
		}
		rg := strings.TrimSpace(parts[0])
		name := strings.TrimSpace(parts[1])

		pip, err := az.PublicIPAddressesClient.Get(ctx, rg, name, "")
		exists, rerr := checkResourceExistsFromError(err)
		switch {
		case rerr != nil:
			return nil, rerr.Error()
		case !exists:
			klog.V(2).Infof("Public IP %q in rg %q not found", name, rg)
			return nil, nil
		default:
			return &pip, nil
		}
	}

	return azcache.NewTimedcache(ttl, fetchPIP)
}

// useStandardLoadBalancer reports whether the configured LoadBalancerSku
// equals consts.LoadBalancerSkuStandard, compared case-insensitively.
func (az *Cloud) useStandardLoadBalancer() bool {
	sku := az.LoadBalancerSku
	return strings.EqualFold(sku, consts.LoadBalancerSkuStandard)
}
Expand Down

0 comments on commit 54c0e9b

Please sign in to comment.