Skip to content

Commit

Permalink
add callback to handle nacks
Browse files Browse the repository at this point in the history
Signed-off-by: Akshay Gupta <[email protected]>
  • Loading branch information
akshaysngupta committed Oct 11, 2023
1 parent b652489 commit 6a806b2
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 18 deletions.
12 changes: 12 additions & 0 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Callbacks interface {
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
// OnStreamDeltaResponse is called immediately prior to sending a response on a stream.
OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
// OnStreamDeltaResponseNacked is called when a stream receives a NACK response.
// Returning true will cause the stream to be reprocessed.
OnStreamDeltaResponseNacked(int64, *discovery.DeltaDiscoveryRequest) bool
}

var deltaErrorResponse = &cache.RawDeltaResponse{}
Expand Down Expand Up @@ -212,6 +215,15 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// It can still be done by explicitly unsubscribing from "*"
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
} else {
if watch.nonce == req.ResponseNonce {
// If Envoy responded with a NACK, check if we should retry.
if req.ErrorDetail != nil || s.callbacks != nil {
if !s.callbacks.OnStreamDeltaResponseNacked(streamID, req) {
continue
}
}
}

watch.Cancel()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Callbacks interface {
OnStreamRequest(int64, *discovery.DiscoveryRequest) error
// OnStreamResponse is called immediately prior to sending a response on a stream.
OnStreamResponse(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
// OnStreamResponseNacked is called when a stream receives a NACK response.
// Returning true will cause the stream to be reprocessed.
OnStreamResponseNacked(int64, *discovery.DiscoveryRequest) bool
}

// NewServer creates handlers from a config watcher and callbacks.
Expand Down
7 changes: 7 additions & 0 deletions pkg/server/sotw/v3/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
// If Envoy responded with a NACK, check if we should retry.
if req.ErrorDetail != nil || s.callbacks != nil {
if !s.callbacks.OnStreamResponseNacked(sw.ID, req) {
continue
}
}

w.close()

sw.watches.addWatch(typeURL, &watch{
Expand Down
41 changes: 31 additions & 10 deletions pkg/server/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,18 @@ type Callbacks interface {

// CallbackFuncs is a convenience type for implementing the Callbacks interface.
type CallbackFuncs struct {
StreamOpenFunc func(context.Context, int64, string) error
StreamClosedFunc func(int64, *core.Node)
DeltaStreamOpenFunc func(context.Context, int64, string) error
DeltaStreamClosedFunc func(int64, *core.Node)
StreamRequestFunc func(int64, *discovery.DiscoveryRequest) error
StreamResponseFunc func(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamDeltaRequestFunc func(int64, *discovery.DeltaDiscoveryRequest) error
StreamDeltaResponseFunc func(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
FetchRequestFunc func(context.Context, *discovery.DiscoveryRequest) error
FetchResponseFunc func(*discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamOpenFunc func(context.Context, int64, string) error
StreamClosedFunc func(int64, *core.Node)
DeltaStreamOpenFunc func(context.Context, int64, string) error
DeltaStreamClosedFunc func(int64, *core.Node)
StreamRequestFunc func(int64, *discovery.DiscoveryRequest) error
StreamResponseFunc func(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamResponseNackedFunc func(int64, *discovery.DiscoveryRequest) bool
StreamDeltaRequestFunc func(int64, *discovery.DeltaDiscoveryRequest) error
StreamDeltaResponseFunc func(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
StreamDeltaResponseNackedFunc func(int64, *discovery.DeltaDiscoveryRequest) bool
FetchRequestFunc func(context.Context, *discovery.DiscoveryRequest) error
FetchResponseFunc func(*discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
}

var _ Callbacks = CallbackFuncs{}
Expand Down Expand Up @@ -132,6 +134,16 @@ func (c CallbackFuncs) OnStreamResponse(ctx context.Context, streamID int64, req
}
}

// OnStreamResponseNacked invokes StreamResponseNackedFunc.
func (c CallbackFuncs) OnStreamResponseNacked(streamID int64, req *discovery.DiscoveryRequest) bool {
if c.StreamResponseNackedFunc != nil {
return c.StreamResponseNackedFunc(streamID, req)
}

// Default to true
return true
}

// OnStreamDeltaRequest invokes StreamDeltaResponseFunc
func (c CallbackFuncs) OnStreamDeltaRequest(streamID int64, req *discovery.DeltaDiscoveryRequest) error {
if c.StreamDeltaRequestFunc != nil {
Expand All @@ -141,6 +153,15 @@ func (c CallbackFuncs) OnStreamDeltaRequest(streamID int64, req *discovery.Delta
return nil
}

// OnStreamDeltaResponseNacked invokes StreamDeltaResponseNackedFunc.
func (c CallbackFuncs) OnStreamDeltaResponseNacked(streamID int64, req *discovery.DeltaDiscoveryRequest) bool {
if c.StreamDeltaResponseNackedFunc != nil {
return c.StreamDeltaResponseNackedFunc(streamID, req)
}

return true
}

// OnStreamDeltaResponse invokes StreamDeltaResponseFunc.
func (c CallbackFuncs) OnStreamDeltaResponse(streamID int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse) {
if c.StreamDeltaResponseFunc != nil {
Expand Down
37 changes: 29 additions & 8 deletions pkg/test/v3/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import (
)

type Callbacks struct {
Signal chan struct{}
Debug bool
Fetches int
Requests int
Responses int
DeltaRequests int
DeltaResponses int
mu sync.Mutex
Signal chan struct{}
Debug bool
Fetches int
Requests int
Responses int
ResponseNacks int
DeltaRequests int
DeltaResponses int
DeltaResponseNacks int
mu sync.Mutex
}

var _ server.Callbacks = &Callbacks{}
Expand Down Expand Up @@ -78,11 +80,30 @@ func (cb *Callbacks) OnStreamResponse(ctx context.Context, id int64, req *discov
}
}

func (cb *Callbacks) OnStreamResponseNacked(streamID int64, req *discovery.DiscoveryRequest) bool {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.ResponseNacks++
if cb.Debug {
log.Printf("received nack for %s on stream %d with error %v", req.GetTypeUrl(), streamID, req.ErrorDetail)
}

return false
}

func (cb *Callbacks) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, res *discovery.DeltaDiscoveryResponse) {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.DeltaResponses++
}

func (cb *Callbacks) OnStreamDeltaResponseNacked(int64, *discovery.DeltaDiscoveryRequest) bool {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.DeltaResponseNacks++
return false
}

func (cb *Callbacks) OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error {
cb.mu.Lock()
defer cb.mu.Unlock()
Expand Down

0 comments on commit 6a806b2

Please sign in to comment.