Skip to content

Commit

Permalink
create TaskRunner stop, toDie func
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroFruit committed Mar 26, 2019
1 parent 757f050 commit 1b9b94c
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 260 deletions.
7 changes: 0 additions & 7 deletions member_map_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,21 +424,14 @@ func TestMemberMap_Suspect_When_Member_Suspect_Without_Suspicion(t *testing.T) {
},
ConfirmerID: "IAMCONFIRMER",
}

res, err := m.Suspect(msg1)

assert.Equal(t, res, true)
assert.Equal(t, err, nil)
assert.Equal(t, member1.Incarnation, msg1.Incarnation)
assert.Equal(t, member1.Status, Suspected)
assert.Equal(t, member1.Suspicion.confirmations, map[MemberID]struct{}{
MemberID{ID: "IAMCONFIRMER"}: {},
})

// When the member already has a suspicion, the timestamp is not updated;
// only the suspicion timeout is updated. In this case member1 has no initial
// timestamp, so assert with IsZero.
assert.True(t, member1.LastStatusChange.IsZero())
}

func TestMemberMap_Suspect_When_Dead(t *testing.T) {
Expand Down
175 changes: 110 additions & 65 deletions swim.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package swim
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

Expand All @@ -30,13 +32,27 @@ import (
)

// Sentinel errors used by the SWIM failure detector.
var (
	// ErrInvalidMbrStatsMsgType reports a mbrStatsMsg of an invalid type.
	ErrInvalidMbrStatsMsgType = errors.New("error invalid mbrStatsMsg type")

	// ErrAllSentNackMsg reports that every one of the k mediators answered
	// an indirect probe with a nack message.
	ErrAllSentNackMsg = errors.New("error all of m_k members sent back nack message")

	// ErrAllSentInvalidMsg reports that every one of the k mediators answered
	// an indirect probe with a message of an unexpected type.
	ErrAllSentInvalidMsg = errors.New("error all of m_k members sent back invalid message")

	// ErrPingFailed reports that a direct ping failed for a reason other
	// than a send timeout.
	ErrPingFailed = errors.New("error ping failed")

	// ErrIndProbeFailed reports that an indirect probe collected only
	// unexpected responses from the mediators.
	ErrIndProbeFailed = errors.New("error when indirect-probe failed")

	// ErrInvalidPayloadType reports that an indirect-ping task produced a
	// payload that is not a pb.Message.
	ErrInvalidPayloadType = errors.New("invalid indirect-ping response payload type")
)

// ProbeResponse carries the outcome of one probe attempt.
type ProbeResponse struct {
	// err is nil when the probe completed successfully.
	err error
}

// Ok reports whether the probe finished without an error.
func (r ProbeResponse) Ok() bool {
	failed := r.err != nil
	return !failed
}

// IndProbeResponse carries the outcome of one indirect-probe request sent
// through a mediator member.
type IndProbeResponse struct {
	// err is the error produced while running the indirect ping, if any.
	err error
	// msg is the message returned by the mediator; it is the zero pb.Message
	// when err is set.
	msg pb.Message
}

// Ok reports whether the indirect probe succeeded: there must be no error
// and the payload's dynamic type must be *pb.Message_Ack.
func (r IndProbeResponse) Ok() bool {
	if r.err != nil {
		return false
	}

	ackType := reflect.TypeOf(&pb.Message_Ack{})
	return reflect.TypeOf(r.msg.Payload) == ackType
}

type Config struct {

// The maximum number of times the same piggyback data can be queried
Expand Down Expand Up @@ -75,6 +91,9 @@ type SWIM struct {
// Information of this node
member *Member

// stopFlag tells whether SWIM is running or not
stopFlag int32

// FailureDetector quit channel
quitFD chan struct{}

Expand Down Expand Up @@ -122,6 +141,7 @@ func messageEndpointFactory(config *Config, messageEndpointConfig MessageEndpoin

// Start begins running the SWIM protocol.
//
// It attempts to flip stopFlag from DIE to AVAILABLE, launches the message
// endpoint's Listen in its own goroutine, and then calls
// startFailureDetector on the calling goroutine.
//
// NOTE(review): startFailureDetector appears to loop until toDie() reports
// true, so Start presumably blocks until ShutDown is called — confirm.
func (s *SWIM) Start() {
// The swap only takes effect when the flag currently holds DIE; the result
// of the compare-and-swap is intentionally not inspected here.
atomic.CompareAndSwapInt32(&s.stopFlag, DIE, AVAILABLE)
go s.messageEndpoint.Listen()
s.startFailureDetector()
}
Expand Down Expand Up @@ -304,10 +324,16 @@ func (s *SWIM) Gossip(msg []byte) {

// ShutDown stops the running SWIM instance.
//
// It attempts to flip stopFlag from AVAILABLE to DIE, shuts down the message
// endpoint, and finally sends a value on quitFD.
//
// NOTE(review): the send on quitFD blocks until something receives from the
// channel (unless it is buffered) — confirm the channel's capacity and that
// a receiver is always present, otherwise ShutDown may hang.
func (s *SWIM) ShutDown() {
// The swap only takes effect when the flag currently holds AVAILABLE; the
// result of the compare-and-swap is not inspected.
atomic.CompareAndSwapInt32(&s.stopFlag, AVAILABLE, DIE)
s.messageEndpoint.Shutdown()
s.quitFD <- struct{}{}
}

// toDie reports whether SWIM has been marked as stopped, i.e. whether
// stopFlag currently holds DIE. The flag is read atomically.
func (s *SWIM) toDie() bool {
	state := atomic.LoadInt32(&s.stopFlag)
	return state == DIE
}

// Total Failure Detection is performed once per period `T`. (ref: https://github.com/DE-labtory/swim/edit/develop/docs/Docs.md)
//
// 1. SWIM randomly selects a member(j) in the memberMap and ping to the member(j).
Expand Down Expand Up @@ -335,7 +361,7 @@ func (s *SWIM) startFailureDetector() {
interval := time.Millisecond * time.Duration(s.config.T)
T := time.NewTimer(interval)

for {
for !s.toDie() {
members := s.memberMap.GetMembers()
if len(members) == 0 {
<-T.C
Expand Down Expand Up @@ -381,37 +407,52 @@ func (s *SWIM) probe(member Member, timer *time.Timer) {
close(end)
}()

// this context and wait group are used to cancel probe procedure
// when probing time is out
wg := sync.WaitGroup{}
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())

go func() {
err := s.ping(&member)
if err == nil {
end <- ProbeResponse{}
return
}
if err != ErrSendTimeout {
iLogger.Errorf(nil, "Error occurred when ping failed [%s]", err.Error())
return
}
task := func() (interface{}, error) {
defer wg.Done()

err = s.indirectProbe(&member)
if err != nil {
end <- ProbeResponse{err: err}
return
}
err := s.ping(&member)
if err == nil {
return nil, nil
}

end <- ProbeResponse{}
if err != ErrSendTimeout {
return nil, ErrPingFailed
}

err = s.indirectProbe(&member)
if err != nil {
return nil, err
}

return nil, nil
}
resp := NewTaskRunner(task, ctx).Start()
end <- ProbeResponse{err: resp.err}
}()

select {
// if timed-out, then suspect member
case <-timer.C:
iLogger.Infof(nil, "[SWIM] probe member [%s] timed out, start suspect", member.ID)
// when probing time is out, then cancel the probing procedure
cancel()
wg.Wait()

s.awareness.ApplyDelta(1)
s.suspect(&member)

// if probe ended with error then suspect member and increase Awareness
// otherwise just decrease Awareness score
case resp := <-end:
if resp.err != nil {
if !resp.Ok() {
fmt.Println("not ok")
s.awareness.ApplyDelta(1)
s.suspect(&member)
return
Expand All @@ -434,7 +475,7 @@ func (s *SWIM) indirectProbe(target *Member) error {
// this @ctx context
ctx, cancel := context.WithCancel(context.Background())

done := make(chan pb.Message, s.config.K)
done := make(chan IndProbeResponse, s.config.K)

defer func() {
cancel()
Expand All @@ -451,52 +492,51 @@ func (s *SWIM) indirectProbe(target *Member) error {
}

resp := NewTaskRunner(task, ctx).Start()
if resp.payload == nil {
if resp.err != nil {
done <- IndProbeResponse{
err: resp.err,
msg: pb.Message{},
}
return
}

msg, ok := resp.payload.(pb.Message)
if !ok {
done <- IndProbeResponse{
err: ErrInvalidPayloadType,
msg: pb.Message{},
}
return
}
done <- msg
done <- IndProbeResponse{
err: nil,
msg: msg,
}
}(m)
}

// wait until k-random member sends back response, if response message
// is Ack message, then indirectProbe success because one of k-member
// success UDP communication, if Nack message, increase @returnedNack
// counter then wait other member's response
// if all of k-random members sends back Nack message, then indirectProbe
// failed.
// success UDP communication, if Nack message or Invalid message, increase
// @unexpectedRespCounter then wait other member's response

returnedNackCounter := 0
invalidResponseCounter := 0
unexpectedResp := make([]IndProbeResponse, 0)

for {
select {
case msg := <-done:
switch msg.Payload.(type) {
case *pb.Message_Ack:
// if one of members received ACK message then send the cancel
// signal to other members, then wait until other members receive
// this cancel signal
cancel()
wg.Wait()
return nil
case *pb.Message_Nack:
returnedNackCounter += 1
if returnedNackCounter == s.config.K {
return ErrAllSentNackMsg
}
default:
iLogger.Errorf(nil, "Invalid message type from [%s]", msg.Address)
invalidResponseCounter += 1
if invalidResponseCounter == s.config.K {
return ErrAllSentInvalidMsg
}
resp := <-done

if !resp.Ok() {
unexpectedResp = append(unexpectedResp, resp)
if len(unexpectedResp) >= s.config.K {
iLogger.Infof(nil, "unexpected responses [%v]", unexpectedResp)
return ErrIndProbeFailed
}
continue
}

cancel()
wg.Wait()
return nil
}
}

Expand All @@ -515,7 +555,8 @@ func (s *SWIM) ping(target *Member) error {

// send ping message
addr := target.Address()
ping := createPingMessage(s.member.Address(), &stats)
pingId := xid.New().String()
ping := createPingMessage(pingId, s.member.Address(), &stats)

res, err := s.messageEndpoint.SyncSend(addr, ping)
if err != nil {
Expand All @@ -541,7 +582,8 @@ func (s *SWIM) indirectPing(mediator, target Member) (pb.Message, error) {

// send indirect-ping message
addr := mediator.Address()
ind := createIndMessage(s.member.Address(), target.Address(), &stats)
indId := xid.New().String()
ind := createIndMessage(indId, s.member.Address(), target.Address(), &stats)

res, err := s.messageEndpoint.SyncSend(addr, ind)

Expand Down Expand Up @@ -617,10 +659,10 @@ func (s *SWIM) handlePing(msg pb.Message) {
}

// address of message source member
scrAddr := msg.Address
srcAddr := msg.Address

ack := createAckMessage(id, &mbrStatsMsg)
if err := s.messageEndpoint.Send(scrAddr, ack); err != nil {
ack := createAckMessage(id, srcAddr, &mbrStatsMsg)
if err := s.messageEndpoint.Send(srcAddr, ack); err != nil {
iLogger.Error(nil, err.Error())
}
}
Expand All @@ -644,21 +686,22 @@ func (s *SWIM) handleIndirectPing(msg pb.Message) {
// address of indirect-ping's target
targetAddr := msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target

ping := createPingMessage(s.member.Address(), &mbrStatsMsg)
pingId := xid.New().String()
ping := createPingMessage(pingId, s.member.Address(), &mbrStatsMsg)

// first send the ping to target member, if target member could not send-back
// ack message for whatever reason send nack message to source member,
// if successfully received ack message from target, then send back ack message
// to source member
if _, err := s.messageEndpoint.SyncSend(targetAddr, ping); err != nil {
nack := createNackMessage(id, &mbrStatsMsg)
nack := createNackMessage(id, srcAddr, &mbrStatsMsg)
if err := s.messageEndpoint.Send(srcAddr, nack); err != nil {
iLogger.Error(nil, err.Error())
}
return
}

ack := createAckMessage(id, &mbrStatsMsg)
ack := createAckMessage(id, srcAddr, &mbrStatsMsg)
if err := s.messageEndpoint.Send(srcAddr, ack); err != nil {
iLogger.Error(nil, err.Error())
}
Expand Down Expand Up @@ -690,9 +733,9 @@ func (s *SWIM) handleMembership(membership *pb.Membership, address string) error
return nil
}

func createPingMessage(src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
func createPingMessage(id, src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
return pb.Message{
Id: xid.New().String(),
Id: id,
Address: src,
Payload: &pb.Message_Ping{
Ping: &pb.Ping{},
Expand All @@ -703,9 +746,9 @@ func createPingMessage(src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
}
}

func createIndMessage(src, target string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
func createIndMessage(id, src, target string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
return pb.Message{
Id: xid.New().String(),
Id: id,
Address: src,
Payload: &pb.Message_IndirectPing{
IndirectPing: &pb.IndirectPing{
Expand All @@ -718,9 +761,10 @@ func createIndMessage(src, target string, mbrStatsMsg *pb.MbrStatsMsg) pb.Messag
}
}

func createNackMessage(seq string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
func createNackMessage(id, src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
return pb.Message{
Id: seq,
Id: id,
Address: src,
Payload: &pb.Message_Nack{
Nack: &pb.Nack{},
},
Expand All @@ -730,9 +774,10 @@ func createNackMessage(seq string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
}
}

func createAckMessage(seq string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
func createAckMessage(id, src string, mbrStatsMsg *pb.MbrStatsMsg) pb.Message {
return pb.Message{
Id: seq,
Id: id,
Address: src,
Payload: &pb.Message_Ack{
Ack: &pb.Ack{},
},
Expand Down
Loading

0 comments on commit 1b9b94c

Please sign in to comment.