diff --git a/Tiltfile b/Tiltfile
index d6c502733b0..2dcfd6f7783 100644
--- a/Tiltfile
+++ b/Tiltfile
@@ -65,6 +65,7 @@ k8s_resource(
     'pod-watcher:Role',
     'memberlists.chroma.cluster:CustomResourceDefinition',
     'query-service-memberlist:MemberList',
+    'compaction-service-memberlist:MemberList',
 
     'sysdb-serviceaccount:serviceaccount',
     'sysdb-serviceaccount-rolebinding:RoleBinding',
@@ -78,6 +79,10 @@ k8s_resource(
     'query-service-query-service-memberlist-binding:clusterrolebinding',
     'query-service-memberlist-readerwriter-binding:clusterrolebinding',
 
+    'compaction-service-memberlist-readerwriter:ClusterRole',
+    'compaction-service-compaction-service-memberlist-binding:clusterrolebinding',
+    'compaction-service-memberlist-readerwriter-binding:clusterrolebinding',
+
     'test-memberlist:MemberList',
     'test-memberlist-reader:ClusterRole',
     'test-memberlist-reader-binding:ClusterRoleBinding',
diff --git a/go/cmd/coordinator/cmd.go b/go/cmd/coordinator/cmd.go
index c105eafc57f..24a1093aad8 100644
--- a/go/cmd/coordinator/cmd.go
+++ b/go/cmd/coordinator/cmd.go
@@ -54,9 +54,15 @@ func init() {
 
 	// Memberlist
 	Cmd.Flags().StringVar(&conf.KubernetesNamespace, "kubernetes-namespace", "chroma", "Kubernetes namespace")
-	Cmd.Flags().StringVar(&conf.WorkerMemberlistName, "worker-memberlist-name", "query-service-memberlist", "Worker memberlist name")
-	Cmd.Flags().StringVar(&conf.WorkerPodLabel, "worker-pod-label", "query-service", "Worker pod label")
+
+	// Query service memberlist
+	Cmd.Flags().StringVar(&conf.QueryServiceMemberlistName, "query-memberlist-name", "query-service-memberlist", "Query service memberlist name")
+	Cmd.Flags().StringVar(&conf.QueryServicePodLabel, "query-pod-label", "query-service", "Query pod label")
 	Cmd.Flags().DurationVar(&conf.WatchInterval, "watch-interval", 60*time.Second, "Watch interval")
+
+	// Compaction service Memberlist
+	Cmd.Flags().StringVar(&conf.CompactionServiceMemberlistName, "compaction-memberlist-name", "compaction-service-memberlist", "Compaction memberlist name")
+	Cmd.Flags().StringVar(&conf.CompactionServicePodLabel, "compaction-pod-label", "compaction-service", "Compaction pod label")
 }
 
 func exec(*cobra.Command, []string) {
diff --git a/go/pkg/coordinator/grpc/server.go b/go/pkg/coordinator/grpc/server.go
index 24738aeb207..96b4f53ce9e 100644
--- a/go/pkg/coordinator/grpc/server.go
+++ b/go/pkg/coordinator/grpc/server.go
@@ -44,13 +44,19 @@ type Config struct {
 	PulsarNamespace string
 
 	// Kubernetes config
-	KubernetesNamespace  string
-	WorkerMemberlistName string
-	WorkerPodLabel       string
+	KubernetesNamespace string
+
+	// Query service memberlist config
+	QueryServiceMemberlistName string
+	QueryServicePodLabel       string
 
 	// Watcher config
 	WatchInterval time.Duration
 
+	// Compaction service memberlist config
+	CompactionServiceMemberlistName string
+	CompactionServicePodLabel       string
+
 	// Config for testing
 	Testing bool
 }
@@ -133,13 +139,26 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor
 	s.coordinator = coordinator
 	s.coordinator.Start()
 	if !config.Testing {
-		memberlist_manager, err := createMemberlistManager(config)
+		namespace := config.KubernetesNamespace
+		// Create memberlist manager for query service
+		queryMemberlistManager, err := createMemberlistManager(namespace, config.QueryServiceMemberlistName, config.QueryServicePodLabel, config.WatchInterval)
 		if err != nil {
 			return nil, err
 		}
 
-		// Start the memberlist manager
-		err = memberlist_manager.Start()
+		// Create memberlist manager for compaction service
+		compactionMemberlistManager, err := createMemberlistManager(namespace, config.CompactionServiceMemberlistName, config.CompactionServicePodLabel, config.WatchInterval)
+		if err != nil {
+			return nil, err
+		}
+
+		// Start the memberlist manager for query service
+		err = queryMemberlistManager.Start()
+		if err != nil {
+			return nil, err
+		}
+		// Start the memberlist manager for compaction service
+		err = compactionMemberlistManager.Start()
 		if err != nil {
 			return nil, err
 		}
@@ -154,10 +173,12 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor
 	return s, nil
 }
 
-func createMemberlistManager(config Config) (*memberlist_manager.MemberlistManager, error) {
-	log.Info("Creating memberlist manager")
-	memberlist_name := config.WorkerMemberlistName
-	namespace := config.KubernetesNamespace
+// createMemberlistManager wires a Kubernetes watcher (namespace, podLabel,
+// watchInterval) to a custom-resource memberlist store (namespace,
+// memberlistName) and returns the resulting manager. The manager is returned
+// unstarted; the caller is responsible for calling Start.
+func createMemberlistManager(namespace string, memberlistName string, podLabel string, watchInterval time.Duration) (*memberlist_manager.MemberlistManager, error) {
+	log.Info("Creating memberlist manager", zap.String("memberlist", memberlistName))
 	clientset, err := utils.GetKubernetesInterface()
 	if err != nil {
 		return nil, err
@@ -166,8 +187,8 @@ func createMemberlistManager(config Config) (*memberlist_manager.MemberlistManag
 	if err != nil {
 		return nil, err
 	}
-	nodeWatcher := memberlist_manager.NewKubernetesWatcher(clientset, namespace, config.WorkerPodLabel, config.WatchInterval)
-	memberlistStore := memberlist_manager.NewCRMemberlistStore(dynamicClient, namespace, memberlist_name)
+	nodeWatcher := memberlist_manager.NewKubernetesWatcher(clientset, namespace, podLabel, watchInterval)
+	memberlistStore := memberlist_manager.NewCRMemberlistStore(dynamicClient, namespace, memberlistName)
 	memberlist_manager := memberlist_manager.NewMemberlistManager(nodeWatcher, memberlistStore)
 	return memberlist_manager, nil
 }
diff --git a/go/pkg/memberlist_manager/memberlist_manager.go b/go/pkg/memberlist_manager/memberlist_manager.go
index 990d97a056b..e1afd4dc70a 100644
--- a/go/pkg/memberlist_manager/memberlist_manager.go
+++ b/go/pkg/memberlist_manager/memberlist_manager.go
@@ -111,7 +111,7 @@ func (m *MemberlistManager) reconcile(nodeIp string, status Status) error {
 	if !exists && status == Ready {
 		newMemberlist = append(newMemberlist, nodeIp)
 	}
-	return m.memberlistStore.UpdateMemberlist(context.TODO(), &newMemberlist, resourceVersion)
+	return m.memberlistStore.UpdateMemberlist(context.Background(), &newMemberlist, resourceVersion)
 }
 
 func (m *MemberlistManager) Stop() error {
diff --git a/k8s/distributed-chroma/templates/compaction-service-memberlist-cr.yaml b/k8s/distributed-chroma/templates/compaction-service-memberlist-cr.yaml
new file mode 100644
index 00000000000..b68981c05ee
--- /dev/null
+++ b/k8s/distributed-chroma/templates/compaction-service-memberlist-cr.yaml
@@ -0,0 +1,79 @@
+# These kubernetes manifests are UNDER ACTIVE DEVELOPMENT and are not yet ready for production use.
+# They will be used for the upcoming distributed version of chroma. They are not even ready
+# for testing yet. Please do not use them unless you are working on the distributed version of chroma.
+
+apiVersion: chroma.cluster/v1
+kind: MemberList
+metadata:
+  name: compaction-service-memberlist
+  namespace: {{ .Values.namespace}}
+spec:
+  members:
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: compaction-service-memberlist-readerwriter
+rules:
+- apiGroups:
+  - chroma.cluster
+  resources:
+  - memberlists
+  verbs:
+  - get
+  - list
+  - watch
+  # TODO: FIX THIS LEAKY PERMISSION
+  - create
+  - update
+  - patch
+  - delete
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: sysdb-compaction-service-memberlist-binding
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: compaction-service-memberlist-readerwriter
+subjects:
+- kind: ServiceAccount
+  name: sysdb-serviceaccount
+  namespace: {{ .Values.namespace }}
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  # Awkward name, but this lets the compaction-service-serviceaccount read
+  # the compaction-service-memberlist.
+  name: compaction-service-compaction-service-memberlist-binding
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: compaction-service-memberlist-readerwriter
+subjects:
+- kind: ServiceAccount
+  name: compaction-service-serviceaccount
+  namespace: {{ .Values.namespace }}
+
+---
+
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: compaction-service-memberlist-readerwriter-binding
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: compaction-service-memberlist-readerwriter
+subjects:
+- kind: ServiceAccount
+  name: default
+  namespace: {{ .Values.namespace }}