diff --git a/awareness.go b/awareness.go index 99f67ef..2d18c61 100644 --- a/awareness.go +++ b/awareness.go @@ -35,8 +35,8 @@ import ( type Awareness struct { sync.RWMutex - // max is the upper threshold for the factor that increase the timeout value - // (the score will be constrained from 0 <= score < max) + // Max is the upper threshold for the factor that increase the timeout value + // (the score will be constrained from 0 <= score < Max) max int // score is the current awareness score. Lower values are healthier and @@ -58,7 +58,7 @@ func (a *Awareness) GetHealthScore() int { } // ApplyDelta with given delta applies it to the score in thread-safe manner -// score must be bound from 0 to max value +// score must be bound from 0 to Max value func (a *Awareness) ApplyDelta(delta int) { a.RLock() diff --git a/awareness_internal_test.go b/awareness_internal_test.go index fd1658b..adda6d1 100644 --- a/awareness_internal_test.go +++ b/awareness_internal_test.go @@ -30,17 +30,17 @@ func TestNewAwareness(t *testing.T) { max int score int }{ - "max-3": { + "Max-3": { input: 3, max: 3, score: 0, }, - "max-5": { + "Max-5": { input: 5, max: 5, score: 0, }, - "max-10": { + "Max-10": { input: 10, max: 10, score: 0, diff --git a/awareness_test.go b/awareness_test.go index 64beb73..9096810 100644 --- a/awareness_test.go +++ b/awareness_test.go @@ -37,7 +37,7 @@ func TestAwareness_ApplyDelta(t *testing.T) { delta: -3, score: 0, }, - "max score": { + "Max score": { max: 5, delta: 10, score: 4, diff --git a/cmd/common/util.go b/cmd/common/util.go new file mode 100644 index 0000000..f979bb0 --- /dev/null +++ b/cmd/common/util.go @@ -0,0 +1,58 @@ +package common + +import ( + "os/user" + "path" + "path/filepath" + "strings" +) + +func RelativeToAbsolutePath(rpath string) (string, error) { + if rpath == "" { + return rpath, nil + } + + absolutePath := "" + + // case ./ ../ + if strings.Contains(rpath, "./") { + abs, err := filepath.Abs(rpath) + if err != nil { + return rpath, err + } + return abs, nil + } + + // case ~/ + if strings.Contains(rpath, "~") { + i := strings.Index(rpath, "~") // 처음 나온 ~만 반환 + + if i > -1 { + pathRemain := rpath[i+1:] + usr, err := user.Current() + if err != nil { + return rpath, err + } + return path.Join(usr.HomeDir, pathRemain), nil + + } else { + return rpath, nil + } + } + + if string(rpath[0]) == "/" { + return rpath, nil + } + + if string(rpath[0]) != "." && string(rpath[0]) != "/" { + currentPath, err := filepath.Abs(".") + if err != nil { + return rpath, err + } + + return path.Join(currentPath, rpath), nil + } + + return absolutePath, nil + +} diff --git a/cmd/common/util_test.go b/cmd/common/util_test.go new file mode 100644 index 0000000..b401390 --- /dev/null +++ b/cmd/common/util_test.go @@ -0,0 +1,86 @@ +package common_test + +import ( + "io/ioutil" + "os" + "os/user" + "path" + "path/filepath" + "testing" + + "github.com/DE-labtory/swim/cmd/common" + "github.com/stretchr/testify/assert" +) + +func TestRelativeToAbsolutePath(t *testing.T) { + + testfile1 := "./util.go" + testabsresult1, err := filepath.Abs(testfile1) + assert.NoError(t, err) + testabs1, err := common.RelativeToAbsolutePath(testfile1) + + assert.NoError(t, err) + assert.Equal(t, testabs1, testabsresult1) + + testfile2 := "../README.md" + testabsresult2, err := filepath.Abs(testfile2) + assert.NoError(t, err) + + testabs2, err := common.RelativeToAbsolutePath(testfile2) + + assert.NoError(t, err) + assert.Equal(t, testabs2, testabsresult2) + + // 남의 홈패스에 뭐가있는지 알길이 없으니 하나 만들었다 지움 + usr, err := user.Current() + assert.NoError(t, err) + + testfile3 := usr.HomeDir + "/test.txt" + + _, err = os.Stat(usr.HomeDir) + if os.IsNotExist(err) { + file, err := os.Create(testfile3) + assert.NoError(t, err) + defer file.Close() + } + + err = ioutil.WriteFile(testfile3, []byte("test"), os.ModePerm) + assert.NoError(t, err) + + testfile4 := "~/test.txt" + + testabs3, err := common.RelativeToAbsolutePath(testfile4) + assert.NoError(t, err) + assert.Equal(t, testfile3, testabs3) + + err = os.Remove(testfile3) + assert.NoError(t, err) +} + +func TestRelativeToAbsolutePath_WhenGivenPathIsAbsolute(t *testing.T) { + sshPath := "/iAmRoot" + + absPath, err := common.RelativeToAbsolutePath(sshPath) + + assert.NoError(t, err) + assert.Equal(t, sshPath, absPath) +} + +func TestRelativeToAbsolutePath_WhenGivenPathWithOnlyName(t *testing.T) { + sshPath := "test-dir" + + absPath, err := common.RelativeToAbsolutePath(sshPath) + currentPath, _ := filepath.Abs(".") + + assert.NoError(t, err) + assert.Equal(t, path.Join(currentPath, sshPath), absPath) +} + +func TestRelativeToAbsolutePath_WhenGivenPathIsEmpty(t *testing.T) { + sshPath := "" + + absPath, err := common.RelativeToAbsolutePath(sshPath) + + assert.Equal(t, nil, err) + assert.Equal(t, "", absPath) +} diff --git a/cmd/subcommands.go b/cmd/subcommands.go index 7f0cba0..5c90e2e 100644 --- a/cmd/subcommands.go +++ b/cmd/subcommands.go @@ -1,6 +1,8 @@ package cmd -import "github.com/urfave/cli" +import ( + "github.com/urfave/cli" +) var startCmd = cli.Command{ Name: "start", diff --git a/cmd/swim.go b/cmd/swim.go index a5bef73..90c3194 100644 --- a/cmd/swim.go +++ b/cmd/swim.go @@ -5,6 +5,9 @@ import ( "os" "time" + "github.com/DE-labtory/swim/cmd/common" + "github.com/DE-labtory/swim/conf" + "github.com/urfave/cli" ) @@ -34,8 +37,13 @@ func main() { app.Commands = append(app.Commands, Cmd()...) app.Before = func(c *cli.Context) error { - // config - // debug + if configPath := c.String("config"); configPath != "" { + absPath, err := common.RelativeToAbsolutePath(configPath) + if err != nil { + return err + } + conf.SetConfigPath(absPath) + } return nil } err := app.Run(os.Args) diff --git a/conf/configuration.go b/conf/configuration.go new file mode 100644 index 0000000..87ecc88 --- /dev/null +++ b/conf/configuration.go @@ -0,0 +1,44 @@ +package conf + +import ( + "fmt" + "os" + "sync" + + "github.com/DE-labtory/swim" + + "github.com/spf13/viper" +) + +var confPath = os.Getenv("GOPATH") + "/src/github.com/DE-labtory/swim/conf/config.yaml" + +type Configuration struct { + SWIMConfig swim.Config + SuspicionConfig swim.SuspicionConfig + MessageEndpointConfig swim.MessageEndpointConfig + Member swim.Member +} + +// config is instance of SWIM configuration +var config = &Configuration{} + +var once = sync.Once{} + +func SetConfigPath(abspath string) { + confPath = abspath +} + +func GetConfiguration() *Configuration { + once.Do(func() { + viper.SetConfigFile(confPath) + if err := viper.ReadInConfig(); err != nil { + panic(fmt.Sprintf("cannot read config from %s", confPath)) + } + err := viper.Unmarshal(&config) + if err != nil { + panic("error in read config") + } + }) + + return config +} diff --git a/conf/configuration_test.go b/conf/configuration_test.go new file mode 100644 index 0000000..ce77602 --- /dev/null +++ b/conf/configuration_test.go @@ -0,0 +1,68 @@ +package conf_test + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/DE-labtory/swim" + + "github.com/DE-labtory/swim/conf" + "github.com/stretchr/testify/assert" +) + +func TestGetConfiguration(t *testing.T) { + path := os.Getenv("GOPATH") + "/src/github.com/DE-labtory/swim/conf" + confFileName := "/config-test.yaml" + defer os.Remove(path + confFileName) + + if _, err := os.Stat(path); os.IsNotExist(err) { + os.Mkdir(path, os.ModePerm) + } + + // please leave this whitespace as space not tab + ConfJson := []byte(` + swimconfig: + maxlocalcount: 2 + maxnsacounter: 8 + t: 3000 + acktimeout: 3000 + k: 2 + bindaddress: 127.0.0.1 + bindport: 50000 + suspicionconfig: + k: 2 + min: 3000 + max: 10000 + messageendpointconfig: + encryptionenabled: false + sendtimeout: 5000 + callbackcollectinterval: 7000 + member: + id: + id: "hello_id" + `) + + err := ioutil.WriteFile(path+confFileName, ConfJson, os.ModePerm) + assert.NoError(t, err) + + conf.SetConfigPath(path + confFileName) + config := conf.GetConfiguration() + assert.Equal(t, config.SWIMConfig.MaxlocalCount, 2) + assert.Equal(t, config.SWIMConfig.MaxNsaCounter, 8) + assert.Equal(t, config.SWIMConfig.T, 3000) + assert.Equal(t, config.SWIMConfig.AckTimeOut, 3000) + assert.Equal(t, config.SWIMConfig.K, 2) + assert.Equal(t, config.SWIMConfig.BindAddress, "127.0.0.1") + assert.Equal(t, config.SWIMConfig.BindPort, 50000) + + assert.Equal(t, config.SuspicionConfig.K, 2) + assert.Equal(t, config.SuspicionConfig.Min.Nanoseconds(), int64(3000)) + assert.Equal(t, config.SuspicionConfig.Max.Nanoseconds(), int64(10000)) + + assert.Equal(t, config.MessageEndpointConfig.EncryptionEnabled, false) + assert.Equal(t, config.MessageEndpointConfig.SendTimeout.Nanoseconds(), int64(5000)) + assert.Equal(t, config.MessageEndpointConfig.CallbackCollectInterval.Nanoseconds(), int64(7000)) + + assert.Equal(t, config.Member.ID, swim.MemberID{ID: "hello_id"}) +} diff --git a/member_map.go b/member_map.go index 209c8ab..c7dce4e 100644 --- a/member_map.go +++ b/member_map.go @@ -54,15 +54,15 @@ func (s Status) toInt() int32 { } type SuspicionConfig struct { - // k is the maximum number of independent confirmation's we'd like to see - // this value is for making timer to drive @min value - k int + // K is the maximum number of independent confirmation's we'd like to see + // this value is for making timer to drive @Min value + K int - // min is the minimum timer value - min time.Duration + // Min is the minimum timer value + Min time.Duration - // max is the maximum timer value - max time.Duration + // Max is the maximum timer value + Max time.Duration } type MemberID struct { @@ -147,7 +147,7 @@ func NewMemberMap(config *SuspicionConfig) *MemberMap { } } -// Select K random member (length of returning member can be lower than k). +// Select K random member (length of returning member can be lower than K). func (m *MemberMap) SelectKRandomMemberID(k int) []Member { m.lock.Lock() @@ -235,7 +235,7 @@ func (m *MemberMap) suspectWhenDead() (bool, error) { func (m *MemberMap) suspectWhenAlive(member *Member, confirmer string, incarnation uint32) (bool, error) { config := m.suspicionConfig - suspicion, err := NewSuspicion(MemberID{confirmer}, config.k, config.min, config.max, getSuspicionCallback(m, member)) + suspicion, err := NewSuspicion(MemberID{confirmer}, config.K, config.Min, config.Max, getSuspicionCallback(m, member)) if err != nil { return false, ErrCreatingSuspicion } @@ -251,7 +251,7 @@ func (m *MemberMap) suspectWhenSuspect(member *Member, confirmer string, incarna config := m.suspicionConfig if member.Suspicion == nil { - suspicion, err := NewSuspicion(MemberID{confirmer}, config.k, config.min, config.max, getSuspicionCallback(m, member)) + suspicion, err := NewSuspicion(MemberID{confirmer}, config.K, config.Min, config.Max, getSuspicionCallback(m, member)) if err != nil { return false, ErrCreatingSuspicion } diff --git a/member_map_internal_test.go b/member_map_internal_test.go index 7a7f680..c2de32d 100644 --- a/member_map_internal_test.go +++ b/member_map_internal_test.go @@ -242,7 +242,7 @@ func TestMemberMap_SelectKRandomMember(t *testing.T) { assert.True(t, checkExist(rMembers, members[i])) } - // case 2: when k is larger then length of members + // case 2: when K is larger then length of members assert.Equal(t, len(m.SelectKRandomMemberID(5)), 3) } @@ -410,9 +410,9 @@ func TestMemberMap_Suspect_When_Member_Suspect_Without_Suspicion(t *testing.T) { } m := NewMemberMap(&SuspicionConfig{ - k: 1000, - min: time.Hour, - max: time.Hour * 8, + K: 1000, + Min: time.Hour, + Max: time.Hour * 8, }) m.members[MemberID{ID: "1"}] = member1 diff --git a/pbkstore.go b/pbkstore.go index 7e46ea0..0ca4419 100644 --- a/pbkstore.go +++ b/pbkstore.go @@ -52,7 +52,7 @@ type PriorityMbrStatsMsgStore struct { lock sync.RWMutex } -// macLocalCount is the max priority value +// macLocalCount is the Max priority value func NewPriorityMbrStatsMsgStore(maxLocalCount int) *PriorityMbrStatsMsgStore { return &PriorityMbrStatsMsgStore{ q: make(PriorityQueue, 0), @@ -70,7 +70,7 @@ func (p *PriorityMbrStatsMsgStore) Len() int { } // Initially, set the local count to zero. -// If the queue size is max, delete the data with the highest localcount and insert it. +// If the queue size is Max, delete the data with the highest localcount and insert it. func (p *PriorityMbrStatsMsgStore) Push(msg pb.MbrStatsMsg) { p.lock.Lock() defer p.lock.Unlock() diff --git a/suspicion.go b/suspicion.go index fe7dbd8..2c9e5f7 100644 --- a/suspicion.go +++ b/suspicion.go @@ -38,14 +38,14 @@ type Suspicion struct { // n is the number of independent confirmations we've seen. n int32 - // k is the maximum number of independent confirmation's we'd like to see - // this value is for making timer to drive @min value + // K is the maximum number of independent confirmation's we'd like to see + // this value is for making timer to drive @Min value k int32 - // min is the minimum timer value + // Min is the minimum timer value min time.Duration - // max is the maximum timer value + // Max is the maximum timer value max time.Duration // start captures the timestamp when the suspect began the timer. This value is used @@ -65,9 +65,9 @@ type Suspicion struct { confirmations map[MemberID]struct{} } -// NewSuspicion returns a timer started with the max value, and according to +// NewSuspicion returns a timer started with the Max value, and according to // Lifeguard L2 (Dynamic Suspicion timeout) each unique confirmation will drive the timer -// to min value +// to Min value func NewSuspicion(confirmer MemberID, k int, min time.Duration, max time.Duration, timeoutHandler func()) (*Suspicion, error) { if timeoutHandler == nil { @@ -88,7 +88,7 @@ func NewSuspicion(confirmer MemberID, k int, min time.Duration, max time.Duratio // easy telemetry. s.timeoutHandler = timeoutHandler - // If there aren't any confirmations to be made then take the min + // If there aren't any confirmations to be made then take the Min // time from the start. timeout := max if k < 1 { diff --git a/swim.go b/swim.go index 9e3402c..642c754 100644 --- a/swim.go +++ b/swim.go @@ -19,7 +19,6 @@ package swim import ( "context" "errors" - "fmt" "reflect" "sync" "time" @@ -341,10 +340,10 @@ func (s *SWIM) toDie() bool { // 2. SWIM waits for ack of the member(j) during the ack-timeout (time less than T). // End failure Detector if ack message arrives on ack-timeout. // -// 3. SWIM selects k number of members from the memberMap and sends indirect-ping(request k members to ping the member(j)). +// 3. SWIM selects K number of members from the memberMap and sends indirect-ping(request K members to ping the member(j)). // The nodes (that receive the indirect-ping) ping to the member(j) and ack when they receive ack from the member(j). // -// 4. At the end of T, SWIM checks to see if ack was received from k members, and if there is no message, +// 4. At the end of T, SWIM checks to see if ack was received from K members, and if there is no message, // The member(j) is judged to be failed, so check the member(j) as suspected or delete the member(j) from memberMap. // // ** When performing ping, ack, and indirect-ping in the above procedure, piggybackdata is sent together. ** @@ -390,10 +389,10 @@ func (s *SWIM) startFailureDetector() { // 1. Send ping to the member(j) during the ack-timeout (time less than T). // Return if ack message arrives on ack-timeout. // -// 2. selects k number of members from the memberMap and sends indirect-ping(request k members to ping the member(j)). +// 2. selects K number of members from the memberMap and sends indirect-ping(request K members to ping the member(j)). // The nodes (that receive the indirect-ping) ping to the member(j) and ack when they receive ack from the member(j). // -// 3. At the end of T, SWIM checks to see if ack was received from k members, and if there is no message, +// 3. At the end of T, SWIM checks to see if ack was received from K members, and if there is no message, // The member(j) is judged to be failed, so check the member(j) as suspected or delete the member(j) from memberMap. // func (s *SWIM) probe(member Member, timer *time.Timer) { @@ -452,7 +451,6 @@ func (s *SWIM) probe(member Member, timer *time.Timer) { // otherwise just decrease Awareness score case resp := <-end: if !resp.Ok() { - fmt.Println("not ok") s.awareness.ApplyDelta(1) s.suspect(&member) return @@ -462,11 +460,11 @@ func (s *SWIM) probe(member Member, timer *time.Timer) { } } -// indirectProbe select k-random member from MemberMap, sends +// indirectProbe select K-random member from MemberMap, sends // indirect-ping to them. if one of them sends back Ack message // then indirectProbe success, otherwise failed. -// if one of k-member successfully received ACK message, then cancel -// k-1 member's probe +// if one of K-member successfully received ACK message, then cancel +// K-1 member's probe func (s *SWIM) indirectProbe(target *Member) error { wg := &sync.WaitGroup{} wg.Add(s.config.K) @@ -515,8 +513,8 @@ func (s *SWIM) indirectProbe(target *Member) error { }(m) } - // wait until k-random member sends back response, if response message - // is Ack message, then indirectProbe success because one of k-member + // wait until K-random member sends back response, if response message + // is Ack message, then indirectProbe success because one of K-member // success UDP communication, if Nack message or Invalid message, increase // @unexpectedRespCounter then wait other member's response @@ -543,7 +541,7 @@ func (s *SWIM) indirectProbe(target *Member) error { // ping ping to member with piggyback message after sending ping message // the result can be: // 1. timeout -// in this case, push signal to start indirect-ping request to k random nodes +// in this case, push signal to start indirect-ping request to K random nodes // 2. successfully probed // in the case of successfully probe target node, update member state with // piggyback message sent from target member. @@ -572,7 +570,7 @@ func (s *SWIM) ping(target *Member) error { // indirectPing sends indirect-ping to @member targeting @target member // ** only when @member sends back to local node, push Message to channel // otherwise just return ** -// @ctx is for sending cancel signal from outside, when one of k-member successfully +// @ctx is for sending cancel signal from outside, when one of K-member successfully // received ACK message or when in the exceptional situation func (s *SWIM) indirectPing(mediator, target Member) (pb.Message, error) { stats, err := s.mbrStatsMsgStore.Get() diff --git a/swim_internal_test.go b/swim_internal_test.go index 66395be..eafea6d 100644 --- a/swim_internal_test.go +++ b/swim_internal_test.go @@ -1193,7 +1193,7 @@ func TestSWIM_ping_When_Response_Failed(t *testing.T) { assert.Error(t, err, ErrSendTimeout) } -// test when one of k-members response with other than ACK or NACK +// test when one of K-members response with other than ACK or NACK func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { mIAddr := "127.0.0.1:11184" mJAddr := "127.0.0.1:11183" @@ -1631,9 +1631,9 @@ func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { } mm := NewMemberMap(&SuspicionConfig{ - k: 2, - min: time.Hour, - max: time.Hour * 8, + K: 2, + Min: time.Hour, + Max: time.Hour * 8, }) mI := &Member{ @@ -1743,9 +1743,9 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { m2Member := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 13164, Status: Alive} mm := NewMemberMap(&SuspicionConfig{ - k: 2, - min: time.Hour, - max: time.Hour * 8, + K: 2, + Min: time.Hour, + Max: time.Hour * 8, }) mm.members[m1Member.ID] = m1Member mm.members[m2Member.ID] = m2Member