diff --git a/core/outlier/retryer.go b/core/outlier/retryer.go index f6fb6ca6..bfb49573 100644 --- a/core/outlier/retryer.go +++ b/core/outlier/retryer.go @@ -130,6 +130,7 @@ func (r *Retryer) onDisconnected(node string) { count = r.maxAttempts } r.mtx.Unlock() + // Fix bugs: When multiple active checks still do not recover, it is necessary to delete node from r.counts. time.AfterFunc(r.interval*time.Duration(count), func() { r.connectNode(node) }) diff --git a/core/outlier/rule_manager.go b/core/outlier/rule_manager.go index d64d6e45..9cc42072 100644 --- a/core/outlier/rule_manager.go +++ b/core/outlier/rule_manager.go @@ -33,8 +33,9 @@ var ( // resource name ---> address ---> circuitbreaker nodeBreakers = make(map[string]map[string]circuitbreaker.CircuitBreaker) // resource name ---> outlier ejection rule - currentRules = make(map[string]*Rule) - updateMux = new(sync.RWMutex) + currentRules = make(map[string]*Rule) + updateMux = new(sync.RWMutex) + updateRuleMux = new(sync.Mutex) ) func getNodeBreakersOfResource(resource string) map[string]circuitbreaker.CircuitBreaker { @@ -131,6 +132,8 @@ func LoadRules(rules []*Rule) (bool, error) { for _, rule := range rules { rulesMap[rule.Resource] = rule } + updateRuleMux.Lock() + defer updateRuleMux.Unlock() isEqual := reflect.DeepEqual(currentRules, rulesMap) if isEqual { logging.Info("[Outlier] Load rules is the same with current rules, so ignore load operation.") @@ -140,6 +143,79 @@ func LoadRules(rules []*Rule) (bool, error) { return true, err } +// LoadRuleOfResource loads the given resource's outlier ejection rule to the rule manager, while previous resource's rule will be replaced. +// the first returned value indicates whether do real load operation, if the rule is the same with previous resource's rule, return false +func LoadRuleOfResource(res string, rule *Rule) (bool, error) { + if len(res) == 0 { + return false, errors.New("empty resource") + } + updateRuleMux.Lock() + defer updateRuleMux.Unlock() + // clear resource rule + if rule == nil { + delete(currentRules, res) + updateMux.Lock() + delete(nodeBreakers, res) + delete(breakerRules, res) + delete(outlierRules, res) + updateMux.Unlock() + logging.Info("[Outlier] clear resource level rule", "resource", res) + return true, nil + } + // load resource level rule + isEqual := reflect.DeepEqual(currentRules[res], rule) + if isEqual { + logging.Info("[Outlier] Load resource level rule is the same with current resource level rule, so ignore load operation.") + return false, nil + } + err := onResourceRuleUpdate(res, rule) + return true, err +} + +func onResourceRuleUpdate(res string, rule *Rule) (err error) { + defer func() { + if r := recover(); r != nil { + var ok bool + err, ok = r.(error) + if !ok { + err = fmt.Errorf("%v", r) + } + } + }() + + circuitRule := rule.Rule + if err = IsValidRule(rule); err != nil { + logging.Warn("[Outlier onResourceRuleUpdate] Ignoring invalid outlier ejection rule", "rule", rule, "err", err.Error()) + return + } + if err = circuitbreaker.IsValidRule(circuitRule); err != nil { + logging.Warn("[Outlier onRuleUpdate] Ignoring invalid rule when loading new rules", "rule", rule, "err", err.Error()) + return + } + + start := util.CurrentTimeNano() + breakers := getNodeBreakersOfResource(res) + newBreakers := make(map[string]circuitbreaker.CircuitBreaker) + for address, breaker := range breakers { + newCbsOfRes := circuitbreaker.BuildResourceCircuitBreaker(res, + []*circuitbreaker.Rule{circuitRule}, []circuitbreaker.CircuitBreaker{breaker}) + if len(newCbsOfRes) > 0 { + newBreakers[address] = newCbsOfRes[0] + } + } + + updateMux.Lock() + outlierRules[res] = rule + breakerRules[res] = circuitRule + nodeBreakers[res] = newBreakers + updateMux.Unlock() + currentRules[res] = rule + + logging.Debug("[Outlier onResourceRuleUpdate] Time statistics(ns) for updating outlier ejection rule", "timeCost", util.CurrentTimeNano()-start) + logging.Info("[Outlier] load resource level rule", "resource", res, "rule", rule) + return nil +} + // onRuleUpdate is concurrent safe to update outlier ejection rules func onRuleUpdate(rulesMap map[string]*Rule) (err error) { defer func() { @@ -157,9 +233,11 @@ func onRuleUpdate(rulesMap map[string]*Rule) (err error) { validRulesMap := make(map[string]*Rule, len(rulesMap)) for resource, rule := range rulesMap { circuitRule := rule.Rule - err := IsValidRule(rule) - err = circuitbreaker.IsValidRule(circuitRule) - if err != nil { + if err = IsValidRule(rule); err != nil { + logging.Warn("[Outlier onRuleUpdate] Ignoring invalid rule when loading new rules", "rule", rule, "err", err.Error()) + continue + } + if err = circuitbreaker.IsValidRule(circuitRule); err != nil { logging.Warn("[Outlier onRuleUpdate] Ignoring invalid rule when loading new rules", "rule", rule, "err", err.Error()) continue } @@ -178,6 +256,12 @@ func onRuleUpdate(rulesMap map[string]*Rule) (err error) { return nil } +// ClearRuleOfResource clears resource level rule in outlier ejection module. +func ClearRuleOfResource(res string) error { + _, err := LoadRuleOfResource(res, nil) + return err +} + func IsValidRule(r *Rule) error { if r == nil { return errors.New("nil Rule") diff --git a/core/outlier/rule_manager_test.go b/core/outlier/rule_manager_test.go new file mode 100644 index 00000000..48cdb183 --- /dev/null +++ b/core/outlier/rule_manager_test.go @@ -0,0 +1,232 @@ +// Copyright 1999-2020 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package outlier + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/alibaba/sentinel-golang/core/circuitbreaker" +) + +func clearData() { + // resource name ---> outlier ejection rule + outlierRules = make(map[string]*Rule) + // resource name ---> circuitbreaker rule + breakerRules = make(map[string]*circuitbreaker.Rule) + // resource name ---> address ---> circuitbreaker + nodeBreakers = make(map[string]map[string]circuitbreaker.CircuitBreaker) + // resource name ---> outlier ejection rule + currentRules = make(map[string]*Rule) +} + +func Test_onRuleUpdateInvalid(t *testing.T) { + r1 := &Rule{ + Rule: &circuitbreaker.Rule{ + Resource: "example.helloworld", + Strategy: circuitbreaker.ErrorCount, + RetryTimeoutMs: 3000, + MinRequestAmount: 1, + StatIntervalMs: 1000, + Threshold: 1.0, + }, + EnableActiveRecovery: true, + MaxEjectionPercent: 1.5, // MaxEjectionPercent should be in the range [0.0, 1.0] + RecoveryIntervalMs: 2000, + MaxRecoveryAttempts: 5, + } + resRulesMap := make(map[string]*Rule) + resRulesMap[r1.Resource] = r1 + err := onRuleUpdate(resRulesMap) + assert.Nil(t, err) + assert.Equal(t, 0, len(GetRules())) + clearData() +} + +func TestGetRules(t *testing.T) { + r1 := &Rule{ + Rule: &circuitbreaker.Rule{ + Resource: "example.helloworld", + Strategy: circuitbreaker.ErrorCount, + RetryTimeoutMs: 3000, + MinRequestAmount: 1, + StatIntervalMs: 1000, + Threshold: 1.0, + }, + EnableActiveRecovery: true, + MaxEjectionPercent: 1.0, + RecoveryIntervalMs: 2000, + MaxRecoveryAttempts: 5, + } + _, _ = LoadRules([]*Rule{r1}) + rules := GetRules() + assert.True(t, len(rules) == 1 && rules[0].Resource == r1.Resource && rules[0].Strategy == r1.Strategy) + clearData() +} + +func TestGetNodeBreakersOfResource(t *testing.T) { + r1 := &Rule{ + Rule: &circuitbreaker.Rule{ + Resource: "example.helloworld", + Strategy: circuitbreaker.ErrorCount, + RetryTimeoutMs: 3000, + MinRequestAmount: 1, + StatIntervalMs: 1000, + Threshold: 1.0, + }, + EnableActiveRecovery: true, + MaxEjectionPercent: 1.0, + RecoveryIntervalMs: 2000, + MaxRecoveryAttempts: 5, + } + _, _ = LoadRules([]*Rule{r1}) + addNodeBreakerOfResource(r1.Resource, "node0") + cbs := getNodeBreakersOfResource(r1.Resource) + assert.True(t, len(cbs) == 1 && cbs["node0"].BoundRule() == r1.Rule) + clearData() +} + +func TestLoadRules(t *testing.T) { + r1 := &Rule{ + Rule: &circuitbreaker.Rule{ + Resource: "example.helloworld", + Strategy: circuitbreaker.ErrorCount, + RetryTimeoutMs: 3000, + MinRequestAmount: 1, + StatIntervalMs: 1000, + Threshold: 1.0, + }, + EnableActiveRecovery: true, + MaxEjectionPercent: 1.0, + RecoveryIntervalMs: 2000, + MaxRecoveryAttempts: 5, + } + _, err := LoadRules([]*Rule{r1}) + assert.Nil(t, err) + ok, err := LoadRules([]*Rule{r1}) + assert.Nil(t, err) + assert.False(t, ok) + clearData() +} + +func getTestRules() []*Rule { + r1 := &Rule{ + Rule: &circuitbreaker.Rule{ + Resource: "example.helloworld", + Strategy: circuitbreaker.SlowRequestRatio, + RetryTimeoutMs: 3000, + MinRequestAmount: 1, + StatIntervalMs: 1000, + Threshold: 1.0, + }, + EnableActiveRecovery: true, + MaxEjectionPercent: 1.0, + RecoveryIntervalMs: 2000, + MaxRecoveryAttempts: 5, + } + r2 := &Rule{ + Rule: &circuitbreaker.Rule{ + Resource: "example.helloworld", + Strategy: circuitbreaker.ErrorRatio, + RetryTimeoutMs: 3000, + MinRequestAmount: 1, + StatIntervalMs: 1000, + Threshold: 1.0, + }, + EnableActiveRecovery: true, + MaxEjectionPercent: 1.0, + RecoveryIntervalMs: 2000, + MaxRecoveryAttempts: 5, + } + r3 := &Rule{ + Rule: &circuitbreaker.Rule{ + Resource: "test.resource", + Strategy: circuitbreaker.ErrorCount, + RetryTimeoutMs: 3000, + MinRequestAmount: 1, + StatIntervalMs: 1000, + Threshold: 10.0, + }, + EnableActiveRecovery: true, + MaxEjectionPercent: 1.0, + RecoveryIntervalMs: 2000, + MaxRecoveryAttempts: 5, + } + return []*Rule{r1, r2, r3} +} + +func TestLoadRuleOfResource(t *testing.T) { + rules := getTestRules() + r1, r2, _ := rules[0], rules[1], rules[2] + succ, err := LoadRules(rules) + assert.Equal(t, 2, len(breakerRules)) + assert.True(t, succ && err == nil) + + t.Run("LoadRuleOfResource_empty_resource", func(t *testing.T) { + succ, err = LoadRuleOfResource("", r1) + assert.True(t, !succ && err != nil) + }) + + t.Run("LoadRuleOfResource_cache_hit", func(t *testing.T) { + assert.Equal(t, r2, getOutlierRuleOfResource("example.helloworld")) + succ, err = LoadRuleOfResource("example.helloworld", r1) + assert.True(t, succ && err == nil) + }) + + t.Run("LoadRuleOfResource_clear", func(t *testing.T) { + succ, err = LoadRuleOfResource("example.helloworld", nil) + assert.Equal(t, 1, len(breakerRules)) + assert.True(t, succ && err == nil) + assert.True(t, breakerRules["example.helloworld"] == nil && currentRules["example.helloworld"] == nil) + assert.True(t, breakerRules["test.resource"] != nil && currentRules["test.resource"] != nil) + }) + clearData() +} + +func Test_onResourceRuleUpdate(t *testing.T) { + rules := getTestRules() + r1 := rules[0] + succ, err := LoadRules(rules) + addNodeBreakerOfResource(r1.Resource, "node0") + assert.True(t, succ && err == nil) + + t.Run("Test_onResourceRuleUpdate_normal", func(t *testing.T) { + r11 := r1 + r11.Threshold = 0.5 + assert.Nil(t, onResourceRuleUpdate(r1.Resource, r11)) + assert.Equal(t, getOutlierRuleOfResource(r1.Resource), r11) + assert.Equal(t, 1, len(nodeBreakers[r1.Resource])) + breakers := getNodeBreakersOfResource(r1.Resource) + assert.Equal(t, breakers["node0"].BoundRule(), r11.Rule) + clearData() + }) +} + +func TestClearRuleOfResource(t *testing.T) { + rules := getTestRules() + r1 := rules[0] + succ, err := LoadRules(rules) + addNodeBreakerOfResource(r1.Resource, "node0") + assert.True(t, succ && err == nil) + + t.Run("TestClearRuleOfResource_normal", func(t *testing.T) { + assert.Equal(t, 1, len(nodeBreakers[r1.Resource])) + assert.Nil(t, ClearRuleOfResource(r1.Resource)) + assert.Equal(t, 1, len(breakerRules)) + assert.Equal(t, 0, len(nodeBreakers[r1.Resource])) + assert.Equal(t, 1, len(currentRules)) + clearData() + }) +}