From 1b9b94cd2ef350da084ac7a351abd6ad14e756c2 Mon Sep 17 00:00:00 2001 From: zerofruit <14wnrkim@gmail.com> Date: Wed, 12 Dec 2018 16:29:12 +0900 Subject: [PATCH] create TaskRunner stop, toDie func --- member_map_internal_test.go | 7 - swim.go | 175 +++++++++++++-------- swim_internal_test.go | 294 +++++++++++++----------------------- util.go | 27 +++- 4 files changed, 243 insertions(+), 260 deletions(-) diff --git a/member_map_internal_test.go b/member_map_internal_test.go index d1a51b0..7a7f680 100644 --- a/member_map_internal_test.go +++ b/member_map_internal_test.go @@ -424,9 +424,7 @@ func TestMemberMap_Suspect_When_Member_Suspect_Without_Suspicion(t *testing.T) { }, ConfirmerID: "IAMCONFIRMER", } - res, err := m.Suspect(msg1) - assert.Equal(t, res, true) assert.Equal(t, err, nil) assert.Equal(t, member1.Incarnation, msg1.Incarnation) @@ -434,11 +432,6 @@ func TestMemberMap_Suspect_When_Member_Suspect_Without_Suspicion(t *testing.T) { assert.Equal(t, member1.Suspicion.confirmations, map[MemberID]struct{}{ MemberID{ID: "IAMCONFIRMER"}: {}, }) - - // When member already have suspicion not update timestamp - // only update suspicion timeout, in this case member1 have no initial timestamp - // so assert with isZero - assert.True(t, member1.LastStatusChange.IsZero()) } func TestMemberMap_Suspect_When_Dead(t *testing.T) { diff --git a/swim.go b/swim.go index 1827bb5..9e3402c 100644 --- a/swim.go +++ b/swim.go @@ -19,6 +19,8 @@ package swim import ( "context" "errors" + "fmt" + "reflect" "sync" "time" @@ -30,13 +32,27 @@ import ( ) var ErrInvalidMbrStatsMsgType = errors.New("error invalid mbrStatsMsg type") -var ErrAllSentNackMsg = errors.New("error all of m_k members sent back nack message") -var ErrAllSentInvalidMsg = errors.New("error all of m_k members sent back invalid message") +var ErrPingFailed = errors.New("error ping failed") +var ErrIndProbeFailed = errors.New("error when indirect-probe failed") +var ErrInvalidPayloadType = errors.New("invalid indirect-ping response payload type") type ProbeResponse struct { err error } +func (r ProbeResponse) Ok() bool { + return r.err == nil +} + +type IndProbeResponse struct { + err error + msg pb.Message +} + +func (r IndProbeResponse) Ok() bool { + return r.err == nil && reflect.TypeOf(r.msg.Payload) == reflect.TypeOf(&pb.Message_Ack{}) +} + type Config struct { // The maximum number of times the same piggyback data can be queried @@ -75,6 +91,9 @@ type SWIM struct { // Information of this node member *Member + // stopFlag tells whether SWIM is running or not + stopFlag int32 + // FailureDetector quit channel quitFD chan struct{} @@ -122,6 +141,7 @@ func messageEndpointFactory(config *Config, messageEndpointConfig MessageEndpoin // Start SWIM protocol. func (s *SWIM) Start() { + atomic.CompareAndSwapInt32(&s.stopFlag, DIE, AVAILABLE) go s.messageEndpoint.Listen() s.startFailureDetector() } @@ -304,10 +324,16 @@ func (s *SWIM) Gossip(msg []byte) { // Shutdown the running swim. func (s *SWIM) ShutDown() { + atomic.CompareAndSwapInt32(&s.stopFlag, AVAILABLE, DIE) s.messageEndpoint.Shutdown() s.quitFD <- struct{}{} } +// toDie tell whether SWIM is stopped or not +func (s *SWIM) toDie() bool { + return atomic.LoadInt32(&(s.stopFlag)) == DIE +} + // Total Failure Detection is performed for each` T`. (ref: https://github.com/DE-labtory/swim/edit/develop/docs/Docs.md) // // 1. SWIM randomly selects a member(j) in the memberMap and ping to the member(j). @@ -335,7 +361,7 @@ func (s *SWIM) startFailureDetector() { interval := time.Millisecond * time.Duration(s.config.T) T := time.NewTimer(interval) - for { + for !s.toDie() { members := s.memberMap.GetMembers() if len(members) == 0 { <-T.C @@ -381,37 +407,52 @@ func (s *SWIM) probe(member Member, timer *time.Timer) { close(end) }() + // this context and wait group are used to cancel probe procedure + // when probing time is out + wg := sync.WaitGroup{} + wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + go func() { - err := s.ping(&member) - if err == nil { - end <- ProbeResponse{} - return - } - if err != ErrSendTimeout { - iLogger.Errorf(nil, "Error occurred when ping failed [%s]", err.Error()) - return - } + task := func() (interface{}, error) { + defer wg.Done() - err = s.indirectProbe(&member) - if err != nil { - end <- ProbeResponse{err: err} - return - } + err := s.ping(&member) + if err == nil { + return nil, nil + } - end <- ProbeResponse{} + if err != ErrSendTimeout { + return nil, ErrPingFailed + } + + err = s.indirectProbe(&member) + if err != nil { + return nil, err + } + + return nil, nil + } + resp := NewTaskRunner(task, ctx).Start() + end <- ProbeResponse{err: resp.err} }() select { // if timed-out, then suspect member case <-timer.C: iLogger.Infof(nil, "[SWIM] probe member [%s] timed out, start suspect", member.ID) + // when probing time is out, then cancel the probing procedure + cancel() + wg.Wait() + s.awareness.ApplyDelta(1) s.suspect(&member) // if probe ended with error then suspect member and increase Awareness // otherwise just decrease Awareness score case resp := <-end: - if resp.err != nil { + if !resp.Ok() { + fmt.Println("not ok") s.awareness.ApplyDelta(1) s.suspect(&member) return @@ -434,7 +475,7 @@ func (s *SWIM) indirectProbe(target *Member) error { // this @ctx context ctx, cancel := context.WithCancel(context.Background()) - done := make(chan pb.Message, s.config.K) + done := make(chan IndProbeResponse, s.config.K) defer func() { cancel() @@ -451,52 +492,51 @@ func (s *SWIM) indirectProbe(target *Member) error { } resp := NewTaskRunner(task, ctx).Start() - if resp.payload == nil { + if resp.err != nil { + done <- IndProbeResponse{ + err: resp.err, + msg: pb.Message{}, + } return } msg, ok := resp.payload.(pb.Message) if !ok { + done <- IndProbeResponse{ + err: ErrInvalidPayloadType, + msg: pb.Message{}, + } return } - done <- msg + done <- IndProbeResponse{ + err: nil, + msg: msg, + } }(m) } // wait until k-random member sends back response, if response message // is Ack message, then indirectProbe success because one of k-member - // success UDP communication, if Nack message, increase @returnedNack - // counter then wait other member's response - // if all of k-random members sends back Nack message, then indirectProbe - // failed. + // success UDP communication, if Nack message or Invalid message, increase + // @unexpectedRespCounter then wait other member's response - returnedNackCounter := 0 - invalidResponseCounter := 0 + unexpectedResp := make([]IndProbeResponse, 0) for { - select { - case msg := <-done: - switch msg.Payload.(type) { - case *pb.Message_Ack: - // if one of members received ACK message then send the cancel - // signal to other members, then wait until other members receive - // this cancel signal - cancel() - wg.Wait() - return nil - case *pb.Message_Nack: - returnedNackCounter += 1 - if returnedNackCounter == s.config.K { - return ErrAllSentNackMsg - } - default: - iLogger.Errorf(nil, "Invalid message type from [%s]", msg.Address) - invalidResponseCounter += 1 - if invalidResponseCounter == s.config.K { - return ErrAllSentInvalidMsg - } + resp := <-done + + if !resp.Ok() { + unexpectedResp = append(unexpectedResp, resp) + if len(unexpectedResp) >= s.config.K { + iLogger.Infof(nil, "unexpected responses [%v]", unexpectedResp) + return ErrIndProbeFailed } + continue } + + cancel() + wg.Wait() + return nil } } @@ -515,7 +555,8 @@ func (s *SWIM) ping(target *Member) error { // send ping message addr := target.Address() - ping := createPingMessage(s.member.Address(), &stats) + pingId := xid.New().String() + ping := createPingMessage(pingId, s.member.Address(), &stats) res, err := s.messageEndpoint.SyncSend(addr, ping) if err != nil { @@ -541,7 +582,8 @@ func (s *SWIM) indirectPing(mediator, target Member) (pb.Message, error) { // send indirect-ping message addr := mediator.Address() - ind := createIndMessage(s.member.Address(), target.Address(), &stats) + indId := xid.New().String() + ind := createIndMessage(indId, s.member.Address(), target.Address(), &stats) res, err := s.messageEndpoint.SyncSend(addr, ind) @@ -617,10 +659,10 @@ func (s *SWIM) handlePing(msg pb.Message) { } // address of messgae source member - scrAddr := msg.Address + srcAddr := msg.Address - ack := createAckMessage(id, &mbrStatsMsg) - if err := s.messageEndpoint.Send(scrAddr, ack); err != nil { + ack := createAckMessage(id, srcAddr, &mbrStatsMsg) + if err := s.messageEndpoint.Send(srcAddr, ack); err != nil { iLogger.Error(nil, err.Error()) } } @@ -644,21 +686,22 @@ func (s *SWIM) handleIndirectPing(msg pb.Message) { // address of indirect-ping's target targetAddr := msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target - ping := createPingMessage(s.member.Address(), &mbrStatsMsg) + pingId := xid.New().String() + ping := createPingMessage(pingId, s.member.Address(), &mbrStatsMsg) // first send the ping to target member, if target member could not send-back // ack message for whatever reason send nack message to source member, // if successfully received ack message from target, then send back ack message // to source member if _, err := s.messageEndpoint.SyncSend(targetAddr, ping); err != nil { - nack := createNackMessage(id, &mbrStatsMsg) + nack := createNackMessage(id, srcAddr, &mbrStatsMsg) if err := s.messageEndpoint.Send(srcAddr, nack); err != nil { iLogger.Error(nil, err.Error()) } return } - ack := createAckMessage(id, &mbrStatsMsg) + ack := createAckMessage(id, srcAddr, &mbrStatsMsg) if err := s.messageEndpoint.Send(srcAddr, ack); err != nil { iLogger.Error(nil, err.Error()) } @@ -690,9 +733,9 @@ func (s *SWIM) handleMembership(membership *pb.Membership, address string) error return nil } -func createPingMessage(src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { +func createPingMessage(id, src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { return pb.Message{ - Id: xid.New().String(), + Id: id, Address: src, Payload: &pb.Message_Ping{ Ping: &pb.Ping{}, @@ -703,9 +746,9 @@ func createPingMessage(src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { } } -func createIndMessage(src, target string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { +func createIndMessage(id, src, target string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { return pb.Message{ - Id: xid.New().String(), + Id: id, Address: src, Payload: &pb.Message_IndirectPing{ IndirectPing: &pb.IndirectPing{ @@ -718,9 +761,10 @@ func createIndMessage(src, target string, mbrStatsMsg *pb.MbrStatsMsg) pb.Messag } } -func createNackMessage(seq string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { +func createNackMessage(id, src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { return pb.Message{ - Id: seq, + Id: id, + Address: src, Payload: &pb.Message_Nack{ Nack: &pb.Nack{}, }, @@ -730,9 +774,10 @@ func createNackMessage(seq string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { } } -func createAckMessage(seq string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { +func createAckMessage(id, src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message { return pb.Message{ - Id: seq, + Id: id, + Address: src, Payload: &pb.Message_Ack{ Ack: &pb.Ack{}, }, diff --git a/swim_internal_test.go b/swim_internal_test.go index a38ee68..66395be 100644 --- a/swim_internal_test.go +++ b/swim_internal_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/rs/xid" + "github.com/DE-labtory/swim/pb" "github.com/stretchr/testify/assert" ) @@ -829,7 +831,7 @@ func TestSWIM_indirectPing_When_Response_Success(t *testing.T) { mKAddr := "1.2.3.4:11111" mJAddr := "3.4.5.6:22222" - stats := pb.PiggyBack{ + pbk := pb.PiggyBack{ MbrStatsMsg: &pb.MbrStatsMsg{ Type: pb.MbrStatsMsg_Alive, Id: "123", @@ -840,7 +842,7 @@ func TestSWIM_indirectPing_When_Response_Success(t *testing.T) { pbkStore := MockMbrStatsMsgStore{} pbkStore.PushFunc = func(pbk pb.MbrStatsMsg) {} pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { - return *stats.MbrStatsMsg, nil + return *pbk.MbrStatsMsg, nil } ack := pb.Message{ @@ -867,7 +869,7 @@ func TestSWIM_indirectPing_When_Response_Success(t *testing.T) { // msg.Address should be local-node address assert.Equal(t, msg.Address, "9.8.7.6:33333") - assert.Equal(t, msg.PiggyBack.MbrStatsMsg, stats) + assert.Equal(t, msg.PiggyBack.MbrStatsMsg, pbk.MbrStatsMsg) assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "3.4.5.6:22222") return ack, nil @@ -1094,7 +1096,7 @@ func TestSWIM_ping_When_Response_Success(t *testing.T) { assert.Equal(t, addr, "1.2.3.4:11111") assert.Equal(t, msg.Address, "9.8.7.6:33333") assert.Equal(t, msg.PiggyBack, &pbk) - assert.Equal(t, msg.Payload.(*pb.Message_Ping).Ping, pb.Ping{}) + assert.Equal(t, msg.Payload.(*pb.Message_Ping).Ping, &pb.Ping{}) return respMsg, nil } @@ -1202,11 +1204,19 @@ func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { mK1MessageHandler := &MockMessageHandler{} mK1MessageEndpoint := createMessageEndpoint(t, mK1MessageHandler, time.Second*2, 11181) mK1MessageHandler.handleFunc = func(msg pb.Message) { - ping := createPingMessage(mK1Addr, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK1Addr, &pb.MbrStatsMsg{}) _, err := mK1MessageEndpoint.SyncSend(mJAddr, ping) assert.NoError(t, err) - ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + ack := pb.Message{ + Id: msg.Id, + Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Id: "mbrID-1", + }, + }, + } err = mK1MessageEndpoint.Send(mIAddr, ack) assert.NoError(t, err) } @@ -1215,10 +1225,18 @@ func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { mK2MessageHandler := &MockMessageHandler{} mK2MessageEndpoint := createMessageEndpoint(t, mK2MessageHandler, time.Second*2, 11182) mK2MessageHandler.handleFunc = func(msg pb.Message) { - ping := createPingMessage(mK2Addr, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK2Addr, &pb.MbrStatsMsg{}) mK2MessageEndpoint.SyncSend(mJAddr, ping) - ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + ack := pb.Message{ + Id: msg.Id, + Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Id: "mbrID-2", + }, + }, + } mK2MessageEndpoint.Send(mIAddr, ack) } @@ -1235,7 +1253,15 @@ func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { defer mJMessageEndpoint.Shutdown() mJMessageHandler.handleFunc = func(msg pb.Message) { - ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + ack := pb.Message{ + Id: msg.Id, + Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Id: "mbrID-3", + }, + }, + } mJMessageEndpoint.Send(msg.Address, ack) } mJ := &Member{ @@ -1307,11 +1333,19 @@ func TestSWIM_indirectProbe_When_All_Sent_Nack_Message(t *testing.T) { mK1MessageHandler := &MockMessageHandler{} mK1MessageEndpoint := createMessageEndpoint(t, mK1MessageHandler, time.Second*2, 11391) mK1MessageHandler.handleFunc = func(msg pb.Message) { - ping := createPingMessage(mK1Addr, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK1Addr, &pb.MbrStatsMsg{}) _, err := mK1MessageEndpoint.SyncSend(mJAddr, ping) assert.Error(t, err, ErrSendTimeout) - nack := pb.Message{Id: msg.Id, Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, PiggyBack: &pb.PiggyBack{}} + nack := pb.Message{ + Id: msg.Id, + Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Id: "mbrID-1", + }, + }, + } err = mK1MessageEndpoint.Send(mIAddr, nack) assert.NoError(t, err) } @@ -1320,10 +1354,18 @@ func TestSWIM_indirectProbe_When_All_Sent_Nack_Message(t *testing.T) { mK2MessageHandler := &MockMessageHandler{} mK2MessageEndpoint := createMessageEndpoint(t, mK2MessageHandler, time.Second*2, 11392) mK2MessageHandler.handleFunc = func(msg pb.Message) { - ping := createPingMessage(mK2Addr, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK2Addr, &pb.MbrStatsMsg{}) mK2MessageEndpoint.SyncSend(mJAddr, ping) - nack := pb.Message{Id: msg.Id, Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, PiggyBack: &pb.PiggyBack{}} + nack := pb.Message{ + Id: msg.Id, + Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Id: "mbrID-2", + }, + }, + } mK2MessageEndpoint.Send(mIAddr, nack) } @@ -1408,7 +1450,7 @@ func TestSWIM_indirectProbe_When_All_Sent_Nack_Message(t *testing.T) { defer messageEndpoint.Shutdown() err := swim.indirectProbe(mJ) - assert.Error(t, err, ErrAllSentNackMsg) + assert.Error(t, err, ErrIndProbeFailed) } func TestSWIM_indirectProbe_When_Some_Member_Sent_Nack_Message(t *testing.T) { @@ -1421,10 +1463,18 @@ func TestSWIM_indirectProbe_When_Some_Member_Sent_Nack_Message(t *testing.T) { mK1MessageHandler := &MockMessageHandler{} mK1MessageEndpoint := createMessageEndpoint(t, mK1MessageHandler, time.Second*2, 11251) mK1MessageHandler.handleFunc = func(msg pb.Message) { - ping := createPingMessage(mK1Addr, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK1Addr, &pb.MbrStatsMsg{}) mK1MessageEndpoint.SyncSend(mJAddr, ping) - ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + ack := pb.Message{ + Id: msg.Id, + Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Id: "mbrID-1", + }, + }, + } mK1MessageEndpoint.Send(mIAddr, ack) } @@ -1432,10 +1482,18 @@ func TestSWIM_indirectProbe_When_Some_Member_Sent_Nack_Message(t *testing.T) { mK2MessageHandler := &MockMessageHandler{} mK2MessageEndpoint := createMessageEndpoint(t, mK2MessageHandler, time.Second*2, 11252) mK2MessageHandler.handleFunc = func(msg pb.Message) { - ping := createPingMessage(mK2Addr, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK2Addr, &pb.MbrStatsMsg{}) mK2MessageEndpoint.SyncSend(mJAddr, ping) - nack := pb.Message{Id: msg.Id, Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, PiggyBack: &pb.PiggyBack{}} + nack := pb.Message{ + Id: msg.Id, + Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Id: "mbrID-2", + }, + }, + } mK2MessageEndpoint.Send(mIAddr, nack) } @@ -1545,7 +1603,15 @@ func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) mJMessageHandler.handleFunc = func(msg pb.Message) { - ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + ack := pb.Message{ + Id: msg.Id, + Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Id: "mbrID-1", + }, + }, + } mJMessageEndpoint.Send("127.0.0.1:11162", ack) } go mJMessageEndpoint.Listen() @@ -1564,7 +1630,11 @@ func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { return pb.MbrStatsMsg{}, nil } - mm := NewMemberMap(&SuspicionConfig{}) + mm := NewMemberMap(&SuspicionConfig{ + k: 2, + min: time.Hour, + max: time.Hour * 8, + }) mI := &Member{ ID: MemberID{ID: "mI"}, @@ -1585,13 +1655,13 @@ func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { // setup SWIM's message endpoint tc := PacketTransportConfig{ BindAddress: "127.0.0.1", - BindPort: 13162, + BindPort: 11162, } p, _ := NewPacketTransport(&tc) meConfig := MessageEndpointConfig{ EncryptionEnabled: false, - SendTimeout: time.Second * 3, + SendTimeout: time.Second * 5, CallbackCollectInterval: time.Hour, } mIMessageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) @@ -1601,7 +1671,7 @@ func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { go mIMessageEndpoint.Listen() defer mIMessageEndpoint.Shutdown() - T := time.NewTimer(time.Second * 5) + T := time.NewTimer(5 * time.Second) swim.probe(mJMember, T) assert.Equal(t, swim.awareness.score, 1) @@ -1672,7 +1742,11 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { m1Member := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 13163, Status: Alive} m2Member := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 13164, Status: Alive} - mm := NewMemberMap(&SuspicionConfig{}) + mm := NewMemberMap(&SuspicionConfig{ + k: 2, + min: time.Hour, + max: time.Hour * 8, + }) mm.members[m1Member.ID] = m1Member mm.members[m2Member.ID] = m2Member @@ -1689,7 +1763,7 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { // setup SWIM's message endpoint tc := PacketTransportConfig{ BindAddress: "127.0.0.1", - BindPort: 11162, + BindPort: 13162, } p, _ := NewPacketTransport(&tc) @@ -1717,10 +1791,10 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:13161") assert.Equal(t, msg.Address, mIAddress) - ping := createPingMessage(mK1Address, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK1Address, &pb.MbrStatsMsg{}) // Send ping message to mj mk1MessageEndpoint.SyncSend(mJAddress, ping) - ack := createAckMessage(msg.Id, &pb.MbrStatsMsg{}) + ack := createAckMessage(msg.Id, mK1Address, &pb.MbrStatsMsg{}) // send back to mi time.Sleep(time.Second) mk1MessageEndpoint.Send(mIAddress, ack) @@ -1739,10 +1813,10 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:13161") assert.Equal(t, msg.Address, mIAddress) - ping := createPingMessage(mK2Address, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK2Address, &pb.MbrStatsMsg{}) // Send ping message to mj mk2MessageEndpoint.SyncSend(mJAddress, ping) - ack := createAckMessage(msg.Id, &pb.MbrStatsMsg{}) + ack := createAckMessage(msg.Id, mK2Address, &pb.MbrStatsMsg{}) // send back to mi mk2MessageEndpoint.Send(mIAddress, ack) return @@ -1759,8 +1833,8 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { mk2MessageEndpoint.Shutdown() }() + T := time.NewTimer(5 * time.Second) mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 13161, Status: Alive} - T := time.NewTimer(time.Second * 5) swim.probe(mJMember, T) assert.Equal(t, swim.awareness.score, 1) @@ -1865,14 +1939,14 @@ func TestSWIM_probe_When_Target_Not_Respond_To_Indirect_Ping(t *testing.T) { assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:11161") assert.Equal(t, msg.Address, mIAddress) - ping := createPingMessage(mK1Address, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK1Address, &pb.MbrStatsMsg{}) // Send ping message to mj _, err := mk1MessageEndpoint.SyncSend(mJAddress, ping) assert.Error(t, err, ErrSendTimeout) // mk1 response with nack message - nack := createNackMessage(msg.Id, &pb.MbrStatsMsg{}) + nack := createNackMessage(msg.Id, mK1Address, &pb.MbrStatsMsg{}) // send back to mi mk1MessageEndpoint.Send(mIAddress, nack) return @@ -1890,13 +1964,13 @@ func TestSWIM_probe_When_Target_Not_Respond_To_Indirect_Ping(t *testing.T) { assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:11161") assert.Equal(t, msg.Address, mIAddress) - ping := createPingMessage(mK2Address, &pb.MbrStatsMsg{}) + ping := createPingMessage(xid.New().String(), mK2Address, &pb.MbrStatsMsg{}) // Send ping message to mj _, err := mk2MessageEndpoint.SyncSend(mJAddress, ping) assert.Error(t, err, ErrSendTimeout) - nack := createNackMessage(msg.Id, &pb.MbrStatsMsg{}) + nack := createNackMessage(msg.Id, mK2Address, &pb.MbrStatsMsg{}) // send back to mi mk2MessageEndpoint.Send(mIAddress, nack) @@ -1914,163 +1988,13 @@ func TestSWIM_probe_When_Target_Not_Respond_To_Indirect_Ping(t *testing.T) { mk2MessageEndpoint.Shutdown() }() + T := time.NewTimer(5 * time.Second) mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} - T := time.NewTimer(time.Second * 5) swim.probe(mJMember, T) assert.Equal(t, swim.awareness.score, 3) } -func TestSWIM_startFailureDetector_When_TimedOut(t *testing.T) { - // setup M_J member - mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} - mJMessageHandler := &MockMessageHandler{} - - mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) - mJMessageHandler.handleFunc = func(msg pb.Message) { - ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} - - time.Sleep(time.Second * 1) - - mJMessageEndpoint.Send("127.0.0.1:11162", ack) - } - go mJMessageEndpoint.Listen() - defer mJMessageEndpoint.Shutdown() - - // setup M_I - config := &Config{ - BindAddress: "127.0.0.1", - BindPort: 11162, - K: 2, - T: 2000, - } - - pbkStore := &MockMbrStatsMsgStore{} - pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { - return pb.MbrStatsMsg{}, nil - } - - mI := &Member{ - ID: MemberID{ID: "mI"}, - Addr: net.ParseIP("127.0.0.1"), - Port: 11162, - } - - mm := NewMemberMap(&SuspicionConfig{}) - mm.members[mJMember.ID] = &mJMember - - awareness := NewAwareness(8) - awareness.ApplyDelta(2) - - swim := &SWIM{} - swim.member = mI - swim.awareness = awareness - swim.config = config - swim.mbrStatsMsgStore = pbkStore - swim.memberMap = mm - - // setup SWIM's message endpoint - tc := PacketTransportConfig{ - BindAddress: "127.0.0.1", - BindPort: 11162, - } - p, _ := NewPacketTransport(&tc) - - meConfig := MessageEndpointConfig{ - EncryptionEnabled: false, - SendTimeout: time.Second * 10, - CallbackCollectInterval: time.Hour, - } - mIMessageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) - - swim.messageEndpoint = mIMessageEndpoint - - go mIMessageEndpoint.Listen() - defer mIMessageEndpoint.Shutdown() - - T := time.NewTimer(time.Second * 5) - - go swim.startFailureDetector() - - select { - case <-T.C: - } -} - -func TestSWIM_startFailureDetector_When_Success_Probe(t *testing.T) { - // setup M_J member - mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} - mJMessageHandler := &MockMessageHandler{} - - mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) - mJMessageHandler.handleFunc = func(msg pb.Message) { - ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} - - mJMessageEndpoint.Send("127.0.0.1:11162", ack) - } - go mJMessageEndpoint.Listen() - defer mJMessageEndpoint.Shutdown() - - // setup M_I - config := &Config{ - BindAddress: "127.0.0.1", - BindPort: 11162, - K: 2, - T: 3000, - } - - pbkStore := &MockMbrStatsMsgStore{} - pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { - return pb.MbrStatsMsg{}, nil - } - - mI := &Member{ - ID: MemberID{ID: "mI"}, - Addr: net.ParseIP("127.0.0.1"), - Port: 11162, - } - - mm := NewMemberMap(&SuspicionConfig{}) - mm.members[mJMember.ID] = &mJMember - - awareness := NewAwareness(8) - awareness.ApplyDelta(2) - - swim := &SWIM{} - swim.member = mI - swim.awareness = awareness - swim.config = config - swim.mbrStatsMsgStore = pbkStore - swim.memberMap = mm - - // setup SWIM's message endpoint - tc := PacketTransportConfig{ - BindAddress: "127.0.0.1", - BindPort: 11162, - } - p, _ := NewPacketTransport(&tc) - - meConfig := MessageEndpointConfig{ - EncryptionEnabled: false, - SendTimeout: time.Second * 10, - CallbackCollectInterval: time.Hour, - } - mIMessageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) - - swim.messageEndpoint = mIMessageEndpoint - - go mIMessageEndpoint.Listen() - defer mIMessageEndpoint.Shutdown() - - T := time.NewTimer(time.Second * 5) - - go swim.startFailureDetector() - - select { - case <-T.C: - } -} - func createMessageEndpoint(t *testing.T, messageHandler MessageHandler, sendTimeout time.Duration, port int) MessageEndpoint { mConfig := MessageEndpointConfig{ EncryptionEnabled: false, diff --git a/util.go b/util.go index af0174a..c2e68ee 100644 --- a/util.go +++ b/util.go @@ -5,10 +5,18 @@ import ( "errors" "net" "strconv" + "sync/atomic" "github.com/it-chain/iLogger" ) +type status = int32 + +const ( + AVAILABLE status = iota + DIE +) + type Task func() (interface{}, error) type TaskResponse struct { @@ -17,8 +25,9 @@ type TaskResponse struct { } type TaskRunner struct { - task Task - ctx context.Context + task Task + ctx context.Context + stopFlag int32 } func NewTaskRunner(task Task, ctx context.Context) *TaskRunner { @@ -28,14 +37,26 @@ func NewTaskRunner(task Task, ctx context.Context) *TaskRunner { } } +func (t *TaskRunner) stop() { + atomic.CompareAndSwapInt32(&t.stopFlag, AVAILABLE, DIE) +} + +func (t *TaskRunner) toDie() bool { + return atomic.LoadInt32(&(t.stopFlag)) == DIE +} + func (t *TaskRunner) Start() TaskResponse { done := make(chan TaskResponse) defer func() { - close(done) + t.stop() }() go func() { result, err := t.task() + if t.toDie() { + return + } + if err != nil { iLogger.Errorf(nil, "[TaskRunner] error occured: [%s]", err.Error()) done <- TaskResponse{