Skip to content

Commit

Permalink
fix: canceled should always be set to true when cancel a watch request (
Browse files Browse the repository at this point in the history
#373)

* fix: canceled should always be set to true when cancel a watch request
* send error message
* change custom error to ErrGRPCUnhealthy

Signed-off-by: Nic <[email protected]>
  • Loading branch information
nic-6443 authored Dec 6, 2024
1 parent 59c88f9 commit ffc8345
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
15 changes: 9 additions & 6 deletions pkg/logstructured/logstructured.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,17 +425,20 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
}

result := make(chan []*server.Event, 100)
wr := server.WatchResult{Events: result}
errc := make(chan error, 1)
wr := server.WatchResult{Events: result, Errorc: errc}

rev, kvs, err := l.log.After(ctx, prefix, revision, 0)
if err != nil {
if !errors.Is(err, context.Canceled) {
logrus.Errorf("Failed to list %s for revision %d: %v", prefix, revision, err)
}
if err == server.ErrCompacted {
compact, _ := l.log.CompactRevision(ctx)
wr.CompactRevision = compact
wr.CurrentRevision = rev
if err == server.ErrCompacted {
compact, _ := l.log.CompactRevision(ctx)
wr.CompactRevision = compact
wr.CurrentRevision = rev
} else {
errc <- server.ErrGRPCUnhealthy
}
}
cancel()
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
var (
ErrNotSupported = status.New(codes.InvalidArgument, "etcdserver: unsupported operations in txn request").Err()

ErrKeyExists = rpctypes.ErrGRPCDuplicateKey
ErrCompacted = rpctypes.ErrGRPCCompacted
ErrFutureRev = rpctypes.ErrGRPCFutureRev
ErrKeyExists = rpctypes.ErrGRPCDuplicateKey
ErrCompacted = rpctypes.ErrGRPCCompacted
ErrFutureRev = rpctypes.ErrGRPCFutureRev
ErrGRPCUnhealthy = rpctypes.ErrGRPCUnhealthy
)

type Backend interface {
Expand Down Expand Up @@ -85,6 +86,7 @@ type WatchResult struct {
CurrentRevision int64
CompactRevision int64
Events <-chan []*Event
Errorc <-chan error
}

func unsupported(field string) error {
Expand Down
9 changes: 7 additions & 2 deletions pkg/server/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,12 @@ func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest)
}
}

w.Cancel(id, 0, 0, nil)
select {
case err := <-wr.Errorc:
w.Cancel(id, 0, 0, err)
default:
w.Cancel(id, 0, 0, nil)
}
logrus.Tracef("WATCH CLOSE id=%d, key=%s", id, key)
}()
}
Expand Down Expand Up @@ -219,7 +224,7 @@ func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) {

serr := w.server.Send(&etcdserverpb.WatchResponse{
Header: txnHeader(revision),
Canceled: err != nil,
Canceled: true,
CancelReason: reason,
WatchId: watchID,
CompactRevision: compactRev,
Expand Down

0 comments on commit ffc8345

Please sign in to comment.