From 9f983f12da8953a209dd8c1414f4cc5222869021 Mon Sep 17 00:00:00 2001 From: luomingmeng Date: Mon, 6 May 2024 08:12:28 +0800 Subject: [PATCH] remote reporter plugin and topology adapter support context timeout --- .../fetcher/kubelet/topology/topology_adapter.go | 3 +++ pkg/agent/resourcemanager/fetcher/plugin/endpoint.go | 5 ++++- .../plugin/overcommitmentaware/reporter/reporter.go | 4 ++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go b/pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go index fb65f3113..9cab6a5b0 100644 --- a/pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go +++ b/pkg/agent/resourcemanager/fetcher/kubelet/topology/topology_adapter.go @@ -53,6 +53,7 @@ import ( const ( podResourcesClientTimeout = 10 * time.Second + getTopologyZonesTimeout = 10 * time.Second podResourcesClientMaxMsgSize = 1024 * 1024 * 16 ) @@ -140,6 +141,8 @@ func (p *topologyAdapterImpl) GetTopologyZones(parentCtx context.Context) ([]*no // always force getting pod list instead of cache ctx := context.WithValue(parentCtx, metaserverpod.BypassCacheKey, metaserverpod.BypassCacheTrue) + ctx, cancel := context.WithTimeout(ctx, getTopologyZonesTimeout) + defer cancel() podList, err := p.metaServer.GetPodList(ctx, nil) if err != nil { return nil, errors.Wrap(err, "get pod list from metaServer failed") diff --git a/pkg/agent/resourcemanager/fetcher/plugin/endpoint.go b/pkg/agent/resourcemanager/fetcher/plugin/endpoint.go index d28254bf1..47ee9d96d 100644 --- a/pkg/agent/resourcemanager/fetcher/plugin/endpoint.go +++ b/pkg/agent/resourcemanager/fetcher/plugin/endpoint.go @@ -33,6 +33,7 @@ import ( const ( dialRemoteEndpointTimeout = 10 * time.Second + getReportContentTimeout = 10 * time.Second ) // ListAndWatchCallback should be called when plugins report info update. @@ -172,7 +173,9 @@ func (e *remoteEndpointImpl) GetReportContent(c context.Context) (*v1alpha1.GetR return nil, fmt.Errorf("endpoint %v has been stopped", e.pluginName) } - resp, err := e.client.GetReportContent(c, &v1alpha1.Empty{}) + ctx, cancel := context.WithTimeout(c, getReportContentTimeout) + defer cancel() + resp, err := e.client.GetReportContent(ctx, &v1alpha1.Empty{}) if err == nil { e.setCache(resp) } diff --git a/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go b/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go index cad4dd366..5fbade627 100644 --- a/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go +++ b/pkg/agent/sysadvisor/plugin/overcommitmentaware/reporter/reporter.go @@ -155,7 +155,7 @@ func (o *OvercommitRatioReporterPlugin) Stop() error { // GetReportContent get overcommitment ratio from manager directly. // Since the metrics collected by Manager are already an average within a time period, // we expect a faster response to node load fluctuations to avoid excessive overcommit of online resources. -func (o *OvercommitRatioReporterPlugin) GetReportContent(_ context.Context, _ *v1alpha1.Empty) (*v1alpha1.GetReportContentResponse, error) { +func (o *OvercommitRatioReporterPlugin) GetReportContent(ctx context.Context, _ *v1alpha1.Empty) (*v1alpha1.GetReportContentResponse, error) { response := &v1alpha1.GetReportContentResponse{ Content: []*v1alpha1.ReportContent{}, } @@ -175,7 +175,7 @@ func (o *OvercommitRatioReporterPlugin) GetReportContent(_ context.Context, _ *v response.Content = append(response.Content, overcommitRatioContent) // get topologyProvider and guaranteed cpus - topologyProviderContent, err := o.getTopologyProviderReportContent(o.ctx) + topologyProviderContent, err := o.getTopologyProviderReportContent(ctx) if err != nil { return nil, err }