Skip to content

Commit

Permalink
[ENH] Worker Topic Assignment (#1376)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Adds worker topic assignment based on rendezvous hashing.
 - New functionality
	 - ...

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*
  • Loading branch information
HammadB authored Nov 17, 2023
1 parent 9191799 commit ec59917
Show file tree
Hide file tree
Showing 19 changed files with 440 additions and 388 deletions.
5 changes: 5 additions & 0 deletions chromadb/ingest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,8 @@ class CollectionAssignmentPolicy(Component):
def assign_collection(self, collection_id: UUID) -> str:
    """Return the topic that should be used for the given collection.

    This is an interface placeholder with no behaviour of its own; the
    actual topic selection is left to whatever overrides this method.
    """

@abstractmethod
def get_topics(self) -> Sequence[str]:
    """Return the list of topics that this policy is currently using.

    Declared abstract: the body here is intentionally empty and concrete
    policies supply the real implementation.
    """
36 changes: 36 additions & 0 deletions chromadb/ingest/impl/simple_policy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Sequence
from uuid import UUID
from overrides import overrides
from chromadb.config import System
Expand All @@ -23,3 +24,38 @@ def _topic(self, collection_id: UUID) -> str:
@overrides
def assign_collection(self, collection_id: UUID) -> str:
    """Return the topic for ``collection_id`` by delegating to ``self._topic``."""
    topic_name = self._topic(collection_id)
    return topic_name

@overrides
def get_topics(self) -> Sequence[str]:
    """Reject the call: this policy cannot enumerate its topics.

    Raises:
        NotImplementedError: always, because each collection has its own topic.
    """
    reason = "SimpleAssignmentPolicy does not support get_topics, each collection has its own topic"
    raise NotImplementedError(reason)


class RendezvousHashingAssignmentPolicy(CollectionAssignmentPolicy):
    """Policy that exposes the topic list used for the whole system.

    Collection-to-topic assignment by rendezvous hashing is not performed by
    the python sysdb; only the go sysdb uses it. This class exists on the
    python side so that the complete list of topics can be obtained.
    """

    # Tenant and namespace components of every generated topic name,
    # both read from the system settings at construction time.
    _tenant_id: str
    _topic_ns: str

    def __init__(self, system: System):
        """Capture the tenant id and topic namespace, then initialise the base component."""
        self._tenant_id = system.settings.tenant_id
        self._topic_ns = system.settings.topic_namespace
        super().__init__(system)

    @overrides
    def assign_collection(self, collection_id: UUID) -> str:
        """Not supported here.

        Raises:
            NotImplementedError: always.
        """
        message = "RendezvousHashingAssignmentPolicy is not implemented"
        raise NotImplementedError(message)

    @overrides
    def get_topics(self) -> Sequence[str]:
        """Return the 16 topic names ``chroma_log_0`` .. ``chroma_log_15`` for this tenant/namespace."""
        # Mirrors go/coordinator/internal/coordinator/assignment_policy.go
        topics = []
        for i in range(16):
            topics.append(
                f"persistent://{self._tenant_id}/{self._topic_ns}/chroma_log_{i}"
            )
        return topics
6 changes: 0 additions & 6 deletions chromadb/proto/chroma_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 0 additions & 128 deletions chromadb/proto/chroma_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,134 +5,6 @@
from chromadb.proto import chroma_pb2 as chromadb_dot_proto_dot_chroma__pb2


class SegmentServerStub(object):
    """Client stub for the ``chroma.SegmentServer`` service.

    TODO: figure out subpackaging, ideally this file is colocated with the segment server implementation
    """

    def __init__(self, channel):
        """Create the ``LoadSegment`` and ``ReleaseSegment`` unary-unary callables.

        Args:
            channel: A grpc.Channel.
        """
        pb2 = chromadb_dot_proto_dot_chroma__pb2

        # Both RPCs take a Segment and answer with a SegmentServerResponse.
        self.LoadSegment = channel.unary_unary(
            "/chroma.SegmentServer/LoadSegment",
            request_serializer=pb2.Segment.SerializeToString,
            response_deserializer=pb2.SegmentServerResponse.FromString,
        )
        self.ReleaseSegment = channel.unary_unary(
            "/chroma.SegmentServer/ReleaseSegment",
            request_serializer=pb2.Segment.SerializeToString,
            response_deserializer=pb2.SegmentServerResponse.FromString,
        )


class SegmentServerServicer(object):
    """Server-side base for the ``chroma.SegmentServer`` service.

    Every method here marks the call as UNIMPLEMENTED and raises; a real
    server is expected to override them.

    TODO: figure out subpackaging, ideally this file is colocated with the segment server implementation
    """

    def LoadSegment(self, request, context):
        """Missing associated documentation comment in .proto file."""
        message = "Method not implemented!"
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)

    def ReleaseSegment(self, request, context):
        """Missing associated documentation comment in .proto file."""
        message = "Method not implemented!"
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details(message)
        raise NotImplementedError(message)


def add_SegmentServerServicer_to_server(servicer, server):
    """Register the servicer's ``LoadSegment``/``ReleaseSegment`` handlers on ``server``.

    Args:
        servicer: Object providing ``LoadSegment`` and ``ReleaseSegment`` methods.
        server: Object exposing ``add_generic_rpc_handlers``.
    """
    rpc_method_handlers = {}
    # Both RPCs share the same request/response message types, so the
    # handlers differ only in which servicer method they wrap.
    for rpc_name in ("LoadSegment", "ReleaseSegment"):
        rpc_method_handlers[rpc_name] = grpc.unary_unary_rpc_method_handler(
            getattr(servicer, rpc_name),
            request_deserializer=chromadb_dot_proto_dot_chroma__pb2.Segment.FromString,
            response_serializer=chromadb_dot_proto_dot_chroma__pb2.SegmentServerResponse.SerializeToString,
        )

    generic_handler = grpc.method_handlers_generic_handler(
        "chroma.SegmentServer", rpc_method_handlers
    )
    server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class SegmentServer(object):
    """Static convenience wrappers that invoke ``chroma.SegmentServer`` RPCs.

    Each method forwards its arguments to ``grpc.experimental.unary_unary``.

    TODO: figure out subpackaging, ideally this file is colocated with the segment server implementation
    """

    @staticmethod
    def LoadSegment(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke ``/chroma.SegmentServer/LoadSegment`` against ``target``."""
        pb2 = chromadb_dot_proto_dot_chroma__pb2
        # Arguments are forwarded positionally; note that ``insecure`` is
        # passed before ``call_credentials``, unlike this signature's order.
        return grpc.experimental.unary_unary(
            request,
            target,
            "/chroma.SegmentServer/LoadSegment",
            pb2.Segment.SerializeToString,
            pb2.SegmentServerResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
        )

    @staticmethod
    def ReleaseSegment(
        request,
        target,
        options=(),
        channel_credentials=None,
        call_credentials=None,
        insecure=False,
        compression=None,
        wait_for_ready=None,
        timeout=None,
        metadata=None,
    ):
        """Invoke ``/chroma.SegmentServer/ReleaseSegment`` against ``target``."""
        pb2 = chromadb_dot_proto_dot_chroma__pb2
        # Arguments are forwarded positionally; note that ``insecure`` is
        # passed before ``call_credentials``, unlike this signature's order.
        return grpc.experimental.unary_unary(
            request,
            target,
            "/chroma.SegmentServer/ReleaseSegment",
            pb2.Segment.SerializeToString,
            pb2.SegmentServerResponse.FromString,
            options,
            channel_credentials,
            insecure,
            call_credentials,
            compression,
            wait_for_ready,
            timeout,
            metadata,
        )


class VectorReaderStub(object):
"""Vector Reader Interface"""

Expand Down
4 changes: 2 additions & 2 deletions chromadb/segment/impl/distributed/segment_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

# These could go in config but given that they will rarely change, they are here for now to avoid
# polluting the config file further.
# NOTE(review): WATCH_TIMEOUT_SECONDS is assigned twice in a row below. This looks like a rendered
# diff (old value followed by new value); as written, the later assignment (60) is the one that
# takes effect -- confirm which value is intended.
WATCH_TIMEOUT_SECONDS = 10
WATCH_TIMEOUT_SECONDS = 60
KUBERNETES_NAMESPACE = "chroma"
KUBERNETES_GROUP = "chroma.cluster"

Expand Down Expand Up @@ -213,7 +213,7 @@ def stop(self) -> None:
@override
def get_segment_endpoint(self, segment: Segment) -> str:
# Returns a fixed endpoint string; the `segment` argument is not consulted yet (see TODO).
# TODO: This should rendezvous hash the segment ID to a worker given the current memberlist
# NOTE(review): two consecutive returns follow. This looks like a rendered diff (old line, then
# new line); as written only the first return is reachable -- confirm the intended endpoint.
return "segment-worker.chroma:50051"
return "segment-server.chroma:50051"

@override
def register_updated_segment_callback(
Expand Down
Loading

0 comments on commit ec59917

Please sign in to comment.