From e2ee34e63f574ca8052f8225be611accada54014 Mon Sep 17 00:00:00 2001 From: Hammad Bashir Date: Thu, 21 Mar 2024 16:51:12 -0700 Subject: [PATCH] [CHORE] Go Memberlist debug (#1913) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Add some debugs for memberlist in go - New functionality - None ## Test plan *How are these changes tested?* - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes *Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?* --- go/pkg/coordinator/grpc/server.go | 2 +- go/pkg/memberlist_manager/memberlist_manager.go | 1 + go/pkg/memberlist_manager/memberlist_store.go | 3 +++ go/pkg/memberlist_manager/node_watcher.go | 4 ++++ 4 files changed, 9 insertions(+), 1 deletion(-) diff --git a/go/pkg/coordinator/grpc/server.go b/go/pkg/coordinator/grpc/server.go index 91aa96ef1c6..b531cbc3a44 100644 --- a/go/pkg/coordinator/grpc/server.go +++ b/go/pkg/coordinator/grpc/server.go @@ -174,7 +174,7 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor } func createMemberlistManager(config Config) (*memberlist_manager.MemberlistManager, error) { - log.Info("Starting memberlist manager") + log.Info("Creating memberlist manager") memberlist_name := config.WorkerMemberlistName namespace := config.KubernetesNamespace clientset, err := utils.GetKubernetesInterface() diff --git a/go/pkg/memberlist_manager/memberlist_manager.go b/go/pkg/memberlist_manager/memberlist_manager.go index fec3e91d1c5..990d97a056b 100644 --- a/go/pkg/memberlist_manager/memberlist_manager.go +++ b/go/pkg/memberlist_manager/memberlist_manager.go @@ -39,6 +39,7 @@ func NewMemberlistManager(nodeWatcher IWatcher, memberlistStore IMemberlistStore } func (m *MemberlistManager) Start() error { + log.Info("Starting memberlist manager") m.nodeWatcher.RegisterCallback(func(nodeIp string) { m.workqueue.Add(nodeIp) }) diff --git a/go/pkg/memberlist_manager/memberlist_store.go b/go/pkg/memberlist_manager/memberlist_store.go index 0567897f46e..49191359d0c 100644 --- a/go/pkg/memberlist_manager/memberlist_store.go +++ b/go/pkg/memberlist_manager/memberlist_store.go @@ -3,6 +3,8 @@ package memberlist_manager import ( "context" + "github.com/pingcap/log" + "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -53,6 +55,7 @@ func (s *CRMemberlistStore) GetMemberlist(ctx context.Context) (return_memberlis func (s *CRMemberlistStore) UpdateMemberlist(ctx context.Context, memberlist *Memberlist, resourceVersion string) error { gvr := getGvr() + log.Info("Updating memberlist store", zap.Any("memberlist", memberlist)) unstructured := memberlistToCr(memberlist, s.coordinatorNamespace, s.memberlistCustomResource, resourceVersion) _, err := s.dynamicClient.Resource(gvr).Namespace("chroma").Update(context.TODO(), unstructured, metav1.UpdateOptions{}) if err != nil { diff --git a/go/pkg/memberlist_manager/node_watcher.go b/go/pkg/memberlist_manager/node_watcher.go index cac27f5466e..d3d2a04944b 100644 --- a/go/pkg/memberlist_manager/node_watcher.go +++ b/go/pkg/memberlist_manager/node_watcher.go @@ -47,6 +47,7 @@ type KubernetesWatcher struct { } func NewKubernetesWatcher(clientset kubernetes.Interface, coordinator_namespace string, pod_label string, resyncPeriod time.Duration) *KubernetesWatcher { + log.Info("Creating new kubernetes watcher", zap.String("namespace", coordinator_namespace), zap.String("pod label", pod_label), zap.Duration("resync period", resyncPeriod)) labelSelector := labels.SelectorFromSet(map[string]string{MemberLabel: pod_label}) factory := informers.NewSharedInformerFactoryWithOptions(clientset, resyncPeriod, informers.WithNamespace(coordinator_namespace), informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labelSelector.String() })) podInformer := factory.Core().V1().Pods().Informer() @@ -75,6 +76,7 @@ func (w *KubernetesWatcher) Start() error { log.Error("Error while asserting object to pod") } if err == nil { + log.Info("Kubernetes Pod Added", zap.String("key", key), zap.String("ip", objPod.Status.PodIP)) ip := objPod.Status.PodIP w.mu.Lock() w.ipToKey[ip] = key @@ -91,6 +93,7 @@ func (w *KubernetesWatcher) Start() error { log.Error("Error while asserting object to pod") } if err == nil { + log.Info("Kubernetes Pod Updated", zap.String("key", key), zap.String("ip", objPod.Status.PodIP)) ip := objPod.Status.PodIP w.ipToKey[ip] = key w.notify(ip) @@ -105,6 +108,7 @@ func (w *KubernetesWatcher) Start() error { log.Error("Error while asserting object to pod") } if err == nil { + log.Info("Kubernetes Pod Deleted", zap.String("ip", objPod.Status.PodIP)) ip := objPod.Status.PodIP // The contract for GetStatus is that if the ip is not in this map, then it returns NotReady delete(w.ipToKey, ip)