Skip to content

Commit

Permalink
[ENH] Add rust assignmenment policy and config management
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Dec 12, 2023
1 parent 00a3b94 commit d90eea8
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 7 deletions.
102 changes: 102 additions & 0 deletions rust/worker/src/assignment/assignment_policy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::config::{Configurable, WorkerConfig};

use super::{
config::{AssignmentPolicyConfig, HasherType},
rendezvous_hash::{assign, AssignmentError, Murmur3Hasher},
};
use uuid::Uuid;

/*
===========================================
Interfaces
===========================================
*/

/// AssignmentPolicy is a trait that defines how to assign a collection to a topic.
/// # Notes
/// This trait mirrors the go and python versions of the assignment policy
/// interface.
/// # Methods
/// - assign: Assign a collection to a topic.
/// - get_topics: Get the topics that can be assigned to.
/// # Notes
/// An assignment policy is not responsible for creating the topics it assigns to.
/// It is the responsibility of the caller to ensure that the topics exist.
/// An assignment policy must be Send.
pub(crate) trait AssignmentPolicy: Send {
fn assign(&self, collection_id: Uuid) -> Result<String, AssignmentError>;
fn get_topics(&self) -> Vec<String>;
}

/*
===========================================
Implementation
===========================================
*/

pub(crate) struct RendezvousHashingAssignmentPolicy {
// The pulsar tenant and namespace being in this implementation of the assignment policy
// is purely a temporary measure while the topic propagation is being worked on.
// TODO: Remove pulsar_tenant and pulsar_namespace from this struct once topic propagation
// is implemented.
pulsar_tenant: String,
pulsar_namespace: String,
hasher: Murmur3Hasher,
}

impl RendezvousHashingAssignmentPolicy {
// Rust beginners note
// The reason we take String and not &str is because we need to put the strings into our
// struct, and we can't do that with references so rather than clone the strings, we just
// take ownership of them and put the responsibility on the caller to clone them if they
// need to. This is the general pattern we should follow in rust - put the burden of cloning
// on the caller, and if they don't need to clone, they can pass ownership.
pub fn new(
pulsar_tenant: String,
pulsar_namespace: String,
) -> RendezvousHashingAssignmentPolicy {
return RendezvousHashingAssignmentPolicy {
pulsar_tenant: pulsar_tenant,
pulsar_namespace: pulsar_namespace,
hasher: Murmur3Hasher {},
};
}
}

impl Configurable for RendezvousHashingAssignmentPolicy {
fn from_config(config: WorkerConfig) -> Self {
let assignment_policy_config = match config.assignment_policy {
AssignmentPolicyConfig::RendezvousHashing(config) => config,
};
let hasher = match assignment_policy_config.hasher {
HasherType::Murmur3 => Murmur3Hasher {},
};
return RendezvousHashingAssignmentPolicy {
pulsar_tenant: config.pulsar_tenant,
pulsar_namespace: config.pulsar_namespace,
hasher: hasher,
};
}
}

impl AssignmentPolicy for RendezvousHashingAssignmentPolicy {
fn assign(&self, collection_id: Uuid) -> Result<String, AssignmentError> {
let collection_id = collection_id.to_string();
let topics = self.get_topics();
let topic = assign(&collection_id, topics, &self.hasher);
return topic;
}

fn get_topics(&self) -> Vec<String> {
// This mirrors the current python and go code, which assumes a fixed set of topics
let mut topics = Vec::with_capacity(16);
for i in 0..16 {
let topic = format!(
"persistent://{}/{}/chroma_log_{}",
self.pulsar_tenant, self.pulsar_namespace, i
);
topics.push(topic);
}
return topics;
}
}
28 changes: 28 additions & 0 deletions rust/worker/src/assignment/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use serde::Deserialize;

#[derive(Deserialize)]
/// The type of hasher to use.
/// # Options
/// - Murmur3: The murmur3 hasher.
pub(crate) enum HasherType {
Murmur3,
}

#[derive(Deserialize)]
/// The configuration for the assignment policy.
/// # Options
/// - RendezvousHashing: The rendezvous hashing assignment policy.
/// # Notes
/// See config.rs in the root of the worker crate for an example of how to use
/// config files to configure the worker.
pub(crate) enum AssignmentPolicyConfig {
RendezvousHashing(RendezvousHashingAssignmentPolicyConfig),
}

#[derive(Deserialize)]
/// The configuration for the rendezvous hashing assignment policy.
/// # Fields
/// - hasher: The type of hasher to use.
pub(crate) struct RendezvousHashingAssignmentPolicyConfig {
pub(crate) hasher: HasherType,
}
2 changes: 2 additions & 0 deletions rust/worker/src/assignment/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
mod assignment_policy;
pub(crate) mod config;
mod rendezvous_hash;
8 changes: 4 additions & 4 deletions rust/worker/src/assignment/rendezvous_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use thiserror::Error;
use murmur3::murmur3_x64_128;

/// A trait for hashing a member and a key to a score.
trait Hasher {
pub(crate) trait Hasher {
fn hash(&self, member: &str, key: &str) -> Result<u64, AssignmentError>;
}

/// Error codes for assignment
#[derive(Error, Debug)]
enum AssignmentError {
pub(crate) enum AssignmentError {
#[error("Cannot assign empty key")]
EmptyKey,
#[error("No members to assign to")]
Expand Down Expand Up @@ -49,7 +49,7 @@ impl ChromaError for AssignmentError {
/// # Notes
/// This implementation mirrors the rendezvous hash implementation
/// in the go and python services.
fn assign<H: Hasher>(
pub(crate) fn assign<H: Hasher>(
key: &str,
members: impl IntoIterator<Item = impl AsRef<str>>,
hasher: &H,
Expand Down Expand Up @@ -97,7 +97,7 @@ fn merge_hashes(x: u64, y: u64) -> u64 {
acc
}

struct Murmur3Hasher {}
pub(crate) struct Murmur3Hasher {}

impl Hasher for Murmur3Hasher {
fn hash(&self, member: &str, key: &str) -> Result<u64, AssignmentError> {
Expand Down
52 changes: 49 additions & 3 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,23 @@ impl RootConfig {
/// # Notes
/// In order to set the enviroment variables, you must prefix them with CHROMA_WORKER__<FIELD_NAME>.
/// For example, to set my_ip, you would set CHROMA_WORKER__MY_IP.
struct WorkerConfig {
my_ip: String,
num_indexing_threads: u32,
/// Each submodule that needs to be configured from the config object should implement the Configurable trait and
/// have its own field in this struct for its Config struct.
pub(crate) struct WorkerConfig {
pub(crate) my_ip: String,
pub(crate) num_indexing_threads: u32,
pub(crate) pulsar_tenant: String,
pub(crate) pulsar_namespace: String,
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
}

/// # Description
/// A trait for configuring a struct from a config object.
/// # Notes
/// This trait is used to configure structs from the config object.
/// Components that need to be configured from the config object should implement this trait.
pub(crate) trait Configurable {
fn from_config(config: WorkerConfig) -> Self;
}

#[cfg(test)]
Expand All @@ -106,11 +120,18 @@ mod tests {
worker:
my_ip: "192.0.0.1"
num_indexing_threads: 4
pulsar_tenant: "public"
pulsar_namespace: "default"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
"#,
);
let config = RootConfig::load();
assert_eq!(config.worker.my_ip, "192.0.0.1");
assert_eq!(config.worker.num_indexing_threads, 4);
assert_eq!(config.worker.pulsar_tenant, "public");
assert_eq!(config.worker.pulsar_namespace, "default");
Ok(())
});
}
Expand All @@ -124,11 +145,18 @@ mod tests {
worker:
my_ip: "192.0.0.1"
num_indexing_threads: 4
pulsar_tenant: "public"
pulsar_namespace: "default"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
"#,
);
let config = RootConfig::load_from_path("random_path.yaml");
assert_eq!(config.worker.my_ip, "192.0.0.1");
assert_eq!(config.worker.num_indexing_threads, 4);
assert_eq!(config.worker.pulsar_tenant, "public");
assert_eq!(config.worker.pulsar_namespace, "default");
Ok(())
});
}
Expand Down Expand Up @@ -157,6 +185,11 @@ mod tests {
r#"
worker:
my_ip: "192.0.0.1"
pulsar_tenant: "public"
pulsar_namespace: "default"
assignment_policy:
RendezvousHashing:
hasher: Murmur3
"#,
);
let config = RootConfig::load();
Expand All @@ -170,9 +203,22 @@ mod tests {
fn test_config_with_env_override() {
Jail::expect_with(|jail| {
let _ = jail.set_env("CHROMA_WORKER__MY_IP", "192.0.0.1");
let _ = jail.set_env("CHROMA_WORKER__PULSAR_TENANT", "A");
let _ = jail.set_env("CHROMA_WORKER__PULSAR_NAMESPACE", "B");
let _ = jail.create_file(
"chroma_config.yaml",
r#"
worker:
assignment_policy:
RendezvousHashing:
hasher: Murmur3
"#,
);
let config = RootConfig::load();
assert_eq!(config.worker.my_ip, "192.0.0.1");
assert_eq!(config.worker.num_indexing_threads, num_cpus::get() as u32);
assert_eq!(config.worker.pulsar_tenant, "A");
assert_eq!(config.worker.pulsar_namespace, "B");
Ok(())
});
}
Expand Down

0 comments on commit d90eea8

Please sign in to comment.