Get node from local cache instead of kube-apiserver cache
Branch: datadog:patch
wojtek-t authored and rifelpet committed Jul 29, 2024
1 parent 82e3930 commit bc5230f
Showing 2 changed files with 47 additions and 13 deletions.
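Note: the change below swaps the node GET on the kubelet's first heartbeat attempt from the apiserver (previously answered from the apiserver watch cache via the resourceVersion=0 hint set by util.FromApiserverCache) to a local lister. The commit does not show where that lister comes from; as a rough, illustrative sketch only (this is not the kubelet's actual wiring, and newSingleNodeLister is a hypothetical helper name), a lister backed by a client-go shared informer filtered to a single node can be built roughly like this:

package nodelister

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
)

// newSingleNodeLister is a hypothetical helper (not from this commit): it returns a
// NodeLister whose cache tracks only the named node, so Get(nodeName) is answered
// from local memory instead of a round trip to the apiserver.
func newSingleNodeLister(client kubernetes.Interface, nodeName string, stopCh <-chan struct{}) listersv1.NodeLister {
	factory := informers.NewSharedInformerFactoryWithOptions(client, 0,
		informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
			// Watch only this kubelet's own Node object.
			opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", nodeName).String()
		}))
	lister := factory.Core().V1().Nodes().Lister()
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
	return lister
}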
15 changes: 9 additions & 6 deletions pkg/kubelet/kubelet_node_status.go
@@ -40,7 +40,6 @@ import (
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/nodestatus"
"k8s.io/kubernetes/pkg/kubelet/util"
taintutil "k8s.io/kubernetes/pkg/util/taints"
volutil "k8s.io/kubernetes/pkg/volume/util"
)
@@ -550,15 +549,19 @@ func (kl *Kubelet) updateNodeStatus(ctx context.Context) error {
func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error {
// In large clusters, GET and PUT operations on Node objects coming
// from here are the majority of load on apiserver and etcd.
- // To reduce the load on etcd, we are serving GET operations from
- // apiserver cache (the data might be slightly delayed but it doesn't
+ // To reduce the load on control-plane, we are serving GET operations from
+ // local lister (the data might be slightly delayed but it doesn't
// seem to cause more conflict - the delays are pretty small).
// If it result in a conflict, all retries are served directly from etcd.
- opts := metav1.GetOptions{}
+ var originalNode *v1.Node
+ var err error
+
if tryNumber == 0 {
- util.FromApiserverCache(&opts)
+ originalNode, err = kl.nodeLister.Get(string(kl.nodeName))
+ } else {
+ opts := metav1.GetOptions{}
+ originalNode, err = kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
}
- originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
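For context (a hedged paraphrase, not part of this diff): tryUpdateNodeStatus is driven by a retry loop in updateNodeStatus, so only the first attempt of each heartbeat gets tryNumber == 0 and is served from the local lister; any retry after an error or a conflict re-reads the Node directly from the apiserver. A simplified, self-contained sketch of that loop (updateNodeStatusWithRetries is an illustrative stand-in, and the retry constant's value is an assumption):

package nodestatus

import (
	"context"
	"fmt"
)

// nodeStatusUpdateRetry mirrors the kubelet's retry constant; the exact value here
// is an assumption for illustration.
const nodeStatusUpdateRetry = 5

// updateNodeStatusWithRetries is an illustrative paraphrase of the kubelet's
// updateNodeStatus loop: the first call gets tryNumber == 0 (served from the local
// lister); later attempts bypass the cache and hit the apiserver directly.
func updateNodeStatusWithRetries(ctx context.Context, tryUpdate func(ctx context.Context, tryNumber int) error) error {
	for i := 0; i < nodeStatusUpdateRetry; i++ {
		if err := tryUpdate(ctx, i); err != nil {
			continue // retry; the next attempt re-reads the Node from the apiserver
		}
		return nil
	}
	return fmt.Errorf("update node status exceeds retry count")
}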
45 changes: 38 additions & 7 deletions pkg/kubelet/kubelet_node_status_test.go
@@ -38,6 +38,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/rand"
@@ -160,6 +161,27 @@ func (lcm *localCM) GetCapacity(localStorageCapacityIsolation bool) v1.ResourceL
return lcm.capacity
}

+ type delegatingNodeLister struct {
+ client clientset.Interface
+ }
+
+ func (l delegatingNodeLister) Get(name string) (*v1.Node, error) {
+ return l.client.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
+ }
+
+ func (l delegatingNodeLister) List(selector labels.Selector) (ret []*v1.Node, err error) {
+ opts := metav1.ListOptions{}
+ if selector != nil {
+ opts.LabelSelector = selector.String()
+ }
+ nodeList, err := l.client.CoreV1().Nodes().List(context.Background(), opts)
+ if err != nil {
+ return nil, err
+ }
+ nodes := make([]*v1.Node, len(nodeList.Items))
+ for i := range nodeList.Items {
+ // return pointers into the listed items rather than an empty slice of nils
+ nodes[i] = &nodeList.Items[i]
+ }
+ return nodes, nil
+ }
+
func TestUpdateNewNodeStatus(t *testing.T) {
cases := []struct {
desc string
@@ -211,6 +233,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
+ kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@@ -390,6 +413,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
},
}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
+ kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@@ -602,6 +626,7 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
existingNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{existingNode}}).ReactionChain
+ kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@@ -824,6 +849,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {
kubeClient := testKubelet.fakeKubeClient
existingNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}}
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*existingNode}}).ReactionChain
+ kubelet.nodeLister = delegatingNodeLister{client: kubeClient}
machineInfo := &cadvisorapi.MachineInfo{
MachineID: "123",
SystemUUID: "abc",
@@ -1108,6 +1134,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {

kubeClient := testKubelet.fakeKubeClient
kubeClient.ReactionChain = fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*tc.existingNode}}).ReactionChain
+ kubelet.nodeLister = delegatingNodeLister{client: kubeClient}

// Execute
assert.NoError(t, kubelet.updateNodeStatus(ctx))
@@ -1260,11 +1287,15 @@ func TestFastStatusUpdateOnce(t *testing.T) {
return
}

- // patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches
- require.Len(t, actions, 2*tc.wantPatches-1)
+ // patch, then patch, get, patch, get, patch, ... up to initial patch + nodeStatusUpdateRetry patches
+ expectedActions := 2*tc.wantPatches - 2
+ if tc.wantPatches == 1 {
+ expectedActions = 1
+ }
+ require.Len(t, actions, expectedActions)

for i, action := range actions {
- if i%2 == 1 {
+ if i%2 == 0 && i > 0 {
require.IsType(t, core.GetActionImpl{}, action)
continue
}
@@ -1566,11 +1597,11 @@ func TestUpdateNewNodeStatusTooLargeReservation(t *testing.T) {
kubelet.updateRuntimeUp()
assert.NoError(t, kubelet.updateNodeStatus(ctx))
actions := kubeClient.Actions()
- require.Len(t, actions, 2)
- require.True(t, actions[1].Matches("patch", "nodes"))
- require.Equal(t, actions[1].GetSubresource(), "status")
+ require.Len(t, actions, 1)
+ require.True(t, actions[0].Matches("patch", "nodes"))
+ require.Equal(t, actions[0].GetSubresource(), "status")

- updatedNode, err := applyNodeStatusPatch(&existingNode, actions[1].(core.PatchActionImpl).GetPatch())
+ updatedNode, err := applyNodeStatusPatch(&existingNode, actions[0].(core.PatchActionImpl).GetPatch())
assert.NoError(t, err)
assert.True(t, apiequality.Semantic.DeepEqual(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable), "%s", diff.ObjectDiff(expectedNode.Status.Allocatable, updatedNode.Status.Allocatable))
}
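The updated test expectations above follow from the same change: the tryNumber == 0 read no longer goes through the fake client, so one fewer GET is recorded before the first status patch. A small standalone restatement of the new action-count arithmetic in TestFastStatusUpdateOnce (illustrative only, not part of the commit):

package main

import "fmt"

// expectedActions restates the formula from the updated TestFastStatusUpdateOnce.
// The recorded sequence is "patch, patch, get, patch, get, patch, ...": the first
// two patches have no GET recorded in front of them, and every later patch is
// preceded by one GET.
func expectedActions(wantPatches int) int {
	if wantPatches == 1 {
		return 1
	}
	return 2*wantPatches - 2
}

func main() {
	for _, p := range []int{1, 2, 3, 4} {
		fmt.Printf("wantPatches=%d -> %d recorded client actions\n", p, expectedActions(p))
	}
	// e.g. wantPatches=3 -> 4 actions: patch, patch, get, patch
}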