Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

*: allow watches on individual resources #108

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,25 @@ func resourceWatchURL(endpoint, namespace string, r Resource, options ...Option)
return "", fmt.Errorf("unregistered type %T", r)
}

// Hack to let watch work on individual resources
name := ""
if meta := r.GetMetadata(); meta != nil && meta.Name != nil {
name = *meta.Name
if meta.Namespace != nil {
// Ensure that namespaces aren't different.
ns := *meta.Namespace
if namespace != "" && ns != namespace {
return "", fmt.Errorf("different namespace provided on resource than to watch call")
}
namespace = ns
}
}

if !t.namespaced && namespace != "" {
return "", fmt.Errorf("type not namespaced")
}

url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, "", options...)
url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, name, options...)
if strings.Contains(url, "?") {
url = url + "&watch=true"
} else {
Expand Down
110 changes: 110 additions & 0 deletions resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,113 @@ func TestResourceURL(t *testing.T) {
})
}
}

func TestResourceWatchURL(t *testing.T) {
tests := []struct {
name string
endpoint string
namespace string
resource Resource
options []Option
want string
wantErr bool
}{
{
name: "watch_pods",
namespace: "my-namespace",
endpoint: "https://k8s.example.com/foo/",
resource: &Pod{},
want: "https://k8s.example.com/foo/api/v1/namespaces/my-namespace/pods?watch=true",
},
{
name: "watch_all_pods",
endpoint: "https://k8s.example.com/foo/",
resource: &Pod{},
want: "https://k8s.example.com/foo/api/v1/pods?watch=true",
},
{
name: "watch_deployments",
namespace: "my-namespace",
endpoint: "https://k8s.example.com/foo/",
resource: &Deployment{},
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?watch=true",
},
{
name: "watch_with_options",
namespace: "my-namespace",
endpoint: "https://k8s.example.com/foo/",
resource: &Deployment{},
options: []Option{
Timeout(time.Minute),
},
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?timeoutSeconds=60&watch=true",
},
{
name: "watch_non_namespaced",
endpoint: "https://k8s.example.com/foo/",
resource: &ClusterRole{},
want: "https://k8s.example.com/foo/apis/rbac.authorization.k8s.io/v1/clusterroles?watch=true",
},
{
name: "watch_non_namespaced_with_namespace",
namespace: "my-namespace",
endpoint: "https://k8s.example.com/foo/",
resource: &ClusterRole{},
wantErr: true, // can't provide a namespace for a non-namespaced resource
},
{
name: "watch_deployment",
endpoint: "https://k8s.example.com/foo/",
resource: &Deployment{
Metadata: &metav1.ObjectMeta{
Namespace: String("my-namespace"),
Name: String("my-deployment"),
},
},
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true",
},
{
name: "watch_deployment_ns_in_call",
endpoint: "https://k8s.example.com/foo/",
namespace: "my-namespace",
resource: &Deployment{
Metadata: &metav1.ObjectMeta{
Name: String("my-deployment"),
},
},
want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true",
},
{
name: "watch_deployment_mismatched_ns",
endpoint: "https://k8s.example.com/foo/",
namespace: "my-other-namespace",
resource: &Deployment{
Metadata: &metav1.ObjectMeta{
Namespace: String("my-namespace"),
Name: String("my-deployment"),
},
},
wantErr: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got, err := resourceWatchURL(
test.endpoint,
test.namespace,
test.resource,
test.options...,
)
if err != nil {
if !test.wantErr {
t.Fatalf("resourceWatchURL: %v", err)
}
return
}
if got != test.want {
t.Errorf("want: %q", test.want)
t.Errorf("got : %q", got)
}
})
}
}
16 changes: 16 additions & 0 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,22 @@ func parseUnknown(b []byte) (*runtime.Unknown, error) {
// fmt.Println(eventType, *cm.Metadata.Name)
// }
//
// To watch an individual resource, provide a resource with pre-populated
// metadata:
//
// // Watch "my-configmap" in "my-namespace"
// configMap := corev1.ConfigMap{
// Metadata: &metav1.ObjectMeta{
// Namespace: String("my-namespace"),
// Name: String("my-configmap"),
// },
// }
// watcher, err := client.Watch(ctx, "", &configMap)
// if err != nil {
// // handle error
// }
// defer watcher.Close() // Always close the returned watcher.
//
func (c *Client) Watch(ctx context.Context, namespace string, r Resource, options ...Option) (*Watcher, error) {
url, err := resourceWatchURL(c.Endpoint, namespace, r, options...)
if err != nil {
Expand Down
82 changes: 55 additions & 27 deletions watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,52 +24,62 @@ func init() {
k8s.Register("", "v1", "configmaps", true, &configMapJSON{})
}

func testWatch(t *testing.T, client *k8s.Client, namespace string, newCM func() k8s.Resource, update func(cm k8s.Resource)) {
w, err := client.Watch(context.TODO(), namespace, newCM())
func wantEvent(t *testing.T, w *k8s.Watcher, eventType string, got, want k8s.Resource) {
t.Helper()
eT, err := w.Next(got)
if err != nil {
t.Errorf("watch configmaps: %v", err)
t.Errorf("decode watch event: %v", err)
return
}
defer w.Close()
if eT != eventType {
t.Errorf("expected event type %q got %q", eventType, eT)
}
want.GetMetadata().ResourceVersion = k8s.String("")
got.GetMetadata().ResourceVersion = k8s.String("")
if !reflect.DeepEqual(got, want) {
t.Errorf("configmaps didn't match")
t.Errorf("want: %#v", want)
t.Errorf(" got: %#v", got)
}
}

func testWatch(t *testing.T, client *k8s.Client, namespace string, r k8s.Resource, newCM func() k8s.Resource, update func(cm k8s.Resource)) {
cm := newCM()
want := func(eventType string) {
got := newCM()
eT, err := w.Next(got)
if err != nil {
t.Errorf("decode watch event: %v", err)

if r.GetMetadata() != nil {
// Individual watch must created beforehand
if err := client.Create(context.TODO(), cm); err != nil {
t.Errorf("create configmap: %v", err)
return
}
if eT != eventType {
t.Errorf("expected event type %q got %q", eventType, eT)
}
cm.GetMetadata().ResourceVersion = k8s.String("")
got.GetMetadata().ResourceVersion = k8s.String("")
if !reflect.DeepEqual(got, cm) {
t.Errorf("configmaps didn't match")
t.Errorf("want: %#v", cm)
t.Errorf(" got: %#v", got)
}
}
w, err := client.Watch(context.TODO(), namespace, r)
if err != nil {
t.Fatalf("watch configmaps: %v", err)
}
defer w.Close()

if err := client.Create(context.TODO(), cm); err != nil {
t.Errorf("create configmap: %v", err)
return
if r.GetMetadata() == nil {
if err := client.Create(context.TODO(), cm); err != nil {
t.Errorf("create configmap: %v", err)
return
}
wantEvent(t, w, k8s.EventAdded, newCM(), cm)
}
want(k8s.EventAdded)

update(cm)

if err := client.Update(context.TODO(), cm); err != nil {
t.Errorf("update configmap: %v", err)
return
}
want(k8s.EventModified)
wantEvent(t, w, k8s.EventModified, newCM(), cm)

if err := client.Delete(context.TODO(), cm); err != nil {
t.Errorf("Delete configmap: %v", err)
return
}
want(k8s.EventDeleted)
wantEvent(t, w, k8s.EventDeleted, newCM(), cm)
}

func TestWatchConfigMapJSON(t *testing.T) {
Expand All @@ -86,7 +96,7 @@ func TestWatchConfigMapJSON(t *testing.T) {
updateCM := func(cm k8s.Resource) {
(cm.(*configMapJSON)).Data = map[string]string{"hello": "world"}
}
testWatch(t, client, namespace, newCM, updateCM)
testWatch(t, client, namespace, &configMapJSON{}, newCM, updateCM)
})
}

Expand All @@ -104,6 +114,24 @@ func TestWatchConfigMapProto(t *testing.T) {
updateCM := func(cm k8s.Resource) {
(cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"}
}
testWatch(t, client, namespace, newCM, updateCM)
testWatch(t, client, namespace, &corev1.ConfigMap{}, newCM, updateCM)
})
}

func TestWatchIndividualConfigMap(t *testing.T) {
withNamespace(t, func(client *k8s.Client, namespace string) {
newCM := func() k8s.Resource {
return &corev1.ConfigMap{
Metadata: &metav1.ObjectMeta{
Name: k8s.String("my-configmap"),
Namespace: &namespace,
},
}
}

updateCM := func(cm k8s.Resource) {
(cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"}
}
testWatch(t, client, namespace, newCM(), newCM, updateCM)
})
}