diff --git a/pkg/yurthub/cachemanager/cache_agent.go b/pkg/yurthub/cachemanager/cache_agent.go index fd4d5728f03..7c7abe25656 100644 --- a/pkg/yurthub/cachemanager/cache_agent.go +++ b/pkg/yurthub/cachemanager/cache_agent.go @@ -56,7 +56,15 @@ func NewCacheAgents(informerFactory informers.SharedInformerFactory, store Stora } func (ca *CacheAgent) HasAny(items ...string) bool { - return ca.agents.HasAny(items...) + newAgents := make([]string, 0, len(items)) + for i := range items { + if n := strings.Index(items[i], "/partialobjectmetadata"); n != -1 { + newAgents = append(newAgents, items[i][:n]) + } else { + newAgents = append(newAgents, items[i]) + } + } + return ca.agents.HasAny(newAgents...) } func (ca *CacheAgent) addConfigmap(obj interface{}) { diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index ee3c4472f66..4447395ff62 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -164,6 +164,7 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro ctx := req.Context() info, _ := apirequest.RequestInfoFrom(ctx) comp, _ := util.ClientComponentFrom(ctx) + isPartialReq, _ := util.IsPartialRequestFrom(ctx) key, err := cm.storage.KeyFunc(storage.KeyBuildInfo{ Component: comp, Namespace: info.Namespace, @@ -176,16 +177,31 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro return nil, err } - listGvk, err := cm.prepareGvkForListObj(schema.GroupVersionResource{ - Group: info.APIGroup, - Version: info.APIVersion, - Resource: info.Resource, - }) - if err != nil { - klog.Errorf("could not get gvk for ListObject for req: %s, %v", util.ReqString(req), err) - // If err is hubmeta.ErrGVRNotRecognized, the reverse proxy will set the HTTP Status Code as 404. - return nil, err + var listGvk schema.GroupVersionKind + if isPartialReq { + convertGvk, _ := util.ConvertGVKFrom(ctx) + if convertGvk == nil { + klog.Errorf("Error to get convert gvk for partial object metadata request") + return nil, fmt.Errorf("error to get convert gvk for partial object metadata reqeust") + } + listGvk = schema.GroupVersionKind{ + Group: convertGvk.Group, + Version: convertGvk.Version, + Kind: convertGvk.Kind, + } + } else { + listGvk, err = cm.prepareGvkForListObj(schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + }) + if err != nil { + klog.Errorf("could not get gvk for ListObject for req: %s, %v", util.ReqString(req), err) + // If err is hubmeta.ErrGVRNotRecognized, the reverse proxy will set the HTTP Status Code as 404. + return nil, err + } } + listObj, err := generateEmptyListObjOfGVK(listGvk) if err != nil { klog.Errorf("could not create ListObj for gvk %s for req: %s, %v", listGvk.String(), util.ReqString(req), err) @@ -439,7 +455,20 @@ func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.Re func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error { comp, _ := util.ClientComponentFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) - s := cm.serializerManager.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource) + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + isPartialReq, _ := util.IsPartialRequestFrom(ctx) + if isPartialReq { + convertGvk, _ := util.ConvertGVKFrom(ctx) + if convertGvk != nil { + gvr, _ = meta.UnsafeGuessKindToResource(*convertGvk) + } + } + + s := cm.serializerManager.CreateSerializer(respContentType, gvr.Group, gvr.Version, gvr.Resource) if s == nil { klog.Errorf("could not create serializer in saveListObject, %s", util.ReqInfoString(info)) return fmt.Errorf("could not create serializer in saveListObject, %s", util.ReqInfoString(info)) @@ -464,22 +493,20 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req } klog.V(5).Infof("list items for %s is: %d", util.ReqInfoString(info), len(items)) - kind := strings.TrimSuffix(list.GetObjectKind().GroupVersionKind().Kind, "List") - apiVersion := schema.GroupVersion{ - Group: info.APIGroup, - Version: info.APIVersion, - }.String() + gvk := list.GetObjectKind().GroupVersionKind() + kind := strings.TrimSuffix(gvk.Kind, "List") + groupVersion := gvk.GroupVersion().String() accessor := meta.NewAccessor() // Verify if DynamicRESTMapper(which store the CRD info) needs to be updated - if err := cm.restMapperManager.UpdateKind(schema.GroupVersionKind{Group: info.APIGroup, Version: info.APIVersion, Kind: kind}); err != nil { + if err := cm.restMapperManager.UpdateKind(schema.GroupVersionKind{Group: gvk.Group, Version: gvk.Version, Kind: kind}); err != nil { klog.Errorf("could not update the DynamicRESTMapper %v", err) } if info.Name != "" && len(items) == 1 { // list with fieldSelector=metadata.name=xxx accessor.SetKind(items[0], kind) - accessor.SetAPIVersion(items[0], apiVersion) + accessor.SetAPIVersion(items[0], groupVersion) name, _ := accessor.Name(items[0]) ns, _ := accessor.Namespace(items[0]) if ns == "" { @@ -500,7 +527,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req comp, _ := util.ClientComponentFrom(ctx) for i := range items { accessor.SetKind(items[i], kind) - accessor.SetAPIVersion(items[i], apiVersion) + accessor.SetAPIVersion(items[i], groupVersion) name, _ := accessor.Name(items[i]) ns, _ := accessor.Namespace(items[i]) if ns == "" { @@ -528,8 +555,20 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.RequestInfo, b []byte) error { comp, _ := util.ClientComponentFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + isPartialReq, _ := util.IsPartialRequestFrom(ctx) + if isPartialReq { + convertGvk, _ := util.ConvertGVKFrom(ctx) + if convertGvk != nil { + gvr, _ = meta.UnsafeGuessKindToResource(*convertGvk) + } + } - s := cm.serializerManager.CreateSerializer(respContentType, info.APIGroup, info.APIVersion, info.Resource) + s := cm.serializerManager.CreateSerializer(respContentType, gvr.Group, gvr.Version, gvr.Resource) if s == nil { klog.Errorf("could not create serializer in saveOneObject, %s", util.ReqInfoString(info)) return fmt.Errorf("could not create serializer in saveOneObject, %s", util.ReqInfoString(info)) diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index 3c0c0ce0751..b27d39d3ecc 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -73,16 +73,10 @@ func TestCacheGetResponse(t *testing.T) { yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) testcases := map[string]struct { - group string - version string - keyBuildInfo storage.KeyBuildInfo inputObj runtime.Object - userAgent string - accept string + header map[string]string verb string path string - resource string - namespaced bool expectResult struct { err error rv string @@ -93,16 +87,6 @@ func TestCacheGetResponse(t *testing.T) { cacheResponseErr bool }{ "cache response for pod with not assigned node": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "pods", - Namespace: "default", - Name: "mypod1", - Group: "", - Version: "v1", - }, inputObj: runtime.Object(&v1.Pod{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -114,25 +98,15 @@ func TestCacheGetResponse(t *testing.T) { ResourceVersion: "1", }, }), - userAgent: "kubelet", - accept: "application/json", + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, verb: "GET", path: "/api/v1/namespaces/default/pods/mypod1", - resource: "pods", - namespaced: true, cacheResponseErr: true, }, "cache response for get pod": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "pods", - Namespace: "default", - Name: "mypod1", - Group: "", - Version: "v1", - }, inputObj: runtime.Object(&v1.Pod{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -147,12 +121,12 @@ func TestCacheGetResponse(t *testing.T) { NodeName: "node1", }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods/mypod1", - resource: "pods", - namespaced: true, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/namespaces/default/pods/mypod1", expectResult: struct { err error rv string @@ -167,16 +141,6 @@ func TestCacheGetResponse(t *testing.T) { }, }, "cache response for get pod2": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "pods", - Namespace: "default", - Name: "mypod2", - Group: "", - Version: "v1", - }, inputObj: runtime.Object(&v1.Pod{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -191,12 +155,12 @@ func TestCacheGetResponse(t *testing.T) { NodeName: "node1", }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods/mypod2", - resource: "pods", - namespaced: true, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/namespaces/default/pods/mypod2", expectResult: struct { err error rv string @@ -211,15 +175,6 @@ func TestCacheGetResponse(t *testing.T) { }, }, "cache response for get node": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "nodes", - Name: "mynode1", - Group: "", - Version: "v1", - }, inputObj: runtime.Object(&v1.Node{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -230,12 +185,12 @@ func TestCacheGetResponse(t *testing.T) { ResourceVersion: "4", }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/nodes/mynode1", - resource: "nodes", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/nodes/mynode1", expectResult: struct { err error rv string @@ -249,15 +204,6 @@ func TestCacheGetResponse(t *testing.T) { }, }, "cache response for get node2": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "nodes", - Name: "mynode2", - Group: "", - Version: "v1", - }, inputObj: runtime.Object(&v1.Node{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -268,12 +214,12 @@ func TestCacheGetResponse(t *testing.T) { ResourceVersion: "6", }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/nodes/mynode2", - resource: "nodes", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/nodes/mynode2", expectResult: struct { err error rv string @@ -288,16 +234,6 @@ func TestCacheGetResponse(t *testing.T) { }, //used to test whether custom resources can be cached correctly "cache response for get crontab": { - group: "stable.example.com", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "crontabs", - Namespace: "default", - Name: "crontab1", - Group: "stable.example.com", - Version: "v1", - }, inputObj: runtime.Object(&unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "stable.example.com/v1", @@ -309,12 +245,12 @@ func TestCacheGetResponse(t *testing.T) { }, }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab1", - resource: "crontabs", - namespaced: true, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab1", expectResult: struct { err error rv string @@ -329,16 +265,6 @@ func TestCacheGetResponse(t *testing.T) { }, }, "cache response for get crontab2": { - group: "stable.example.com", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "crontabs", - Namespace: "default", - Name: "crontab2", - Group: "stable.example.com", - Version: "v1", - }, inputObj: runtime.Object(&unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "stable.example.com/v1", @@ -350,12 +276,12 @@ func TestCacheGetResponse(t *testing.T) { }, }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab2", - resource: "crontabs", - namespaced: true, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs/crontab2", expectResult: struct { err error rv string @@ -370,15 +296,6 @@ func TestCacheGetResponse(t *testing.T) { }, }, "cache response for get foo without namespace": { - group: "samplecontroller.k8s.io", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "foos", - Name: "foo1", - Group: "samplecontroller.k8s.io", - Version: "v1", - }, inputObj: runtime.Object(&unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "samplecontroller.k8s.io/v1", @@ -389,12 +306,12 @@ func TestCacheGetResponse(t *testing.T) { }, }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/samplecontroller.k8s.io/v1/foos/foo1", - resource: "foos", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos/foo1", expectResult: struct { err error rv string @@ -408,15 +325,6 @@ func TestCacheGetResponse(t *testing.T) { }, }, "cache response for get foo2 without namespace": { - group: "samplecontroller.k8s.io", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "foos", - Name: "foo2", - Group: "samplecontroller.k8s.io", - Version: "v1", - }, inputObj: runtime.Object(&unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "samplecontroller.k8s.io/v1", @@ -427,12 +335,12 @@ func TestCacheGetResponse(t *testing.T) { }, }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/samplecontroller.k8s.io/v1/foos/foo2", - resource: "foos", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos/foo2", expectResult: struct { err error rv string @@ -446,15 +354,6 @@ func TestCacheGetResponse(t *testing.T) { }, }, "cache response for Status": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "nodes", - Name: "test", - Group: "", - Version: "v1", - }, inputObj: runtime.Object(&metav1.Status{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -465,11 +364,12 @@ func TestCacheGetResponse(t *testing.T) { Reason: "NotFound", Code: 404, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/nodes/test", - resource: "nodes", + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/nodes/test", expectResult: struct { err error rv string @@ -481,33 +381,16 @@ func TestCacheGetResponse(t *testing.T) { }, }, "cache response for nil object": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "nodes", - Name: "test", - Group: "", - Version: "v1", + inputObj: nil, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", }, - inputObj: nil, - userAgent: "kubelet", - accept: "application/json", verb: "GET", path: "/api/v1/nodes/test", - resource: "nodes", cacheResponseErr: true, }, "cache response for get namespace": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "namespaces", - Name: "kube-system", - Group: "", - Version: "v1", - }, inputObj: runtime.Object(&v1.Namespace{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -518,11 +401,12 @@ func TestCacheGetResponse(t *testing.T) { ResourceVersion: "1", }, }), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/kube-system", - resource: "namespaces", + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/namespaces/kube-system", expectResult: struct { err error rv string @@ -535,59 +419,113 @@ func TestCacheGetResponse(t *testing.T) { kind: "Namespace", }, }, + "cache response for partial object metadata request": { + inputObj: runtime.Object(&metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "meta.k8s.io/v1", + Kind: "PartialObjectMetadata", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "nodepools.apps.openyurt.io", + ResourceVersion: "738", + UID: "4232ad7f-c347-43a3-b64a-1c4bdfeab900", + }, + }), + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1", + }, + verb: "GET", + path: "/apis/apiextensions.k8s.io/v1/customresourcedefinitions/nodepools.apps.openyurt.io", + expectResult: struct { + err error + rv string + name string + ns string + kind string + }{ + rv: "738", + name: "nodepools.apps.openyurt.io", + kind: "PartialObjectMetadata", + }, + }, } accessor := meta.NewAccessor() resolver := newTestRequestInfoResolver() for k, tt := range testcases { t.Run(k, func(t *testing.T) { - s := serializerM.CreateSerializer(tt.accept, tt.group, tt.version, tt.resource) - encoder, err := s.Encoder(tt.accept, nil) - if err != nil { - t.Fatalf("could not create encoder, %v", err) - } - - buf := bytes.NewBuffer([]byte{}) - if tt.inputObj != nil { - err = encoder.Encode(tt.inputObj, buf) - if err != nil { - t.Fatalf("could not encode input object, %v", err) - } - } - req, _ := http.NewRequest(tt.verb, tt.path, nil) - if len(tt.userAgent) != 0 { - req.Header.Set("User-Agent", tt.userAgent) - } - - if len(tt.accept) != 0 { - req.Header.Set("Accept", tt.accept) + for k, v := range tt.header { + req.Header.Set(k, v) } req.RemoteAddr = "127.0.0.1" + var cacheErr error + var info *request.RequestInfo + var comp string var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() - ctx = util.WithRespContentType(ctx, tt.accept) + info, _ = request.RequestInfoFrom(ctx) + // get component + comp, _ = util.ClientComponentFrom(ctx) + + // inject response content type by request content type + reqContentType, _ := util.ReqContentTypeFrom(ctx) + ctx = util.WithRespContentType(ctx, reqContentType) req = req.WithContext(ctx) + + // build response body + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + isPartialReq, _ := util.IsPartialRequestFrom(ctx) + if isPartialReq { + convertGVK, _ := util.ConvertGVKFrom(ctx) + gvr, _ = meta.UnsafeGuessKindToResource(*convertGVK) + } + + s := serializerM.CreateSerializer(reqContentType, gvr.Group, gvr.Version, gvr.Resource) + encoder, err := s.Encoder(reqContentType, nil) + if err != nil { + t.Fatalf("could not create encoder, %v", err) + } + buf := bytes.NewBuffer([]byte{}) + if tt.inputObj != nil { + err = encoder.Encode(tt.inputObj, buf) + if err != nil { + t.Fatalf("could not encode input object, %v", err) + } + } prc := io.NopCloser(buf) - err = yurtCM.CacheResponse(req, prc, nil) + cacheErr = yurtCM.CacheResponse(req, prc, nil) }) handler = proxyutil.WithRequestContentType(handler) handler = proxyutil.WithRequestClientComponent(handler, util.WorkingModeEdge) + handler = proxyutil.WithPartialObjectMetadataRequest(handler) handler = filters.WithRequestInfo(handler, resolver) handler.ServeHTTP(httptest.NewRecorder(), req) - if tt.cacheResponseErr && err == nil { + if tt.cacheResponseErr && cacheErr == nil { t.Errorf("expect err, but do not get error") - } else if !tt.cacheResponseErr && err != nil { + } else if !tt.cacheResponseErr && cacheErr != nil { t.Errorf("expect no err, but got error %v", err) + } else if tt.cacheResponseErr && cacheErr != nil { + return } - if len(tt.expectResult.name) == 0 { - return + keyInfo := storage.KeyBuildInfo{ + Component: comp, + Namespace: info.Namespace, + Name: info.Name, + Resources: info.Resource, + Group: info.APIGroup, + Version: info.APIVersion, } - key, err := sWrapper.KeyFunc(tt.keyBuildInfo) + key, err := sWrapper.KeyFunc(keyInfo) if err != nil { t.Errorf("failed to create key, %v", err) } @@ -596,7 +534,7 @@ func TestCacheGetResponse(t *testing.T) { if !errors.Is(tt.expectResult.err, err) { t.Errorf("expect get error %v, but got %v", tt.expectResult.err, err) } - t.Logf("get expected err %v for key %s", tt.expectResult.err, tt.keyBuildInfo) + t.Logf("get expected err %v for key %s", tt.expectResult.err, keyInfo) } else { name, _ := accessor.Name(obj) rv, _ := accessor.ResourceVersion(obj) @@ -609,17 +547,15 @@ func TestCacheGetResponse(t *testing.T) { t.Errorf("Got rv %s, but expect rv %s", rv, tt.expectResult.rv) } - if tt.namespaced { - ns, _ := accessor.Namespace(obj) - if tt.expectResult.ns != ns { - t.Errorf("Got ns %s, but expect ns %s", ns, tt.expectResult.ns) - } + ns, _ := accessor.Namespace(obj) + if tt.expectResult.ns != ns { + t.Errorf("Got ns %s, but expect ns %s", ns, tt.expectResult.ns) } if tt.expectResult.kind != kind { t.Errorf("Got kind %s, but expect kind %s", kind, tt.expectResult.kind) } - t.Logf("get key %s successfully", tt.keyBuildInfo) + t.Logf("get key %s successfully", keyInfo) } err = sWrapper.DeleteComponentResources("kubelet") @@ -1025,7 +961,7 @@ func TestCacheWatchResponse(t *testing.T) { t.Errorf("failed to get object from storage") } - if !compareObjectsAndKeys(t, objs, tt.namespaced, tt.expectResult.data) { + if !compareObjectsAndKeys(t, objs, tt.expectResult.data) { t.Errorf("got unexpected objects for keys for watch request") } @@ -1059,31 +995,16 @@ func TestCacheListResponse(t *testing.T) { yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) testcases := map[string]struct { - group string - version string - keyBuildInfo storage.KeyBuildInfo inputObj runtime.Object - userAgent string - accept string + header map[string]string verb string path string - resource string - namespaced bool expectResult struct { err bool data map[string]struct{} } }{ "list pods": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "pods", - Namespace: "default", - Group: "", - Version: "v1", - }, inputObj: runtime.Object( &v1.PodList{ TypeMeta: metav1.TypeMeta{ @@ -1130,12 +1051,12 @@ func TestCacheListResponse(t *testing.T) { }, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods", - resource: "pods", - namespaced: true, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/namespaces/default/pods", expectResult: struct { err bool data map[string]struct{} @@ -1148,14 +1069,6 @@ func TestCacheListResponse(t *testing.T) { }, }, "list nodes": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "nodes", - Group: "", - Version: "v1", - }, inputObj: runtime.Object( &v1.NodeList{ TypeMeta: metav1.TypeMeta{ @@ -1209,12 +1122,12 @@ func TestCacheListResponse(t *testing.T) { }, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/nodes", - resource: "nodes", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/nodes", expectResult: struct { err bool data map[string]struct{} @@ -1227,15 +1140,7 @@ func TestCacheListResponse(t *testing.T) { }, }, }, - "list nodes with fieldselector": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "nodes", - Group: "", - Version: "v1", - }, + "list nodes with fieldSelector": { inputObj: runtime.Object( &v1.NodeList{ TypeMeta: metav1.TypeMeta{ @@ -1259,12 +1164,12 @@ func TestCacheListResponse(t *testing.T) { }, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/nodes?fieldselector=metadata.name=mynode", - resource: "nodes", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/nodes?fieldSelector=metadata.name=mynode", expectResult: struct { err bool data map[string]struct{} @@ -1275,14 +1180,6 @@ func TestCacheListResponse(t *testing.T) { }, }, "list runtimeclasses with no objects": { - group: "node.k8s.io", - version: "v1beta1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "runtimeclasses", - Group: "node.k8s.io", - Version: "v1beta1", - }, inputObj: runtime.Object( &nodev1beta1.RuntimeClassList{ TypeMeta: metav1.TypeMeta{ @@ -1295,12 +1192,12 @@ func TestCacheListResponse(t *testing.T) { Items: []nodev1beta1.RuntimeClass{}, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/node.k8s.io/v1beta1/runtimeclasses", - resource: "runtimeclasses", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/node.k8s.io/v1beta1/runtimeclasses", expectResult: struct { err bool data map[string]struct{} @@ -1309,14 +1206,6 @@ func TestCacheListResponse(t *testing.T) { }, }, "list with status": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "nodetest", - Group: "", - Version: "v1", - }, inputObj: runtime.Object( &metav1.Status{ TypeMeta: metav1.TypeMeta{ @@ -1329,24 +1218,15 @@ func TestCacheListResponse(t *testing.T) { Code: 404, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/node", - resource: "nodes", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/node", }, //used to test whether custom resource list can be cached correctly "cache response for list crontabs": { - group: "stable.example.com", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "crontabs", - Namespace: "default", - Group: "stable.example.com", - Version: "v1", - }, inputObj: runtime.Object( &unstructured.UnstructuredList{ Object: map[string]interface{}{ @@ -1384,12 +1264,12 @@ func TestCacheListResponse(t *testing.T) { }, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/stable.example.com/v1/namespaces/default/crontabs", - resource: "crontabs", - namespaced: true, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs", expectResult: struct { err bool data map[string]struct{} @@ -1401,14 +1281,6 @@ func TestCacheListResponse(t *testing.T) { }, }, "cache response for list foos without namespace": { - group: "samplecontroller.k8s.io", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "foos", - Group: "samplecontroller.k8s.io", - Version: "v1", - }, inputObj: runtime.Object( &unstructured.UnstructuredList{ Object: map[string]interface{}{ @@ -1444,12 +1316,12 @@ func TestCacheListResponse(t *testing.T) { }, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/samplecontroller.k8s.io/v1/foos", - resource: "foos", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos", expectResult: struct { err bool data map[string]struct{} @@ -1461,14 +1333,6 @@ func TestCacheListResponse(t *testing.T) { }, }, "list foos with no objects": { - group: "samplecontroller.k8s.io", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "foos", - Group: "samplecontroller.k8s.io", - Version: "v1", - }, inputObj: runtime.Object( &unstructured.UnstructuredList{ Object: map[string]interface{}{ @@ -1483,12 +1347,12 @@ func TestCacheListResponse(t *testing.T) { Items: []unstructured.Unstructured{}, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/samplecontroller.k8s.io/v1/foos", - resource: "foos", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos", expectResult: struct { err bool data map[string]struct{} @@ -1497,14 +1361,6 @@ func TestCacheListResponse(t *testing.T) { }, }, "list namespaces": { - group: "", - version: "v1", - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "namespaces", - Group: "", - Version: "v1", - }, inputObj: runtime.Object( &v1.NamespaceList{ TypeMeta: metav1.TypeMeta{ @@ -1538,12 +1394,12 @@ func TestCacheListResponse(t *testing.T) { }, }, ), - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces", - resource: "namespaces", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/namespaces", expectResult: struct { err bool data map[string]struct{} @@ -1554,56 +1410,136 @@ func TestCacheListResponse(t *testing.T) { }, }, }, + "cache response for partial object metadata list request": { + inputObj: runtime.Object(&metav1.PartialObjectMetadataList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "meta.k8s.io/v1", + Kind: "PartialObjectMetadataList", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "738", + }, + Items: []metav1.PartialObjectMetadata{ + { + TypeMeta: metav1.TypeMeta{ + APIVersion: "meta.k8s.io/v1", + Kind: "PartialObjectMetadata", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "nodepools.apps.openyurt.io", + UID: "4232ad7f-c347-43a3-b64a-1c4bdfeab900", + ResourceVersion: "738", + }, + }, + { + TypeMeta: metav1.TypeMeta{ + APIVersion: "meta.k8s.io/v1", + Kind: "PartialObjectMetadata", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "yurtappsets.apps.openyurt.io", + UID: "4232ad7f-c347-43a3-b64a-1c4bdfeab901", + ResourceVersion: "737", + }, + }, + }, + }), + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1", + }, + verb: "GET", + path: "/apis/apiextensions.k8s.io/v1/customresourcedefinitions", + expectResult: struct { + err bool + data map[string]struct{} + }{ + data: map[string]struct{}{ + "partialobjectmetadata-nodepools.apps.openyurt.io-738": {}, + "partialobjectmetadata-yurtappsets.apps.openyurt.io-737": {}, + }, + }, + }, } resolver := newTestRequestInfoResolver() for k, tt := range testcases { t.Run(k, func(t *testing.T) { - s := serializerM.CreateSerializer(tt.accept, tt.group, tt.version, tt.resource) - encoder, err := s.Encoder(tt.accept, nil) - if err != nil { - t.Fatalf("could not create encoder, %v", err) - } - - buf := bytes.NewBuffer([]byte{}) - err = encoder.Encode(tt.inputObj, buf) - if err != nil { - t.Fatalf("could not encode input object, %v", err) - } - req, _ := http.NewRequest(tt.verb, tt.path, nil) - if len(tt.userAgent) != 0 { - req.Header.Set("User-Agent", tt.userAgent) - } - - if len(tt.accept) != 0 { - req.Header.Set("Accept", tt.accept) + for k, v := range tt.header { + req.Header.Set(k, v) } req.RemoteAddr = "127.0.0.1" + var cacheErr error + var info *request.RequestInfo + var comp string var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx := req.Context() - ctx = util.WithRespContentType(ctx, tt.accept) + info, _ = request.RequestInfoFrom(ctx) + // get component + comp, _ = util.ClientComponentFrom(ctx) + + // inject response content type by request content type + reqContentType, _ := util.ReqContentTypeFrom(ctx) + ctx = util.WithRespContentType(ctx, reqContentType) req = req.WithContext(ctx) + + // build response body + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + isPartialReq, _ := util.IsPartialRequestFrom(ctx) + if isPartialReq { + convertGVK, _ := util.ConvertGVKFrom(ctx) + gvr, _ = meta.UnsafeGuessKindToResource(*convertGVK) + } + + s := serializerM.CreateSerializer(reqContentType, gvr.Group, gvr.Version, gvr.Resource) + encoder, err := s.Encoder(reqContentType, nil) + if err != nil { + t.Fatalf("could not create encoder, %v", err) + } + buf := bytes.NewBuffer([]byte{}) + if tt.inputObj != nil { + err = encoder.Encode(tt.inputObj, buf) + if err != nil { + t.Fatalf("could not encode input object, %v", err) + } + } prc := io.NopCloser(buf) - err = yurtCM.CacheResponse(req, prc, nil) + // call cache response + cacheErr = yurtCM.CacheResponse(req, prc, nil) }) handler = proxyutil.WithRequestContentType(handler) handler = proxyutil.WithRequestClientComponent(handler, util.WorkingModeEdge) + handler = proxyutil.WithPartialObjectMetadataRequest(handler) handler = filters.WithRequestInfo(handler, resolver) handler.ServeHTTP(httptest.NewRecorder(), req) if tt.expectResult.err { - if err == nil { + if cacheErr == nil { t.Error("Got no error, but expect err") + return } } else { if err != nil { - t.Errorf("Got error %v", err) + t.Errorf("Expect no error, but got error %v", err) + return } - rootKey, err := sWrapper.KeyFunc(tt.keyBuildInfo) + keyInfo := storage.KeyBuildInfo{ + Component: comp, + Namespace: info.Namespace, + Name: info.Name, + Resources: info.Resource, + Group: info.APIGroup, + Version: info.APIVersion, + } + rootKey, err := sWrapper.KeyFunc(keyInfo) if err != nil { t.Errorf("failed to get key, %v", err) } @@ -1619,7 +1555,7 @@ func TestCacheListResponse(t *testing.T) { } } - if !compareObjectsAndKeys(t, objs, tt.namespaced, tt.expectResult.data) { + if !compareObjectsAndKeys(t, objs, tt.expectResult.data) { t.Errorf("got unexpected objects for keys") } } @@ -2379,14 +2315,11 @@ func TestQueryCacheForList(t *testing.T) { yurtCM := NewCacheManager(sWrapper, serializerM, restRESTMapperMgr, fakeSharedInformerFactory) testcases := map[string]struct { - keyBuildInfo storage.KeyBuildInfo - cachedKind string + keyBuildInfo *storage.KeyBuildInfo inputObj []runtime.Object - userAgent string - accept string + header map[string]string verb string path string - namespaced bool expectResult struct { err bool queryErr error @@ -2395,10 +2328,11 @@ func TestQueryCacheForList(t *testing.T) { } }{ "list with no user agent": { - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods", - namespaced: true, + header: map[string]string{ + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/namespaces/default/pods", expectResult: struct { err bool queryErr error @@ -2409,13 +2343,6 @@ func TestQueryCacheForList(t *testing.T) { }, }, "list pods": { - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "pods", - Namespace: "default", - Group: "", - Version: "v1", - }, inputObj: []runtime.Object{ &v1.Pod{ TypeMeta: metav1.TypeMeta{ @@ -2451,11 +2378,12 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods", - namespaced: true, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/namespaces/default/pods", expectResult: struct { err bool queryErr error @@ -2471,12 +2399,6 @@ func TestQueryCacheForList(t *testing.T) { }, }, "list nodes": { - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "nodes", - Group: "", - Version: "v1", - }, inputObj: []runtime.Object{ &v1.Node{ TypeMeta: metav1.TypeMeta{ @@ -2519,11 +2441,12 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/nodes", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/nodes", expectResult: struct { err bool queryErr error @@ -2539,42 +2462,13 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, - "list runtimeclass": { - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "runtimeclasses", - Group: "node.k8s.io", - Version: "v1beta1", - }, - inputObj: []runtime.Object{ - &unstructured.Unstructured{}, - }, - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/node.k8s.io/v1beta1/runtimeclasses", - namespaced: false, - expectResult: struct { - err bool - queryErr error - rv string - data map[string]struct{} - }{ - data: map[string]struct{}{}, - }, - }, "list pods of one namespace and no pods of this namespace in cache": { - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "pods", - Group: "", - Version: "v1", + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", }, - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/pods/default", - namespaced: false, + verb: "GET", + path: "/api/v1/pods/namespaces/default", expectResult: struct { err bool queryErr error @@ -2585,17 +2479,8 @@ func TestQueryCacheForList(t *testing.T) { queryErr: storage.ErrStorageNotFound, }, }, - //used to test whether the query local Custom Resource list request can be handled correctly "list crontabs": { - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "crontabs", - Namespace: "default", - Group: "stable.example.com", - Version: "v1", - }, - cachedKind: "stable.example.com/v1/CronTab", inputObj: []runtime.Object{ &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -2631,11 +2516,12 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/stable.example.com/v1/namespaces/default/crontabs", - namespaced: true, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/stable.example.com/v1/namespaces/default/crontabs", expectResult: struct { err bool queryErr error @@ -2651,13 +2537,6 @@ func TestQueryCacheForList(t *testing.T) { }, }, "list foos": { - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "foos", - Group: "samplecontroller.k8s.io", - Version: "v1", - }, - cachedKind: "samplecontroller.k8s.io/v1/Foo", inputObj: []runtime.Object{ &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -2690,11 +2569,12 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/samplecontroller.k8s.io/v1/foos", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/samplecontroller.k8s.io/v1/foos", expectResult: struct { err bool queryErr error @@ -2709,37 +2589,13 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, - "list foos with no objs": { - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "foos", - Group: "samplecontroller.k8s.io", - Version: "v1", - }, - cachedKind: "samplecontroller.k8s.io/v1/Foo", - inputObj: []runtime.Object{ - &unstructured.Unstructured{}, - }, - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/samplecontroller.k8s.io/v1/foos", - namespaced: false, - expectResult: struct { - err bool - queryErr error - rv string - data map[string]struct{} - }{ - data: map[string]struct{}{}, - }, - }, "list unregistered resources": { - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/apis/sample.k8s.io/v1/abcs", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/apis/sample.k8s.io/v1/abcs", expectResult: struct { err bool queryErr error @@ -2751,11 +2607,12 @@ func TestQueryCacheForList(t *testing.T) { }, }, "list resources not exist": { - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/nodes", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/nodes", expectResult: struct { err bool queryErr error @@ -2767,11 +2624,12 @@ func TestQueryCacheForList(t *testing.T) { }, }, "list non-existing resource with metadata.name fieldSelector": { - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/kube-system/configmaps?fieldSelector=metadata.name%3Dkubernetes-services-endpoint", - namespaced: false, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", + }, + verb: "GET", + path: "/api/v1/namespaces/kube-system/configmaps?fieldSelector=metadata.name%3Dkubernetes-services-endpoint", expectResult: struct { err bool queryErr error @@ -2783,19 +2641,12 @@ func TestQueryCacheForList(t *testing.T) { }, }, "list existing resource with metadata.name fieldSelector": { - keyBuildInfo: storage.KeyBuildInfo{ - Component: "kubelet", - Resources: "pods", - Group: "", - Version: "v1", - Namespace: "default", - Name: "nginx", + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json", }, - userAgent: "kubelet", - accept: "application/json", - verb: "GET", - path: "/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dnginx", - namespaced: true, + verb: "GET", + path: "/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dnginx", inputObj: []runtime.Object{ &v1.Pod{ TypeMeta: metav1.TypeMeta{ @@ -2822,6 +2673,56 @@ func TestQueryCacheForList(t *testing.T) { }, }, }, + "list crds by partial object metadata request": { + keyBuildInfo: &storage.KeyBuildInfo{ + Component: "kubelet/partialobjectmetadatas.v1.meta.k8s.io", + Resources: "customresourcedefinitions", + Group: "apiextensions.k8s.io", + Version: "v1", + }, + inputObj: []runtime.Object{ + &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "meta.k8s.io/v1", + Kind: "PartialObjectMetadata", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "nodepools.apps.openyurt.io", + UID: "4232ad7f-c347-43a3-b64a-1c4bdfeab900", + ResourceVersion: "738", + }, + }, + &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "meta.k8s.io/v1", + Kind: "PartialObjectMetadata", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "yurtappsets.apps.openyurt.io", + UID: "4232ad7f-c347-43a3-b64a-1c4bdfeab901", + ResourceVersion: "737", + }, + }, + }, + header: map[string]string{ + "User-Agent": "kubelet", + "Accept": "application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1", + }, + verb: "GET", + path: "/apis/apiextensions.k8s.io/v1/customresourcedefinitions", + expectResult: struct { + err bool + queryErr error + rv string + data map[string]struct{} + }{ + rv: "738", + data: map[string]struct{}{ + "partialobjectmetadata-nodepools.apps.openyurt.io-738": {}, + "partialobjectmetadata-yurtappsets.apps.openyurt.io-737": {}, + }, + }, + }, } accessor := meta.NewAccessor() @@ -2829,36 +2730,43 @@ func TestQueryCacheForList(t *testing.T) { for k, tt := range testcases { t.Run(k, func(t *testing.T) { for i := range tt.inputObj { - v, _ := accessor.Name(tt.inputObj[i]) - tt.keyBuildInfo.Name = v - key, err := sWrapper.KeyFunc(tt.keyBuildInfo) + name, _ := accessor.Name(tt.inputObj[i]) + ns, _ := accessor.Namespace(tt.inputObj[i]) + gvk := tt.inputObj[i].GetObjectKind().GroupVersionKind() + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + comp := tt.header["User-Agent"] + + keyBuildInfo := storage.KeyBuildInfo{ + Component: comp, + Resources: gvr.Resource, + Group: gvr.Group, + Version: gvr.Version, + Namespace: ns, + Name: name, + } + + if tt.keyBuildInfo != nil { + tt.keyBuildInfo.Name = name + tt.keyBuildInfo.Namespace = ns + keyBuildInfo = *tt.keyBuildInfo + } + + key, err := sWrapper.KeyFunc(keyBuildInfo) if err != nil { t.Errorf("failed to get key, %v", err) } _ = sWrapper.Create(key, tt.inputObj[i]) - } - // It is used to simulate caching GVK information. If the caching is successful, - // the next process can obtain the correct GVK information when constructing an empty List. - if tt.cachedKind != "" { - info := strings.Split(tt.cachedKind, hubmeta.SepForGVR) - gvk := schema.GroupVersionKind{ - Group: info[0], - Version: info[1], - Kind: info[2], + isScheme, t := restRESTMapperMgr.KindFor(gvr) + if !isScheme && t.Empty() { + _ = restRESTMapperMgr.UpdateKind(gvk) } - _ = restRESTMapperMgr.UpdateKind(gvk) } req, _ := http.NewRequest(tt.verb, tt.path, nil) - if len(tt.userAgent) != 0 { - req.Header.Set("User-Agent", tt.userAgent) - } - - if len(tt.accept) != 0 { - req.Header.Set("Accept", tt.accept) + for k, v := range tt.header { + req.Header.Set(k, v) } - req.RemoteAddr = "127.0.0.1" items := make([]runtime.Object, 0) @@ -2878,6 +2786,7 @@ func TestQueryCacheForList(t *testing.T) { }) handler = proxyutil.WithRequestClientComponent(handler, util.WorkingModeEdge) + handler = proxyutil.WithPartialObjectMetadataRequest(handler) handler = filters.WithRequestInfo(handler, resolver) handler.ServeHTTP(httptest.NewRecorder(), req) @@ -2898,7 +2807,7 @@ func TestQueryCacheForList(t *testing.T) { t.Errorf("Got rv %s, but expect rv %s", rv, tt.expectResult.rv) } - if !compareObjectsAndKeys(t, items, tt.namespaced, tt.expectResult.data) { + if !compareObjectsAndKeys(t, items, tt.expectResult.data) { t.Errorf("got unexpected objects for keys") } } @@ -2918,7 +2827,7 @@ func TestQueryCacheForList(t *testing.T) { } } -func compareObjectsAndKeys(t *testing.T, objs []runtime.Object, namespaced bool, keys map[string]struct{}) bool { +func compareObjectsAndKeys(t *testing.T, objs []runtime.Object, keys map[string]struct{}) bool { if len(objs) != len(keys) { t.Errorf("expect %d keys, but got %d objects", len(keys), len(objs)) return false @@ -2932,7 +2841,7 @@ func compareObjectsAndKeys(t *testing.T, objs []runtime.Object, namespaced bool, name, _ := accessor.Name(objs[i]) itemRv, _ := accessor.ResourceVersion(objs[i]) - if namespaced { + if len(ns) != 0 { objKeys[fmt.Sprintf("%s-%s-%s-%s", strings.ToLower(kind), ns, name, itemRv)] = struct{}{} } else { objKeys[fmt.Sprintf("%s-%s-%s", strings.ToLower(kind), name, itemRv)] = struct{}{} @@ -3210,6 +3119,15 @@ func TestCanCacheFor(t *testing.T) { }, expectCache: false, }, + "default user agent kubelet with partialobjectmetadata info": { + request: &proxyRequest{ + userAgent: "kubelet/v1.0", + verb: "GET", + path: "/api/v1/nodes/mynode", + header: map[string]string{"Accept": "application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1"}, + }, + expectCache: true, + }, } for k, tt := range testcases { @@ -3278,6 +3196,7 @@ func checkReqCanCache(m CacheManager, userAgent, verb, path string, header map[s handler = proxyutil.WithListRequestSelector(handler) handler = proxyutil.WithCacheHeaderCheck(handler) handler = proxyutil.WithRequestClientComponent(handler, util.WorkingModeEdge) + handler = proxyutil.WithPartialObjectMetadataRequest(handler) handler = filters.WithRequestInfo(handler, newTestRequestInfoResolver()) handler.ServeHTTP(httptest.NewRecorder(), req) diff --git a/pkg/yurthub/proxy/autonomy/autonomy.go b/pkg/yurthub/proxy/autonomy/autonomy.go index f92d1afa241..13aca27380e 100644 --- a/pkg/yurthub/proxy/autonomy/autonomy.go +++ b/pkg/yurthub/proxy/autonomy/autonomy.go @@ -35,7 +35,6 @@ import ( "github.com/openyurtio/openyurt/pkg/projectinfo" "github.com/openyurtio/openyurt/pkg/yurthub/cachemanager" hubrest "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/rest" - proxyutil "github.com/openyurtio/openyurt/pkg/yurthub/proxy/util" "github.com/openyurtio/openyurt/pkg/yurthub/util" ) @@ -68,7 +67,7 @@ func NewAutonomyProxy( func (ap *AutonomyProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { obj, err := ap.updateNodeStatus(req) if err != nil { - proxyutil.Err(err, rw, req) + util.Err(err, rw, req) } util.WriteObject(http.StatusOK, obj, rw, req) } diff --git a/pkg/yurthub/proxy/local/local.go b/pkg/yurthub/proxy/local/local.go index 50f4afeb803..3ddbad90f9f 100644 --- a/pkg/yurthub/proxy/local/local.go +++ b/pkg/yurthub/proxy/local/local.go @@ -86,11 +86,11 @@ func (lp *LocalProxy) ServeHTTP(w http.ResponseWriter, req *http.Request) { if err != nil { klog.Errorf("could not proxy local for %s, %v", hubutil.ReqString(req), err) - util.Err(err, w, req) + hubutil.Err(err, w, req) } } else { klog.Errorf("local proxy does not support request(%s), requestInfo: %s", hubutil.ReqString(req), hubutil.ReqInfoString(reqInfo)) - util.Err(apierrors.NewBadRequest(fmt.Sprintf("local proxy does not support request(%s)", hubutil.ReqString(req))), w, req) + hubutil.Err(apierrors.NewBadRequest(fmt.Sprintf("local proxy does not support request(%s)", hubutil.ReqString(req))), w, req) } } @@ -110,8 +110,7 @@ func localDelete(w http.ResponseWriter, req *http.Request) error { Message: "delete request is not supported in local cache", } - util.WriteObject(http.StatusForbidden, s, w, req) - return nil + return hubutil.WriteObject(http.StatusForbidden, s, w, req) } // localPost handles Create requests when remote servers are unhealthy @@ -232,7 +231,7 @@ func (lp *LocalProxy) localReqCache(w http.ResponseWriter, req *http.Request) er return apierrors.NewInternalError(fmt.Errorf("no cache object for %s", hubutil.ReqString(req))) } - return util.WriteObject(http.StatusOK, obj, w, req) + return hubutil.WriteObject(http.StatusOK, obj, w, req) } func copyHeader(dst, src http.Header) { diff --git a/pkg/yurthub/proxy/pool/pool.go b/pkg/yurthub/proxy/pool/pool.go index 69f5c28b514..2e1d95bdcc3 100644 --- a/pkg/yurthub/proxy/pool/pool.go +++ b/pkg/yurthub/proxy/pool/pool.go @@ -109,7 +109,7 @@ func (pp *YurtCoordinatorProxy) ServeHTTP(rw http.ResponseWriter, req *http.Requ reqInfo, ok := apirequest.RequestInfoFrom(ctx) if !ok || reqInfo == nil { klog.Errorf("yurt-coordinator proxy cannot handle request(%s), cannot get requestInfo", hubutil.ReqString(req), reqInfo) - util.Err(errors.NewBadRequest(fmt.Sprintf("yurt-coordinator proxy cannot handle request(%s), cannot get requestInfo", hubutil.ReqString(req))), rw, req) + hubutil.Err(errors.NewBadRequest(fmt.Sprintf("yurt-coordinator proxy cannot handle request(%s), cannot get requestInfo", hubutil.ReqString(req))), rw, req) return } req.Header.Del("Authorization") // delete token with cloud apiServer RBAC and use yurthub authorization @@ -126,11 +126,11 @@ func (pp *YurtCoordinatorProxy) ServeHTTP(rw http.ResponseWriter, req *http.Requ } if err != nil { klog.Errorf("could not proxy to yurt-coordinator for %s, %v", hubutil.ReqString(req), err) - util.Err(errors.NewBadRequest(err.Error()), rw, req) + hubutil.Err(errors.NewBadRequest(err.Error()), rw, req) } } else { klog.Errorf("yurt-coordinator does not support request(%s), requestInfo: %s", hubutil.ReqString(req), hubutil.ReqInfoString(reqInfo)) - util.Err(errors.NewBadRequest(fmt.Sprintf("yurt-coordinator does not support request(%s)", hubutil.ReqString(req))), rw, req) + hubutil.Err(errors.NewBadRequest(fmt.Sprintf("yurt-coordinator does not support request(%s)", hubutil.ReqString(req))), rw, req) } } diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index d47191ef1eb..7c791e7a129 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -176,6 +176,7 @@ func (p *yurtReverseProxy) buildHandlerChain(handler http.Handler) http.Handler handler = util.WithRequestTraceFull(handler) handler = util.WithMaxInFlightLimit(handler, p.maxRequestsInFlight) handler = util.WithRequestClientComponent(handler, p.workingMode) + handler = util.WithPartialObjectMetadataRequest(handler) if p.enableYurtCoordinator { handler = util.WithIfPoolScopedResource(handler) @@ -276,7 +277,7 @@ func (p *yurtReverseProxy) subjectAccessReviewHandler(rw http.ResponseWriter, re } else { err := errors.New("request is from yurt-coordinator but it's currently not healthy") klog.Errorf("could not handle SubjectAccessReview req %s, %v", hubutil.ReqString(req), err) - util.Err(err, rw, req) + hubutil.Err(err, rw, req) } } else { if p.cloudHealthChecker.IsHealthy() { @@ -284,7 +285,7 @@ func (p *yurtReverseProxy) subjectAccessReviewHandler(rw http.ResponseWriter, re } else { err := errors.New("request is from cloud APIServer but it's currently not healthy") klog.Errorf("could not handle SubjectAccessReview req %s, %v", hubutil.ReqString(req), err) - util.Err(err, rw, req) + hubutil.Err(err, rw, req) } } } diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go index f2e9d9712e6..3229347c5a8 100644 --- a/pkg/yurthub/proxy/util/util.go +++ b/pkg/yurthub/proxy/util/util.go @@ -21,6 +21,7 @@ import ( "fmt" "mime" "net/http" + "path/filepath" "strings" "time" @@ -36,8 +37,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/authentication/serviceaccount" - "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" - "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/klog/v2" @@ -60,6 +59,45 @@ var needModifyTimeoutVerb = map[string]bool{ "watch": true, } +// WithPartialObjectMetadataRequest is used for extracting info for partial object metadata request, +// then these info is used by cache manager. +func WithPartialObjectMetadataRequest(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + if info, ok := apirequest.RequestInfoFrom(ctx); ok { + if info.IsResourceRequest { + var gvk schema.GroupVersionKind + acceptHeader := req.Header.Get("Accept") + parts := strings.Split(acceptHeader, ";") + for _, part := range parts { + subParts := strings.SplitN(strings.TrimSpace(part), "=", 2) + if len(subParts) == 2 { + switch subParts[0] { + case "g": + gvk.Group = subParts[1] + case "v": + gvk.Version = subParts[1] + case "as": + gvk.Kind = subParts[1] + default: + } + } + } + + if gvk.Kind == "PartialObjectMetadataList" || gvk.Kind == "PartialObjectMetadata" { + ctx = util.WithIsPartialRequest(ctx, true) + ctx = util.WithConvertGVK(ctx, &gvk) + } else { + ctx = util.WithIsPartialRequest(ctx, false) + } + req = req.WithContext(ctx) + } + } + + handler.ServeHTTP(w, req) + }) +} + // WithRequestContentType add req-content-type in request context. // if no Accept header is set, the request will be reject with a message. func WithRequestContentType(handler http.Handler) http.Handler { @@ -74,6 +112,14 @@ func WithRequestContentType(handler http.Handler) http.Handler { contentType = parts[0] } + subParts := strings.Split(contentType, ";") + for i := range subParts { + if strings.Contains(subParts[i], "as=") { + contentType = subParts[0] + break + } + } + if len(contentType) != 0 { ctx = util.WithReqContentType(ctx, contentType) req = req.WithContext(ctx) @@ -173,6 +219,13 @@ func WithRequestClientComponent(handler http.Handler, mode util.WorkingMode) htt } if userAgent != "" { + isPartialReq, _ := util.IsPartialRequestFrom(ctx) + if isPartialReq { + convertGVK, _ := util.ConvertGVKFrom(ctx) + if convertGVK != nil { + userAgent = filepath.Join(userAgent, strings.Join([]string{"partialobjectmetadatas", convertGVK.Version, convertGVK.Group}, ".")) + } + } ctx = util.WithClientComponent(ctx, userAgent) req = req.WithContext(ctx) } @@ -312,7 +365,7 @@ func WithMaxInFlightLimit(handler http.Handler, limit int) http.Handler { klog.Errorf("Too many requests, please try again later, %s", util.ReqString(req)) metrics.Metrics.IncRejectedRequestCounter() w.Header().Set("Retry-After", "1") - Err(errors.NewTooManyRequestsError("Too many requests, please try again later."), w, req) + util.Err(errors.NewTooManyRequestsError("Too many requests, please try again later."), w, req) } }) } @@ -334,7 +387,7 @@ func WithRequestTimeout(handler http.Handler) http.Handler { opts := metainternalversion.ListOptions{} if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, &opts); err != nil { klog.Errorf("could not decode parameter for list/watch request: %s", util.ReqString(req)) - Err(errors.NewBadRequest(err.Error()), w, req) + util.Err(errors.NewBadRequest(err.Error()), w, req) return } if opts.TimeoutSeconds != nil { @@ -442,38 +495,6 @@ func IsKubeletGetNodeReq(req *http.Request) bool { return true } -// WriteObject write object to response writer -func WriteObject(statusCode int, obj runtime.Object, w http.ResponseWriter, req *http.Request) error { - ctx := req.Context() - if info, ok := apirequest.RequestInfoFrom(ctx); ok { - gv := schema.GroupVersion{ - Group: info.APIGroup, - Version: info.APIVersion, - } - negotiatedSerializer := serializer.YurtHubSerializer.GetNegotiatedSerializer(gv.WithResource(info.Resource)) - responsewriters.WriteObjectNegotiated(negotiatedSerializer, negotiation.DefaultEndpointRestrictions, gv, w, req, statusCode, obj, false) - return nil - } - - return fmt.Errorf("request info is not found when write object, %s", util.ReqString(req)) -} - -// Err write err to response writer -func Err(err error, w http.ResponseWriter, req *http.Request) { - ctx := req.Context() - if info, ok := apirequest.RequestInfoFrom(ctx); ok { - gv := schema.GroupVersion{ - Group: info.APIGroup, - Version: info.APIVersion, - } - negotiatedSerializer := serializer.YurtHubSerializer.GetNegotiatedSerializer(gv.WithResource(info.Resource)) - responsewriters.ErrorNegotiated(err, negotiatedSerializer, gv, w, req) - return - } - - klog.Errorf("request info is not found when err write, %s", util.ReqString(req)) -} - func IsPoolScopedResourceListWatchRequest(req *http.Request) bool { ctx := req.Context() info, ok := apirequest.RequestInfoFrom(ctx) diff --git a/pkg/yurthub/proxy/util/util_test.go b/pkg/yurthub/proxy/util/util_test.go index 9d851ac7f31..76c172d4262 100644 --- a/pkg/yurthub/proxy/util/util_test.go +++ b/pkg/yurthub/proxy/util/util_test.go @@ -22,11 +22,13 @@ import ( "fmt" "net/http" "net/http/httptest" + "reflect" "sync" "testing" "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/endpoints/filters" "k8s.io/apiserver/pkg/endpoints/request" @@ -42,6 +44,98 @@ func newTestRequestInfoResolver() *request.RequestInfoFactory { } } +func TestWithPartialObjectMetadataRequest(t *testing.T) { + testcases := map[string]struct { + Verb string + Path string + Header map[string]string + ClientComponent string + IsPartialReq bool + ConvertGVK schema.GroupVersionKind + }{ + "kubelet request": { + Verb: "GET", + Path: "/api/v1/nodes/mynode", + Header: map[string]string{ + "User-Agent": "kubelet", + }, + ClientComponent: "kubelet", + IsPartialReq: false, + }, + "flanneld list request by partial object metadata request": { + Verb: "GET", + Path: "/api/v1/nodes", + Header: map[string]string{ + "User-Agent": "flanneld/0.11.0", + "Accept": "application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1", + }, + ClientComponent: "flanneld/partialobjectmetadatas.v1.meta.k8s.io", + IsPartialReq: true, + ConvertGVK: schema.GroupVersionKind{ + Group: "meta.k8s.io", + Version: "v1", + Kind: "PartialObjectMetadataList", + }, + }, + "flanneld get request by partial object metadata request": { + Verb: "GET", + Path: "/api/v1/nodes/mynode", + Header: map[string]string{ + "User-Agent": "flanneld/0.11.0", + "Accept": "application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1", + }, + ClientComponent: "flanneld/partialobjectmetadatas.v1.meta.k8s.io", + IsPartialReq: true, + ConvertGVK: schema.GroupVersionKind{ + Group: "meta.k8s.io", + Version: "v1", + Kind: "PartialObjectMetadata", + }, + }, + } + + resolver := newTestRequestInfoResolver() + + for k, tc := range testcases { + t.Run(k, func(t *testing.T) { + req, _ := http.NewRequest(tc.Verb, tc.Path, nil) + for k, v := range tc.Header { + req.Header.Set(k, v) + } + req.RemoteAddr = "127.0.0.1" + + var clientComponent string + var isPartialReq bool + var convertGVK *schema.GroupVersionKind + var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + ctx := req.Context() + clientComponent, _ = util.ClientComponentFrom(ctx) + isPartialReq, _ = util.IsPartialRequestFrom(ctx) + convertGVK, _ = util.ConvertGVKFrom(ctx) + }) + + handler = WithRequestClientComponent(handler, util.WorkingModeEdge) + handler = WithPartialObjectMetadataRequest(handler) + handler = filters.WithRequestInfo(handler, resolver) + handler.ServeHTTP(httptest.NewRecorder(), req) + + if clientComponent != tc.ClientComponent { + t.Errorf("expect client component %s, but got %s", tc.ClientComponent, clientComponent) + } + + if isPartialReq != tc.IsPartialReq { + t.Errorf("expect isPartialReq %v, but got %v", tc.IsPartialReq, isPartialReq) + } + + if tc.IsPartialReq { + if !reflect.DeepEqual(tc.ConvertGVK, *convertGVK) { + t.Errorf("expect convert gvk %v, but got %v", tc.ConvertGVK, *convertGVK) + } + } + }) + } +} + func TestWithRequestContentType(t *testing.T) { testcases := map[string]struct { Accept string diff --git a/pkg/yurthub/storage/disk/key_test.go b/pkg/yurthub/storage/disk/key_test.go index 2c48df2cd66..8f91be5181c 100644 --- a/pkg/yurthub/storage/disk/key_test.go +++ b/pkg/yurthub/storage/disk/key_test.go @@ -136,6 +136,17 @@ func TestKeyFunc(t *testing.T) { }, key: "kubelet/namespaces.v1.core/kube-system", }, + "list partial object metadata of crds": { + info: storage.KeyBuildInfo{ + Component: "cilium-agent/partialobjectmetadata", + Resources: "customresourcedefinitions", + Group: "apiextensions.k8s.io", + Version: "v1", + Namespace: "", + }, + key: "cilium-agent/partialobjectmetadata/customresourcedefinitions.v1.apiextensions.k8s.io", + isRoot: true, + }, } disk, err := NewDiskStorage(keyFuncTestDir) diff --git a/pkg/yurthub/util/util.go b/pkg/yurthub/util/util.go index 460b4d421a6..f1f2308e019 100644 --- a/pkg/yurthub/util/util.go +++ b/pkg/yurthub/util/util.go @@ -28,10 +28,11 @@ import ( v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/klog/v2" @@ -66,6 +67,10 @@ const ( ProxyListSelector // ProxyPoolScopedResource represents if this request is asking for pool-scoped resources ProxyPoolScopedResource + // ProxyPartialObjectMetadataRequest represents if this request is getting partial object metadata + ProxyPartialObjectMetadataRequest + // ProxyConvertGVK represents the gvk of response when it is a partial object metadata request + ProxyConvertGVK // DefaultYurtCoordinatorEtcdSvcName represents default yurt coordinator etcd service DefaultYurtCoordinatorEtcdSvcName = "yurt-coordinator-etcd" // DefaultYurtCoordinatorAPIServerSvcName represents default yurt coordinator apiServer service @@ -162,6 +167,28 @@ func IfPoolScopedResourceFrom(ctx context.Context) (bool, bool) { return info, ok } +// WithIsPartialRequest returns a copy of parent in which the is partial object metadata request value is set +func WithIsPartialRequest(parent context.Context, isPartialReq bool) context.Context { + return WithValue(parent, ProxyPartialObjectMetadataRequest, isPartialReq) +} + +// IsPartialRequestFrom returns the value of the is partial object metadata request key on the ctx +func IsPartialRequestFrom(ctx context.Context) (bool, bool) { + info, ok := ctx.Value(ProxyPartialObjectMetadataRequest).(bool) + return info, ok +} + +// WithConvertGVK returns a copy of parent in which the convert gvk value is set +func WithConvertGVK(parent context.Context, gvk *schema.GroupVersionKind) context.Context { + return WithValue(parent, ProxyConvertGVK, gvk) +} + +// ConvertGVKFrom returns the value of the convert gvk key on the ctx +func ConvertGVKFrom(ctx context.Context) (*schema.GroupVersionKind, bool) { + info, ok := ctx.Value(ProxyConvertGVK).(*schema.GroupVersionKind) + return info, ok +} + // ReqString formats a string for request func ReqString(req *http.Request) string { ctx := req.Context() @@ -182,8 +209,8 @@ func ReqInfoString(info *apirequest.RequestInfo) string { return fmt.Sprintf("%s %s for %s", info.Verb, info.Resource, info.Path) } -// WriteObject write object to response writer -func WriteObject(statusCode int, obj runtime.Object, w http.ResponseWriter, req *http.Request) error { +// Err write err to response writer +func Err(err error, w http.ResponseWriter, req *http.Request) { ctx := req.Context() if info, ok := apirequest.RequestInfoFrom(ctx); ok { gv := schema.GroupVersion{ @@ -191,13 +218,63 @@ func WriteObject(statusCode int, obj runtime.Object, w http.ResponseWriter, req Version: info.APIVersion, } negotiatedSerializer := serializer.YurtHubSerializer.GetNegotiatedSerializer(gv.WithResource(info.Resource)) - responsewriters.WriteObjectNegotiated(negotiatedSerializer, negotiation.DefaultEndpointRestrictions, gv, w, req, statusCode, obj, false) + responsewriters.ErrorNegotiated(err, negotiatedSerializer, gv, w, req) + return + } + + klog.Errorf("request info is not found when err write, %s", ReqString(req)) +} + +// WriteObject write object to response writer +func WriteObject(statusCode int, obj runtime.Object, w http.ResponseWriter, req *http.Request) error { + ctx := req.Context() + if info, ok := apirequest.RequestInfoFrom(ctx); ok { + gvr := schema.GroupVersionResource{ + Group: info.APIGroup, + Version: info.APIVersion, + Resource: info.Resource, + } + + isPartialReq, _ := IsPartialRequestFrom(ctx) + if isPartialReq { + convertGvk, _ := ConvertGVKFrom(ctx) + if convertGvk != nil { + gvr, _ = meta.UnsafeGuessKindToResource(*convertGvk) + } + } + + negotiatedSerializer := serializer.YurtHubSerializer.GetNegotiatedSerializer(gvr) + responsewriters.WriteObjectNegotiated(negotiatedSerializer, DefaultHubEndpointRestrictions, gvr.GroupVersion(), w, req, statusCode, obj, false) return nil } return fmt.Errorf("request info is not found when write object, %s", ReqString(req)) } +// DefaultHubEndpointRestrictions is the default EndpointRestrictions which allows +// content-type negotiation to verify yurthub server support for specific options +var DefaultHubEndpointRestrictions = hubEndpointRestrictions{} + +type hubEndpointRestrictions struct{} + +func (hubEndpointRestrictions) AllowsMediaTypeTransform(mimeType string, mimeSubType string, gvk *schema.GroupVersionKind) bool { + if gvk == nil { + return true + } + + if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion || gvk.GroupVersion() == metav1.SchemeGroupVersion { + switch gvk.Kind { + case "PartialObjectMetadata", "PartialObjectMetadataList": + return true + default: + return false + } + } + return false +} +func (hubEndpointRestrictions) AllowsServerVersion(string) bool { return false } +func (hubEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" } + func NewTripleReadCloser(req *http.Request, rc io.ReadCloser, isRespBody bool) (io.ReadCloser, io.ReadCloser, io.ReadCloser) { pr1, pw1 := io.Pipe() pr2, pw2 := io.Pipe()