Skip to content

Commit

Permalink
[ENH] Add compaction service memberlist propagation (#1946)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - ...
 - New functionality
	 - This PR adds memberlist manager for compaction service.

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
Ishiihara authored Mar 29, 2024
1 parent 99f0549 commit 2c416aa
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 15 deletions.
5 changes: 5 additions & 0 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ k8s_resource(
'pod-watcher:Role',
'memberlists.chroma.cluster:CustomResourceDefinition',
'query-service-memberlist:MemberList',
'compaction-service-memberlist:MemberList',

'sysdb-serviceaccount:serviceaccount',
'sysdb-serviceaccount-rolebinding:RoleBinding',
Expand All @@ -78,6 +79,10 @@ k8s_resource(
'query-service-query-service-memberlist-binding:clusterrolebinding',
'query-service-memberlist-readerwriter-binding:clusterrolebinding',

'compaction-service-memberlist-readerwriter:ClusterRole',
'compaction-service-compaction-service-memberlist-binding:clusterrolebinding',
'compaction-service-memberlist-readerwriter-binding:clusterrolebinding',

'test-memberlist:MemberList',
'test-memberlist-reader:ClusterRole',
'test-memberlist-reader-binding:ClusterRoleBinding',
Expand Down
10 changes: 8 additions & 2 deletions go/cmd/coordinator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,15 @@ func init() {

// Memberlist
Cmd.Flags().StringVar(&conf.KubernetesNamespace, "kubernetes-namespace", "chroma", "Kubernetes namespace")
Cmd.Flags().StringVar(&conf.WorkerMemberlistName, "worker-memberlist-name", "query-service-memberlist", "Worker memberlist name")
Cmd.Flags().StringVar(&conf.WorkerPodLabel, "worker-pod-label", "query-service", "Worker pod label")

// Query service memberlist
Cmd.Flags().StringVar(&conf.QueryServiceMemberlistName, "query-memberlist-name", "query-service-memberlist", "Query service memberlist name")
Cmd.Flags().StringVar(&conf.QueryServicePodLabel, "query-pod-label", "query-service", "Query pod label")
Cmd.Flags().DurationVar(&conf.WatchInterval, "watch-interval", 60*time.Second, "Watch interval")

// Compaction service Memberlist
Cmd.Flags().StringVar(&conf.CompactionServiceMemberlistName, "compaction-memberlist-name", "compaction-service-memberlist", "Compaction memberlist name")
Cmd.Flags().StringVar(&conf.CompactionServicePodLabel, "compaction-pod-label", "compaction-service", "Compaction pod label")
}

func exec(*cobra.Command, []string) {
Expand Down
41 changes: 29 additions & 12 deletions go/pkg/coordinator/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,19 @@ type Config struct {
PulsarNamespace string

// Kubernetes config
KubernetesNamespace string
WorkerMemberlistName string
WorkerPodLabel string
KubernetesNamespace string

// Query service memberlist config
QueryServiceMemberlistName string
QueryServicePodLabel string

// Watcher config
WatchInterval time.Duration

// Compaction service memberlist config
CompactionServiceMemberlistName string
CompactionServicePodLabel string

// Config for testing
Testing bool
}
Expand Down Expand Up @@ -133,13 +139,26 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor
s.coordinator = coordinator
s.coordinator.Start()
if !config.Testing {
memberlist_manager, err := createMemberlistManager(config)
namespace := config.KubernetesNamespace
// Create memberlist manager for query service
queryMemberlistManager, err := createMemberlistManager(namespace, config.QueryServiceMemberlistName, config.QueryServicePodLabel, config.WatchInterval)
if err != nil {
return nil, err
}

// Start the memberlist manager
err = memberlist_manager.Start()
// Create memberlist manager for compaction service
compactionMemberlistManager, err := createMemberlistManager(namespace, config.CompactionServiceMemberlistName, config.CompactionServicePodLabel, config.WatchInterval)
if err != nil {
return nil, err
}

// Start the memberlist manager for query service
err = queryMemberlistManager.Start()
if err != nil {
return nil, err
}
// Start the memberlist manager for compaction service
err = compactionMemberlistManager.Start()
if err != nil {
return nil, err
}
Expand All @@ -154,10 +173,8 @@ func NewWithGrpcProvider(config Config, provider grpcutils.GrpcProvider, db *gor
return s, nil
}

func createMemberlistManager(config Config) (*memberlist_manager.MemberlistManager, error) {
log.Info("Creating memberlist manager")
memberlist_name := config.WorkerMemberlistName
namespace := config.KubernetesNamespace
func createMemberlistManager(namespace string, memberlistName string, podLabel string, watchInterval time.Duration) (*memberlist_manager.MemberlistManager, error) {
log.Info("Creating memberlist manager for {}", zap.String("memberlist", memberlistName))
clientset, err := utils.GetKubernetesInterface()
if err != nil {
return nil, err
Expand All @@ -166,8 +183,8 @@ func createMemberlistManager(config Config) (*memberlist_manager.MemberlistManag
if err != nil {
return nil, err
}
nodeWatcher := memberlist_manager.NewKubernetesWatcher(clientset, namespace, config.WorkerPodLabel, config.WatchInterval)
memberlistStore := memberlist_manager.NewCRMemberlistStore(dynamicClient, namespace, memberlist_name)
nodeWatcher := memberlist_manager.NewKubernetesWatcher(clientset, namespace, podLabel, watchInterval)
memberlistStore := memberlist_manager.NewCRMemberlistStore(dynamicClient, namespace, memberlistName)
memberlist_manager := memberlist_manager.NewMemberlistManager(nodeWatcher, memberlistStore)
return memberlist_manager, nil
}
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/memberlist_manager/memberlist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (m *MemberlistManager) reconcile(nodeIp string, status Status) error {
if !exists && status == Ready {
newMemberlist = append(newMemberlist, nodeIp)
}
return m.memberlistStore.UpdateMemberlist(context.TODO(), &newMemberlist, resourceVersion)
return m.memberlistStore.UpdateMemberlist(context.Background(), &newMemberlist, resourceVersion)
}

func (m *MemberlistManager) Stop() error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# These kubernetes manifests are UNDER ACTIVE DEVELOPMENT and are not yet ready for production use.
# They will be used for the upcoming distributed version of chroma. They are not even ready
# for testing yet. Please do not use them unless you are working on the distributed version of chroma.

apiVersion: chroma.cluster/v1
kind: MemberList
metadata:
name: compaction-service-memberlist
namespace: {{ .Values.namespace}}
spec:
members:

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: compaction-service-memberlist-readerwriter
rules:
- apiGroups:
- chroma.cluster
resources:
- memberlists
verbs:
- get
- list
- watch
# TODO: FIX THIS LEAKY PERMISSION
- create
- update
- patch
- delete

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: sysdb-compaction-service-memberlist-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: compaction-service-memberlist-readerwriter
subjects:
- kind: ServiceAccount
name: sysdb-serviceaccount
namespace: {{ .Values.namespace }}

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
# Awkward name, but this lets the compaction-service-serviceaccount read
# the compaction-service-memberlist.
name: compaction-service-compaction-service-memberlist-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: compaction-service-memberlist-readerwriter
subjects:
- kind: ServiceAccount
name: compaction-service-serviceaccount
namespace: {{ .Values.namespace }}

---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: compaction-service-memberlist-readerwriter-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: compaction-service-memberlist-readerwriter
subjects:
- kind: ServiceAccount
name: default
namespace: {{ .Values.namespace }}

0 comments on commit 2c416aa

Please sign in to comment.