Skip to content

Commit

Permalink
add node metadata cache
Browse files Browse the repository at this point in the history
  • Loading branch information
Zihan Jiang committed Jan 5, 2024
1 parent 440601e commit 1abacb1
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 91 deletions.
19 changes: 11 additions & 8 deletions pkg/service/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ type Manager struct {
context ManagerContext
deregistrationMu sync.Mutex
sync.Mutex
workQueue []*LifecycleEvent
targets *sync.Map
workQueue []*LifecycleEvent
targets *sync.Map
// nodeMetadataMap stores the node instanceID -> name mapping
nodeMetadataMap map[string]string
metrics *MetricsServer
avarageLatency float64
completedEvents int
Expand Down Expand Up @@ -87,12 +89,13 @@ type WaiterError struct {

func New(auth Authenticator, ctx ManagerContext) *Manager {
return &Manager{
eventStream: make(chan *sqs.Message, 1000),
workQueue: make([]*LifecycleEvent, 0),
metrics: &MetricsServer{},
targets: &sync.Map{},
authenticator: auth,
context: ctx,
eventStream: make(chan *sqs.Message, 100),
workQueue: make([]*LifecycleEvent, 0),
nodeMetadataMap: make(map[string]string),
metrics: &MetricsServer{},
targets: &sync.Map{},
authenticator: auth,
context: ctx,
}
}

Expand Down
23 changes: 4 additions & 19 deletions pkg/service/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,10 @@ import (
"k8s.io/client-go/kubernetes"
)

func getNodeByInstance(k kubernetes.Interface, instanceID string) (v1.Node, bool) {
var foundNode v1.Node
nodes, err := k.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Errorf("failed to list nodes: %v", err)
return foundNode, false
}

for _, node := range nodes.Items {
providerID := node.Spec.ProviderID
splitProviderID := strings.Split(providerID, "/")
foundID := splitProviderID[len(splitProviderID)-1]

if instanceID == foundID {
return node, true
}
}

return foundNode, false
func getNodeInstanceID(node v1.Node) string {
providerID := node.Spec.ProviderID
splitProviderID := strings.Split(providerID, "/")
return splitProviderID[len(splitProviderID)-1]
}

func getNodeByName(k kubernetes.Interface, nodeName string) (*v1.Node, bool) {
Expand Down
54 changes: 7 additions & 47 deletions pkg/service/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,56 +51,16 @@ func Test_NodeStatusPredicate(t *testing.T) {

}

func Test_GetNodeByInstancePositive(t *testing.T) {
t.Log("Test_GetNodeByInstancePositive: If a node exists, should be able to get it's instance ID")
kubeClient := fake.NewSimpleClientset()
fakeNodes := []v1.Node{
{
Spec: v1.NodeSpec{
ProviderID: "aws:///us-west-2a/i-11111111111111111",
},
},
{
Spec: v1.NodeSpec{
ProviderID: "aws:///us-west-2c/i-22222222222222222",
},
func Test_GetNodeInstanceID(t *testing.T) {
t.Log("Test_GetNodeInstanceID: If a node exists, should be able to get it's instance ID")
fakeNode := v1.Node{
Spec: v1.NodeSpec{
ProviderID: "aws:///us-west-2a/i-11111111111111111",
},
}
expected := "i-11111111111111111"

for _, node := range fakeNodes {
kubeClient.CoreV1().Nodes().Create(context.Background(), &node, apimachinery_v1.CreateOptions{})
}

_, exists := getNodeByInstance(kubeClient, "i-11111111111111111")
expected := true

if exists != expected {
t.Fatalf("expected getNodeByInstance exists to be: %v, got: %v", expected, exists)
}
}

func Test_GetNodeByInstanceNegative(t *testing.T) {
t.Log("Test_GetNodeByInstanceNegative: If a node exists, should be able to get it's instance ID")
kubeClient := fake.NewSimpleClientset()
fakeNodes := []v1.Node{
{
Spec: v1.NodeSpec{
ProviderID: "aws:///us-west-2a/i-11111111111111111",
},
},
{
Spec: v1.NodeSpec{
ProviderID: "aws:///us-west-2c/i-22222222222222222",
},
},
}

for _, node := range fakeNodes {
kubeClient.CoreV1().Nodes().Create(context.Background(), &node, apimachinery_v1.CreateOptions{})
}

_, exists := getNodeByInstance(kubeClient, "i-3333333333333333")
expected := false
exists := getNodeInstanceID(fakeNode)

if exists != expected {
t.Fatalf("expected getNodeByInstance exists to be: %v, got: %v", expected, exists)
Expand Down
48 changes: 31 additions & 17 deletions pkg/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand Down Expand Up @@ -116,7 +117,7 @@ func (mgr *Manager) Start() {

startTime := time.Now()

Check warning on line 118 in pkg/service/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/server.go#L118

Added line #L118 was not covered by tests
event, err := mgr.newEvent(message, queueURL)
log.Debugf("create and validate new event took %v\n", time.Since(startTime).Milliseconds())
log.Debugf("create and validate new event took %v ms", time.Since(startTime).Milliseconds())

Check warning on line 120 in pkg/service/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/server.go#L120

Added line #L120 was not covered by tests
if err != nil {
mgr.RejectEvent(err, event)
continue
Expand All @@ -128,18 +129,14 @@ func (mgr *Manager) Start() {
}

func (mgr *Manager) newEvent(message *sqs.Message, queueURL string) (*LifecycleEvent, error) {
t1 := time.Now()
event, err := readMessage(message, queueURL)
if err != nil {
return &LifecycleEvent{}, err
}
log.Debugf("read message took %v\n", time.Since(t1).Milliseconds())

t2 := time.Now()
if err = mgr.validateEvent(event); err != nil {
return event, err
}
log.Debugf("validate message took %v\n", time.Since(t2).Milliseconds())
return event, nil
}

Expand All @@ -165,24 +162,44 @@ func (mgr *Manager) validateEvent(e *LifecycleEvent) error {
return errors.New("event already exists in queue")
}

t1 := time.Now()
node, exists := getNodeByInstance(kubeClient, e.EC2InstanceID)
log.Debugf("get node by instance took %v\n", time.Since(t1).Milliseconds())

if !exists {
return errors.Errorf("instance %v is not seen in cluster nodes", e.EC2InstanceID)
var node *v1.Node
var err error
nodeName, ok := mgr.nodeMetadataMap[e.EC2InstanceID]
if ok {
// if node name is found in cache, use the k8s Get() method to get the node obj
node, err = kubeClient.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "failed to get node %v", e.EC2InstanceID)
}

Check warning on line 173 in pkg/service/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/server.go#L169-L173

Added lines #L169 - L173 were not covered by tests
} else {
// if node name is not found in cache, use the k8s List() method to get all nodes
nodes, err := kubeClient.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
if err != nil {
return errors.Wrapf(err, "failed to list nodes")
}

Check warning on line 179 in pkg/service/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/server.go#L178-L179

Added lines #L178 - L179 were not covered by tests
// update the cache, in the mean time, find the node obj we want
updatedNodeMetadataMap := make(map[string]string)
for _, item := range nodes.Items {
instanceID := getNodeInstanceID(item)
updatedNodeMetadataMap[instanceID] = item.Name
if instanceID == e.EC2InstanceID {
node = item.DeepCopy()
}
}
mgr.nodeMetadataMap = updatedNodeMetadataMap
if node == nil {
return errors.Errorf("node %v not found in the cluster", e.EC2InstanceID)
}

Check warning on line 192 in pkg/service/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/service/server.go#L191-L192

Added lines #L191 - L192 were not covered by tests
}

t2 := time.Now()
heartbeatInterval, err := getHookHeartbeatInterval(auth.ScalingGroupClient, e.LifecycleHookName, e.AutoScalingGroupName)
log.Debugf("get hook heartbeat interval took %v\n", time.Since(t2).Milliseconds())

if err != nil {
return errors.Wrap(err, "failed to get hook heartbeat interval")
}

e.SetHeartbeatInterval(heartbeatInterval)
e.SetReferencedNode(node)
e.SetReferencedNode(*node)

return nil
}
Expand Down Expand Up @@ -239,12 +256,9 @@ func (mgr *Manager) newPoller() {
if len(output.Messages) == 0 {
log.Debugln("no messages received in interval")
}
log.Debugf("received %d messages from queue %s\n", len(output.Messages), url)
startTime := time.Now()
for _, message := range output.Messages {
stream <- message
}
log.Debugf("all messages sent to stream, took %v, current stream length %v\n", time.Since(startTime).Milliseconds(), len(stream))
}
}

Expand Down

0 comments on commit 1abacb1

Please sign in to comment.