Skip to content

Commit

Permalink
feat(model): Update spreadscaler property to use max_concurrent
Browse files Browse the repository at this point in the history
instead of replicas

Signed-off-by: Brooks Townsend <[email protected]>
  • Loading branch information
brooksmtownsend committed Nov 20, 2023
1 parent 79bfee3 commit e22c4d9
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 47 deletions.
22 changes: 19 additions & 3 deletions oam/oam.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,17 @@
"type": "object",
"description": "A properties object (for spreadscaler configuration) is an object whose structure is determined by the spreadscaler property schema. It may be a simple value, or it may be a complex object.",
"properties": {
"replicas": {
"type": "integer"
"max_concurrent": {
"anyOf": [
{
"type": "integer",
"title": "max_concurrent"
},
{
"type": "integer",
"title": "replicas"
}
]
},
"spread": {
"type": "array",
Expand All @@ -287,7 +296,14 @@
}
}
},
"required": ["replicas", "spread"]
"oneOf": [
{
"required": ["max_concurrent", "spread"]
},
{
"required": ["replicas", "spread"]
}
]
},
"propertiesObject": {
"anyOf": [
Expand Down
9 changes: 5 additions & 4 deletions src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,9 @@ pub struct LinkdefProperty {
/// Properties for spread scalers
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct SpreadScalerProperty {
/// Number of replicas to scale
pub replicas: usize,
/// Number of replicas to spread across matching requirements
#[serde(alias = "replicas")]
pub max_concurrent: usize,
/// Requirements for spreading throse replicas
#[serde(default)]
pub spread: Vec<Spread>,
Expand Down Expand Up @@ -508,7 +509,7 @@ mod test {
spread_vec.push(spread_item);
let mut trait_vec: Vec<Trait> = Vec::new();
let spreadscalerprop = SpreadScalerProperty {
replicas: 4,
max_concurrent: 4,
spread: spread_vec,
};
let trait_item = Trait::new_spreadscaler(spreadscalerprop);
Expand Down Expand Up @@ -552,7 +553,7 @@ mod test {
};
spread_vec.push(spread_item);
let spreadscalerprop = SpreadScalerProperty {
replicas: 1,
max_concurrent: 1,
spread: spread_vec,
};
let mut trait_vec: Vec<Trait> = Vec::new();
Expand Down
18 changes: 9 additions & 9 deletions src/scaler/daemonscaler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorDaemonScaler<S> {
// in a lattice
let spread_config = if spread_config.spread.is_empty() {
SpreadScalerProperty {
replicas: spread_config.replicas,
max_concurrent: spread_config.max_concurrent,
spread: vec![Spread::default()],
}
} else {
Expand Down Expand Up @@ -182,15 +182,15 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorDaemonScaler<S> {
.iter()
.filter_map(|(host_id, current_count)| {
// Here we'll generate commands for the proper host depending on where they are running
match current_count.cmp(&self.config.spread_config.replicas) {
match current_count.cmp(&self.config.spread_config.max_concurrent) {
Ordering::Equal => None,
// Scale actor can handle both up and down scaling
Ordering::Less | Ordering::Greater => {
Some(Command::ScaleActor(ScaleActor {
reference: self.config.actor_reference.to_owned(),
actor_id: actor_id.to_owned(),
host_id: host_id.to_string(),
count: self.config.spread_config.replicas,
count: self.config.spread_config.max_concurrent,
model_name: self.config.model_name.to_owned(),
annotations: spreadscaler_annotations(
&spread.name,
Expand Down Expand Up @@ -236,7 +236,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorDaemonScaler<S> {
#[instrument(level = "trace", skip_all, fields(name = %self.config.model_name))]
async fn cleanup(&self) -> Result<Vec<Command>> {
let mut config_clone = self.config.clone();
config_clone.spread_config.replicas = 0;
config_clone.spread_config.max_concurrent = 0;

let cleanerupper = ActorDaemonScaler {
config: config_clone,
Expand Down Expand Up @@ -266,7 +266,7 @@ impl<S: ReadStore + Send + Sync> ActorDaemonScaler<S> {
// in a lattice
let spread_config = if spread_config.spread.is_empty() {
SpreadScalerProperty {
replicas: spread_config.replicas,
max_concurrent: spread_config.max_concurrent,
spread: vec![Spread::default()],
}
} else {
Expand Down Expand Up @@ -369,7 +369,7 @@ mod test {

// Daemonscalers ignore weight, so it should have no bearing
let complex_spread = SpreadScalerProperty {
replicas: 13,
max_concurrent: 13,
spread: vec![
Spread {
name: "ComplexOne".to_string(),
Expand Down Expand Up @@ -456,7 +456,7 @@ mod test {
let store = Arc::new(TestStore::default());

let echo_spread_property = SpreadScalerProperty {
replicas: 412,
max_concurrent: 412,
spread: vec![
Spread {
name: "RunInFakeCloud".to_string(),
Expand All @@ -480,7 +480,7 @@ mod test {
};

let blobby_spread_property = SpreadScalerProperty {
replicas: 3,
max_concurrent: 3,
spread: vec![
Spread {
name: "CrossRegionCustom".to_string(),
Expand Down Expand Up @@ -788,7 +788,7 @@ mod test {
.await,
);
let blobby_spread_property = SpreadScalerProperty {
replicas: 10,
max_concurrent: 10,
spread: vec![Spread {
name: "HighAvailability".to_string(),
requirements: BTreeMap::from_iter([(
Expand Down
14 changes: 7 additions & 7 deletions src/scaler/daemonscaler/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ProviderDaemonScaler<S> {
// in a lattice
let spread_config = if spread_config.spread.is_empty() {
SpreadScalerProperty {
replicas: spread_config.replicas,
max_concurrent: spread_config.max_concurrent,
spread: vec![Spread::default()],
}
} else {
Expand Down Expand Up @@ -134,7 +134,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ProviderDaemonScaler<S> {
public_key: provider_id.to_string(),
annotations: BTreeMap::default(),
});
match (provider_on_host, self.config.spread_config.replicas) {
match (provider_on_host, self.config.spread_config.max_concurrent) {
// Spread replicas set to 0 means we're cleaning up and should stop
// running providers
(Some(_), 0) => Some(Command::StopProvider(StopProvider {
Expand Down Expand Up @@ -192,7 +192,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ProviderDaemonScaler<S> {
#[instrument(level = "trace", skip_all, fields(name = %self.config.model_name))]
async fn cleanup(&self) -> Result<Vec<Command>> {
let mut config_clone = self.config.clone();
config_clone.spread_config.replicas = 0;
config_clone.spread_config.max_concurrent = 0;

let cleanerupper = ProviderDaemonScaler {
config: config_clone,
Expand Down Expand Up @@ -239,7 +239,7 @@ impl<S: ReadStore + Send + Sync> ProviderDaemonScaler<S> {
// in a lattice
let spread_config = if config.spread_config.spread.is_empty() {
SpreadScalerProperty {
replicas: config.spread_config.replicas,
max_concurrent: config.spread_config.max_concurrent,
spread: vec![Spread::default()],
}
} else {
Expand Down Expand Up @@ -316,7 +316,7 @@ mod test {
provider_contract_id: "contract".to_string(),
model_name: MODEL_NAME.to_string(),
spread_config: SpreadScalerProperty {
replicas: 1,
max_concurrent: 1,
spread: vec![],
},
provider_config: None,
Expand All @@ -339,7 +339,7 @@ mod test {
provider_contract_id: "contract".to_string(),
model_name: MODEL_NAME.to_string(),
spread_config: SpreadScalerProperty {
replicas: 1,
max_concurrent: 1,
spread: vec![],
},
provider_config: Some(CapabilityConfig::Opaque("foobar".to_string())),
Expand Down Expand Up @@ -444,7 +444,7 @@ mod test {
// Ensure we spread evenly with equal weights, clean division
let multi_spread_even = SpreadScalerProperty {
// Replicas are ignored so putting an absurd number
replicas: 12312,
max_concurrent: 12312,
spread: vec![Spread {
name: "SimpleOne".to_string(),
requirements: BTreeMap::from_iter([("inda".to_string(), "cloud".to_string())]),
Expand Down
2 changes: 1 addition & 1 deletion src/scaler/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ where
lattice_id: lattice_id.to_owned(),
provider_reference: props.image.to_owned(),
spread_config: SpreadScalerProperty {
replicas: 1,
max_concurrent: 1,
spread: vec![],
},
provider_contract_id: props.contract.to_owned(),
Expand Down
30 changes: 15 additions & 15 deletions src/scaler/spreadscaler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorSpreadScaler<S> {
#[instrument(level = "trace", skip_all, fields(name = %self.config.model_name))]
async fn cleanup(&self) -> Result<Vec<Command>> {
let mut config_clone = self.config.clone();
config_clone.spread_config.replicas = 0;
config_clone.spread_config.max_concurrent = 0;
let spread_requirements = compute_spread(&config_clone.spread_config);

let cleanerupper = ActorSpreadScaler {
Expand Down Expand Up @@ -347,7 +347,7 @@ pub(crate) fn eligible_hosts<'a>(
/// Given a spread config, return a vector of tuples that represents the spread
/// and the actual number of actors to start for a specific spread requirement
fn compute_spread(spread_config: &SpreadScalerProperty) -> Vec<(Spread, usize)> {
let requested_replicas = spread_config.replicas;
let requested_replicas = spread_config.max_concurrent;
let mut requested_spreads = spread_config.spread.clone();
requested_spreads.sort_by_key(|s| Reverse(s.weight.unwrap_or(DEFAULT_SPREAD_WEIGHT)));

Expand Down Expand Up @@ -456,7 +456,7 @@ mod test {
fn can_spread_properly() -> Result<()> {
// Basic test to ensure our types are correct
let simple_spread = SpreadScalerProperty {
replicas: 1,
max_concurrent: 1,
spread: vec![Spread {
name: "Simple".to_string(),
requirements: BTreeMap::new(),
Expand All @@ -469,7 +469,7 @@ mod test {

// Ensure we spread evenly with equal weights, clean division
let multi_spread_even = SpreadScalerProperty {
replicas: 10,
max_concurrent: 10,
spread: vec![
Spread {
name: "SimpleOne".to_string(),
Expand All @@ -490,7 +490,7 @@ mod test {

// Ensure we spread an odd number with clean dividing weights
let multi_spread_odd = SpreadScalerProperty {
replicas: 7,
max_concurrent: 7,
spread: vec![
Spread {
name: "SimpleOne".to_string(),
Expand All @@ -511,7 +511,7 @@ mod test {

// Ensure we spread an odd number with unclean dividing weights
let multi_spread_odd = SpreadScalerProperty {
replicas: 7,
max_concurrent: 7,
spread: vec![
Spread {
name: "SimpleOne".to_string(),
Expand All @@ -532,7 +532,7 @@ mod test {

// Ensure we compute if a weights aren't specified
let multi_spread_even_no_weight = SpreadScalerProperty {
replicas: 10,
max_concurrent: 10,
spread: vec![
Spread {
name: "SimpleOne".to_string(),
Expand All @@ -553,7 +553,7 @@ mod test {

// Ensure we compute if spread vec is empty
let simple_spread_replica_only = SpreadScalerProperty {
replicas: 12,
max_concurrent: 12,
spread: vec![],
};

Expand All @@ -563,7 +563,7 @@ mod test {

// Ensure we handle an all around complex case
let complex_spread = SpreadScalerProperty {
replicas: 103,
max_concurrent: 103,
spread: vec![
Spread {
// 9 + 1 (remainder trip)
Expand Down Expand Up @@ -629,7 +629,7 @@ mod test {

// Ensure we compute if a weights aren't specified
let complex_spread = SpreadScalerProperty {
replicas: 103,
max_concurrent: 103,
spread: vec![
Spread {
// 9 + 1 (remainder trip)
Expand Down Expand Up @@ -712,7 +712,7 @@ mod test {
let store = Arc::new(TestStore::default());

let echo_spread_property = SpreadScalerProperty {
replicas: 412,
max_concurrent: 412,
spread: vec![
Spread {
name: "RunInFakeCloud".to_string(),
Expand All @@ -736,7 +736,7 @@ mod test {
};

let blobby_spread_property = SpreadScalerProperty {
replicas: 9,
max_concurrent: 9,
spread: vec![
Spread {
name: "CrossRegionCustom".to_string(),
Expand Down Expand Up @@ -1009,7 +1009,7 @@ mod test {

// Run 75% in east, 25% on resilient hosts
let real_spread = SpreadScalerProperty {
replicas: 20,
max_concurrent: 20,
spread: vec![
Spread {
name: "SimpleOne".to_string(),
Expand Down Expand Up @@ -1117,7 +1117,7 @@ mod test {

let real_spread = SpreadScalerProperty {
// Makes it so we always get at least 2 commands
replicas: 9,
max_concurrent: 9,
spread: Vec::new(),
};

Expand Down Expand Up @@ -1277,7 +1277,7 @@ mod test {
.await,
);
let blobby_spread_property = SpreadScalerProperty {
replicas: 9,
max_concurrent: 9,
spread: vec![
Spread {
name: "CrossRegionCustom".to_string(),
Expand Down
Loading

0 comments on commit e22c4d9

Please sign in to comment.