Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide callback to decide if nack should be retried #794

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Callbacks interface {
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
// OnStreamDeltaResponse is called immediately prior to sending a response on a stream.
OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
// OnStreamDeltaResponseNacked is called when a response is NACKed.
// Returning true will cause the server to retry the response.
OnStreamDeltaResponseNacked(int64, *discovery.DeltaDiscoveryRequest) bool
}

var deltaErrorResponse = &cache.RawDeltaResponse{}
Expand Down Expand Up @@ -212,6 +215,17 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// It can still be done by explicitly unsubscribing from "*"
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
} else {
if watch.nonce == req.ResponseNonce {
// If client responded with a NACK, check if we should retry.
if req.ErrorDetail != nil || s.callbacks != nil {
if !s.callbacks.OnStreamDeltaResponseNacked(streamID, req) {
// Don't cancel the watch if the response was nacked.
// Discovery Response will not be sent to the client.
continue
}
}
}

watch.Cancel()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ type Callbacks interface {
OnStreamRequest(int64, *discovery.DiscoveryRequest) error
// OnStreamResponse is called immediately prior to sending a response on a stream.
OnStreamResponse(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
// OnStreamResponseNacked is called when a stream receives a NACK response.
// Returning true will cause the server to retry the response.
OnStreamResponseNacked(int64, *discovery.DiscoveryRequest) bool
}

// NewServer creates handlers from a config watcher and callbacks.
Expand Down
9 changes: 9 additions & 0 deletions pkg/server/sotw/v3/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
// If client responded with a NACK, check if we should retry.
if req.ErrorDetail != nil || s.callbacks != nil {
if !s.callbacks.OnStreamResponseNacked(sw.ID, req) {
// Don't recreate the watch if the response was nacked.
// Discovery Response will not be sent to the client.
continue
}
}

w.close()

sw.watches.addWatch(typeURL, &watch{
Expand Down
41 changes: 31 additions & 10 deletions pkg/server/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,18 @@ type Callbacks interface {

// CallbackFuncs is a convenience type for implementing the Callbacks interface.
type CallbackFuncs struct {
StreamOpenFunc func(context.Context, int64, string) error
StreamClosedFunc func(int64, *core.Node)
DeltaStreamOpenFunc func(context.Context, int64, string) error
DeltaStreamClosedFunc func(int64, *core.Node)
StreamRequestFunc func(int64, *discovery.DiscoveryRequest) error
StreamResponseFunc func(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamDeltaRequestFunc func(int64, *discovery.DeltaDiscoveryRequest) error
StreamDeltaResponseFunc func(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
FetchRequestFunc func(context.Context, *discovery.DiscoveryRequest) error
FetchResponseFunc func(*discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamOpenFunc func(context.Context, int64, string) error
StreamClosedFunc func(int64, *core.Node)
DeltaStreamOpenFunc func(context.Context, int64, string) error
DeltaStreamClosedFunc func(int64, *core.Node)
StreamRequestFunc func(int64, *discovery.DiscoveryRequest) error
StreamResponseFunc func(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamResponseNackedFunc func(int64, *discovery.DiscoveryRequest) bool
StreamDeltaRequestFunc func(int64, *discovery.DeltaDiscoveryRequest) error
StreamDeltaResponseFunc func(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
StreamDeltaResponseNackedFunc func(int64, *discovery.DeltaDiscoveryRequest) bool
FetchRequestFunc func(context.Context, *discovery.DiscoveryRequest) error
FetchResponseFunc func(*discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
}

var _ Callbacks = CallbackFuncs{}
Expand Down Expand Up @@ -132,6 +134,16 @@ func (c CallbackFuncs) OnStreamResponse(ctx context.Context, streamID int64, req
}
}

// OnStreamResponseNacked invokes StreamResponseNackedFunc.
func (c CallbackFuncs) OnStreamResponseNacked(streamID int64, req *discovery.DiscoveryRequest) bool {
if c.StreamResponseNackedFunc != nil {
return c.StreamResponseNackedFunc(streamID, req)
}

// Default to true
return true
}

// OnStreamDeltaRequest invokes StreamDeltaResponseFunc
func (c CallbackFuncs) OnStreamDeltaRequest(streamID int64, req *discovery.DeltaDiscoveryRequest) error {
if c.StreamDeltaRequestFunc != nil {
Expand All @@ -141,6 +153,15 @@ func (c CallbackFuncs) OnStreamDeltaRequest(streamID int64, req *discovery.Delta
return nil
}

// OnStreamDeltaResponseNacked invokes StreamDeltaResponseNackedFunc.
func (c CallbackFuncs) OnStreamDeltaResponseNacked(streamID int64, req *discovery.DeltaDiscoveryRequest) bool {
if c.StreamDeltaResponseNackedFunc != nil {
return c.StreamDeltaResponseNackedFunc(streamID, req)
}

return true
}

// OnStreamDeltaResponse invokes StreamDeltaResponseFunc.
func (c CallbackFuncs) OnStreamDeltaResponse(streamID int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse) {
if c.StreamDeltaResponseFunc != nil {
Expand Down
37 changes: 29 additions & 8 deletions pkg/test/v3/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import (
)

type Callbacks struct {
Signal chan struct{}
Debug bool
Fetches int
Requests int
Responses int
DeltaRequests int
DeltaResponses int
mu sync.Mutex
Signal chan struct{}
Debug bool
Fetches int
Requests int
Responses int
ResponseNacks int
DeltaRequests int
DeltaResponses int
DeltaResponseNacks int
mu sync.Mutex
}

var _ server.Callbacks = &Callbacks{}
Expand Down Expand Up @@ -78,11 +80,30 @@ func (cb *Callbacks) OnStreamResponse(ctx context.Context, id int64, req *discov
}
}

func (cb *Callbacks) OnStreamResponseNacked(streamID int64, req *discovery.DiscoveryRequest) bool {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.ResponseNacks++
if cb.Debug {
log.Printf("received nack for %s on stream %d with error %v", req.GetTypeUrl(), streamID, req.ErrorDetail)
}

return false
}

func (cb *Callbacks) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, res *discovery.DeltaDiscoveryResponse) {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.DeltaResponses++
}

func (cb *Callbacks) OnStreamDeltaResponseNacked(int64, *discovery.DeltaDiscoveryRequest) bool {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.DeltaResponseNacks++
return false
}

func (cb *Callbacks) OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error {
cb.mu.Lock()
defer cb.mu.Unlock()
Expand Down