Skip to content

Commit

Permalink
Merge pull request #8 from vimeo/gone_status_fix_tested_with_minikube…
Browse files Browse the repository at this point in the history
…_this_time

Fix resource version update upon resync
  • Loading branch information
dfinkel authored Aug 20, 2020
2 parents 721bb88 + 027346f commit c7d3f3a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
matrix:
os: [macOS-latest, ubuntu-latest]
goversion: [1.13, 1.14]
goversion: [1.13, 1.14, 1.15]
steps:

- name: Set up Go ${{matrix.goversion}} on ${{matrix.os}}
Expand Down
19 changes: 14 additions & 5 deletions k8s_pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -272,11 +273,13 @@ func (p *PodWatcher) Run(ctx context.Context) error {
resync := func() resyncAction {
newversion, resyncErr := p.resync(ctx, cbChans)
if resyncErr != nil {
p.logf("resync failed: %s", resyncErr)
if sleepBackoff() {
return resyncReturn
}
return resyncContinueLoop
}
p.logf("resync succeeded; new version: %q (old %q)", newversion, version)
version = newversion
return resyncSuccess
}
Expand All @@ -287,7 +290,13 @@ func (p *PodWatcher) Run(ctx context.Context) error {
watchOpt.ResourceVersion = version
podWatch, watchStartErr := p.cs.CoreV1().Pods(p.k8sNamespace).Watch(watchOpt)
if watchStartErr != nil {
if !k8serrors.IsGone(watchStartErr) {
switch t := watchStartErr.(type) {
case k8serrors.APIStatus:
if t.Status().Code != http.StatusGone {
return fmt.Errorf("failed to startup watcher with status code %d: %w",
t.Status().Code, watchStartErr)
}
default:
return fmt.Errorf("failed to startup watcher: %w", watchStartErr)
}
switch resync() {
Expand All @@ -302,6 +311,7 @@ func (p *PodWatcher) Run(ctx context.Context) error {
rv, err := p.watch(ctx, podWatch, version, cbChans)
switch err {
case ErrResultsClosed:
version = rv
case errVersionGone:
switch resync() {
case resyncReturn:
Expand All @@ -313,7 +323,6 @@ func (p *PodWatcher) Run(ctx context.Context) error {
default:
return err
}
version = rv

// If it's been a while, reset the backoff so we don't wait too
// long after things have been humming for an hour.
Expand Down Expand Up @@ -342,7 +351,7 @@ func errorEventIsGone(ev watch.Event) bool {
if !isStatus {
return false
}
return errObj.Reason == k8smeta.StatusReasonGone
return errObj.Code == http.StatusGone
}

func (p *PodWatcher) watch(ctx context.Context, podWatch watch.Interface, rv string, cbChans []chan<- PodEvent) (string, error) {
Expand All @@ -364,8 +373,8 @@ func (p *PodWatcher) watch(ctx context.Context, podWatch watch.Interface, rv str
if !ok {
if ev.Type == watch.Error {
p.logf("received error event: %s", ev)
// if the error has reason "Gone",
// return and let the outr loop
// if the error has status code "Gone" (410),
// return and let the outer loop
// reconnect.
if errorEventIsGone(ev) {
podWatch.Stop()
Expand Down
3 changes: 2 additions & 1 deletion k8s_pod_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package k8swatcher
import (
"context"
"net"
"net/http"
"reflect"
"sync"
"testing"
Expand Down Expand Up @@ -614,7 +615,7 @@ func (goneErr) Error() string {
}

func (goneErr) Status() k8smeta.Status {
return k8smeta.Status{Reason: k8smeta.StatusReasonGone}
return k8smeta.Status{Reason: k8smeta.StatusReasonGone, Code: http.StatusGone}
}

type dummyPods struct {
Expand Down

0 comments on commit c7d3f3a

Please sign in to comment.