From 3e5dc67476cc0d706e45a066d591b7a6138b1f42 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 7 Jan 2025 11:52:53 +0000 Subject: [PATCH 1/2] [Bifrost] Let providers propose their own parameters - BifrostAdmin can now init empty logs configuration - BifrostAdmin can now auto extend the chain - Providers now have control over suggesting a new segment configuration - Introduces a temporary copy of NodeSetSelector into bifrost until log-controller is removed --- Cargo.lock | 1 + .../src/cluster_controller/logs_controller.rs | 58 +- .../logs_controller/nodeset_selection.rs | 2 +- .../admin/src/cluster_controller/service.rs | 26 +- crates/bifrost/Cargo.toml | 1 + crates/bifrost/src/appender.rs | 6 +- crates/bifrost/src/bifrost_admin.rs | 172 +++++- crates/bifrost/src/error.rs | 17 + crates/bifrost/src/loglet/provider.rs | 17 +- .../src/providers/local_loglet/provider.rs | 20 +- crates/bifrost/src/providers/memory_loglet.rs | 20 +- .../src/providers/replicated_loglet/mod.rs | 1 + .../replicated_loglet/nodeset_selector.rs | 515 ++++++++++++++++++ .../providers/replicated_loglet/provider.rs | 79 ++- .../replicated_loglet/tasks/check_seal.rs | 1 + crates/core/src/metadata/manager.rs | 10 +- crates/types/src/config/common.rs | 2 +- crates/types/src/logs/metadata.rs | 53 +- .../restatectl/src/commands/cluster/config.rs | 12 +- .../src/commands/cluster/config/set.rs | 8 +- .../restatectl/src/commands/log/list_logs.rs | 5 + 21 files changed, 913 insertions(+), 113 deletions(-) create mode 100644 crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs diff --git a/Cargo.lock b/Cargo.lock index 436138462..128283d19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6080,6 +6080,7 @@ dependencies = [ "enumset", "futures", "googletest", + "itertools 0.13.0", "metrics", "parking_lot", "paste", diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 0a74a7078..4f1f7db6e 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -10,14 +10,16 @@ mod nodeset_selection; -use futures::never::Never; -use rand::prelude::IteratorRandom; -use rand::thread_rng; use std::collections::HashMap; use std::iter; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; + +use futures::never::Never; +use rand::prelude::IteratorRandom; +use rand::thread_rng; +use restate_types::config::Configuration; use tokio::sync::Semaphore; use tokio::task::JoinSet; use tracing::{debug, error, trace, trace_span, Instrument}; @@ -28,14 +30,13 @@ use restate_core::metadata_store::{ retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError, }; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; -use restate_types::config::Configuration; use restate_types::errors::GenericError; use restate_types::identifiers::PartitionId; use restate_types::live::Pinned; use restate_types::logs::builder::LogsBuilder; use restate_types::logs::metadata::{ - Chain, DefaultProvider, LogletConfig, LogletParams, Logs, LogsConfiguration, - NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, SegmentIndex, + Chain, LogletConfig, LogletParams, Logs, LogsConfiguration, NodeSetSelectionStrategy, + ProviderConfiguration, ProviderKind, ReplicatedLogletConfig, SegmentIndex, }; use restate_types::logs::{LogId, LogletId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; @@ -320,17 +321,17 @@ fn try_provisioning( node_set_selector_hints: impl NodeSetSelectorHints, ) -> Option { match logs_configuration.default_provider { - DefaultProvider::Local => { + ProviderConfiguration::Local => { let log_id = LogletId::new(log_id, SegmentIndex::OLDEST); Some(LogletConfiguration::Local(log_id.into())) } #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => { + ProviderConfiguration::InMemory => { let log_id = LogletId::new(log_id, SegmentIndex::OLDEST); Some(LogletConfiguration::Memory(log_id.into())) } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(ref config) => build_new_replicated_loglet_configuration( + ProviderConfiguration::Replicated(ref config) => build_new_replicated_loglet_configuration( config, LogletId::new(log_id, SegmentIndex::OLDEST), &Metadata::with_current(|m| m.nodes_config_ref()), @@ -436,10 +437,10 @@ impl LogletConfiguration { ) -> bool { match (self, &logs_configuration.default_provider) { #[cfg(any(test, feature = "memory-loglet"))] - (Self::Memory(_), DefaultProvider::InMemory) => false, - (Self::Local(_), DefaultProvider::Local) => false, + (Self::Memory(_), ProviderConfiguration::InMemory) => false, + (Self::Local(_), ProviderConfiguration::Local) => false, #[cfg(feature = "replicated-loglet")] - (Self::Replicated(params), DefaultProvider::Replicated(config)) => { + (Self::Replicated(params), ProviderConfiguration::Replicated(config)) => { let sequencer_change_required = !observed_cluster_state .is_node_alive(params.sequencer) && !observed_cluster_state.alive_nodes.is_empty(); @@ -468,9 +469,10 @@ impl LogletConfiguration { sequencer_change_required || nodeset_improvement_possible } - _ => { + (x, y) => { debug!( - "Changing provider type is not supporter at the moment. Ignoring reconfigure" + "Changing provider type from {} to {} is not supporter at the moment. Ignoring reconfigure", + x.as_provider(), y.kind(), ); false } @@ -501,10 +503,14 @@ impl LogletConfiguration { match logs_configuration.default_provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => Some(LogletConfiguration::Memory(loglet_id.next().into())), - DefaultProvider::Local => Some(LogletConfiguration::Local(loglet_id.next().into())), + ProviderConfiguration::InMemory => { + Some(LogletConfiguration::Memory(loglet_id.next().into())) + } + ProviderConfiguration::Local => { + Some(LogletConfiguration::Local(loglet_id.next().into())) + } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(ref config) => { + ProviderConfiguration::Replicated(ref config) => { let previous_params = match self { Self::Replicated(previous_params) => Some(previous_params), _ => None, @@ -639,9 +645,9 @@ struct LogsControllerInner { } impl LogsControllerInner { - fn new(configuration: LogsConfiguration, retry_policy: RetryPolicy) -> Self { + fn new(current_logs: Arc, retry_policy: RetryPolicy) -> Self { Self { - current_logs: Arc::new(Logs::with_logs_configuration(configuration)), + current_logs, logs_state: HashMap::with_hasher(Xxh3Builder::default()), logs_write_in_progress: None, retry_policy, @@ -942,7 +948,6 @@ impl LogsController { ) .await?; - let logs_configuration = logs.configuration().clone(); metadata_writer.update(Arc::new(logs)).await?; //todo(azmy): make configurable @@ -955,7 +960,10 @@ impl LogsController { let mut this = Self { effects: Some(Vec::new()), - inner: LogsControllerInner::new(logs_configuration, retry_policy), + inner: LogsControllerInner::new( + Metadata::with_current(|m| m.logs_snapshot()), + retry_policy, + ), bifrost, metadata_store_client, metadata_writer, @@ -1279,7 +1287,7 @@ pub mod tests { use enumset::{enum_set, EnumSet}; use restate_types::logs::metadata::{ - DefaultProvider, LogsConfiguration, NodeSetSelectionStrategy, ReplicatedLogletConfig, + LogsConfiguration, NodeSetSelectionStrategy, ProviderConfiguration, ReplicatedLogletConfig, }; use restate_types::logs::LogletId; use restate_types::nodes_config::{ @@ -1452,7 +1460,7 @@ pub mod tests { fn logs_configuration(replication_factor: u8) -> LogsConfiguration { LogsConfiguration { - default_provider: DefaultProvider::Replicated(ReplicatedLogletConfig { + default_provider: ProviderConfiguration::Replicated(ReplicatedLogletConfig { replication_property: ReplicationProperty::new( NonZeroU8::new(replication_factor).expect("must be non zero"), ), @@ -1537,7 +1545,7 @@ pub mod tests { &nodes.observed_state )); - let DefaultProvider::Replicated(ref replicated_loglet_config) = + let ProviderConfiguration::Replicated(ref replicated_loglet_config) = logs_config.default_provider else { unreachable!() @@ -1571,7 +1579,7 @@ pub mod tests { let logs_config = logs_configuration(2); - let DefaultProvider::Replicated(ref replicated_loglet_config) = + let ProviderConfiguration::Replicated(ref replicated_loglet_config) = logs_config.default_provider else { unreachable!() diff --git a/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs b/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs index 8d5a89e22..ebd05a0d6 100644 --- a/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs +++ b/crates/admin/src/cluster_controller/logs_controller/nodeset_selection.rs @@ -13,9 +13,9 @@ use std::cmp::{max, Ordering}; use itertools::Itertools; use rand::prelude::IteratorRandom; use rand::Rng; -use restate_types::logs::metadata::NodeSetSelectionStrategy; use tracing::trace; +use restate_types::logs::metadata::NodeSetSelectionStrategy; use restate_types::nodes_config::NodesConfiguration; use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty}; diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index e5694601f..3acbc22f5 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -26,7 +26,7 @@ use tracing::{debug, info}; use restate_metadata_store::ReadModifyWriteError; use restate_types::cluster_controller::SchedulingPlan; use restate_types::logs::metadata::{ - DefaultProvider, LogletParams, Logs, LogsConfiguration, ProviderKind, SegmentIndex, + LogletParams, Logs, LogsConfiguration, ProviderConfiguration, ProviderKind, SegmentIndex, }; use restate_types::metadata_store::keys::{ BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY, SCHEDULING_PLAN_KEY, @@ -183,7 +183,7 @@ enum ClusterControllerCommand { UpdateClusterConfiguration { num_partitions: NonZeroU16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + default_provider: ProviderConfiguration, response_tx: oneshot::Sender>, }, SealAndExtendChain { @@ -249,7 +249,7 @@ impl ClusterControllerHandle { &self, num_partitions: NonZeroU16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + default_provider: ProviderConfiguration, ) -> Result, ShutdownError> { let (response_tx, response_rx) = oneshot::channel(); @@ -439,7 +439,7 @@ impl Service { &self, num_partitions: u16, replication_strategy: ReplicationStrategy, - default_provider: DefaultProvider, + default_provider: ProviderConfiguration, ) -> anyhow::Result<()> { let logs = self .metadata_store_client @@ -457,8 +457,7 @@ impl Service { // we can only change the default provider if logs.version() != Version::INVALID - && logs.configuration().default_provider.as_provider_kind() - != default_provider.as_provider_kind() + && logs.configuration().default_provider.kind() != default_provider.kind() { { return Err( @@ -786,16 +785,16 @@ impl SealAndExtendTask { let (provider, params) = match &logs.configuration().default_provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => ( + ProviderConfiguration::InMemory => ( ProviderKind::InMemory, u64::from(loglet_id.next()).to_string().into(), ), - DefaultProvider::Local => ( + ProviderConfiguration::Local => ( ProviderKind::Local, u64::from(loglet_id.next()).to_string().into(), ), #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(config) => { + ProviderConfiguration::Replicated(config) => { let schedule_plan = self .metadata_store_client .get::(SCHEDULING_PLAN_KEY.clone()) @@ -833,6 +832,7 @@ mod tests { use googletest::assert_that; use googletest::matchers::eq; + use restate_types::logs::metadata::ProviderKind; use test_log::test; use restate_bifrost::providers::memory_loglet; @@ -843,7 +843,7 @@ mod tests { use restate_core::test_env::NoOpMessageHandler; use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder}; use restate_types::cluster::cluster_state::PartitionProcessorStatus; - use restate_types::config::{AdminOptions, Configuration}; + use restate_types::config::{AdminOptions, BifrostOptions, Configuration}; use restate_types::health::HealthStatus; use restate_types::identifiers::PartitionId; use restate_types::live::Live; @@ -1086,8 +1086,11 @@ mod tests { admin_options.log_trim_threshold = 0; let interval_duration = Duration::from_secs(10); admin_options.log_trim_interval = Some(interval_duration.into()); + let mut bifrost_options = BifrostOptions::default(); + bifrost_options.default_provider = ProviderKind::InMemory; let config = Configuration { admin: admin_options, + bifrost: bifrost_options, ..Default::default() }; @@ -1136,6 +1139,7 @@ mod tests { where F: FnMut(TestCoreEnvBuilder) -> TestCoreEnvBuilder, { + restate_types::config::set_current_config(config); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); @@ -1143,7 +1147,7 @@ mod tests { let mut server_builder = NetworkServerBuilder::default(); let svc = Service::new( - Live::from_value(config), + Configuration::updateable(), HealthStatus::default(), bifrost.clone(), builder.networking.clone(), diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index 4b3df3785..e0b9d1348 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -31,6 +31,7 @@ derive_more = { workspace = true } enum-map = { workspace = true, features = ["serde"] } futures = { workspace = true } googletest = { workspace = true, features = ["anyhow"], optional = true } +itertools = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs index 81d8a26ba..0d4ec6d36 100644 --- a/crates/bifrost/src/appender.rs +++ b/crates/bifrost/src/appender.rs @@ -116,7 +116,7 @@ impl Appender { info!( attempt = attempt, segment_index = %loglet.segment_index(), - "Append batch will be retried (loglet being sealed), waiting for tail to be determined" + "Append batch will be retried (loglet is being sealed), waiting for tail to be determined" ); let new_loglet = Self::wait_next_unsealed_loglet( self.log_id, @@ -131,7 +131,7 @@ impl Appender { Err(AppendError::Other(err)) if err.retryable() => { if let Some(retry_dur) = retry_iter.next() { info!( - ?err, + %err, attempt = attempt, segment_index = %loglet.segment_index(), "Failed to append this batch. Since underlying error is retryable, will retry in {:?}", @@ -140,7 +140,7 @@ impl Appender { tokio::time::sleep(retry_dur).await; } else { warn!( - ?err, + %err, attempt = attempt, segment_index = %loglet.segment_index(), "Failed to append this batch and exhausted all attempts to retry", diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index bb3a6f50f..6e84d52fb 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -11,13 +11,13 @@ use std::ops::Deref; use std::sync::Arc; +use restate_core::metadata_store::retry_on_network_error; use tracing::{debug, info, instrument}; use restate_core::{Metadata, MetadataKind, MetadataWriter}; use restate_metadata_store::MetadataStoreClient; use restate_types::config::Configuration; -use restate_types::logs::builder::BuilderError; -use restate_types::logs::metadata::{LogletParams, Logs, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{Chain, LogletParams, Logs, ProviderKind, SegmentIndex}; use restate_types::logs::{LogId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Version; @@ -76,6 +76,61 @@ impl<'a> BifrostAdmin<'a> { self.bifrost.inner.trim(log_id, trim_point).await } + /// Seals a loglet under a set of conditions. + /// + /// The loglet will be sealed if and only if the following is true: + /// - if segment_index is set, the tail loglet must match segment_index. + /// If the intention is to create the log, then `segment_index` must be set to `None`. + /// + /// This will continue to retry sealing for seal retryable errors automatically. + #[instrument(level = "debug", skip(self), err)] + pub async fn seal_and_auto_extend_chain( + &self, + log_id: LogId, + segment_index: Option, + ) -> Result<()> { + self.bifrost.inner.fail_if_shutting_down()?; + let logs = Metadata::with_current(|m| m.logs_snapshot()); + let provider_config = &logs.configuration().default_provider; + let provider = self.bifrost.inner.provider_for(provider_config.kind())?; + // if this is a new log, we don't need to seal and we can immediately write to metadata + // store, otherwise, we need to seal first. + if logs.chain(&log_id).is_none() && segment_index.is_none() { + let proposed_params = + provider.propose_new_loglet_params(log_id, None, provider_config)?; + self.add_log(log_id, provider_config.kind(), proposed_params) + .await?; + return Ok(()); + } + + let segment_index = segment_index + .or_else(|| logs.chain(&log_id).map(|c| c.tail_index())) + .ok_or(Error::UnknownLogId(log_id))?; + + let sealed_segment = loop { + let sealed_segment = self.seal(log_id, segment_index).await?; + if sealed_segment.tail.is_sealed() { + break sealed_segment; + } + debug!(%log_id, %segment_index, "Segment is not sealed yet"); + tokio::time::sleep(Configuration::pinned().bifrost.seal_retry_interval.into()).await; + }; + + let proposed_params = + provider.propose_new_loglet_params(log_id, logs.chain(&log_id), provider_config)?; + + self.add_segment_with_params( + log_id, + segment_index, + sealed_segment.tail.offset(), + provider_config.kind(), + proposed_params, + ) + .await?; + + Ok(()) + } + /// Seals a loglet under a set of conditions. /// /// The loglet will be sealed if and only if the following is true: @@ -187,34 +242,93 @@ impl<'a> BifrostAdmin<'a> { params: LogletParams, ) -> Result<()> { self.bifrost.inner.fail_if_shutting_down()?; - let logs = self - .metadata_store_client - .read_modify_write(BIFROST_CONFIG_KEY.clone(), move |logs: Option| { - let logs = logs.ok_or(Error::UnknownLogId(log_id))?; - - let mut builder = logs.into_builder(); - let mut chain_builder = builder.chain(log_id).ok_or(Error::UnknownLogId(log_id))?; - - if chain_builder.tail().index() != last_segment_index { - // tail is not what we expected. - return Err(Error::from(AdminError::SegmentMismatch { - expected: last_segment_index, - found: chain_builder.tail().index(), - })); - } + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + let logs = retry_on_network_error(retry_policy, || { + self.metadata_store_client.read_modify_write( + BIFROST_CONFIG_KEY.clone(), + |logs: Option| { + let logs = logs.ok_or(Error::UnknownLogId(log_id))?; - match chain_builder.append_segment(base_lsn, provider, params.clone()) { - Err(e) => match e { - BuilderError::SegmentConflict(lsn) => { - Err(Error::from(AdminError::SegmentConflict(lsn))) - } - _ => unreachable!("the log must exist at this point"), - }, - Ok(_) => Ok(builder.build()), - } - }) - .await - .map_err(|e| e.transpose())?; + let mut builder = logs.into_builder(); + let mut chain_builder = + builder.chain(log_id).ok_or(Error::UnknownLogId(log_id))?; + + if chain_builder.tail().index() != last_segment_index { + // tail is not what we expected. + return Err(Error::from(AdminError::SegmentMismatch { + expected: last_segment_index, + found: chain_builder.tail().index(), + })); + } + + let _ = chain_builder + .append_segment(base_lsn, provider, params.clone()) + .map_err(AdminError::from)?; + Ok(builder.build()) + }, + ) + }) + .await + .map_err(|e| e.transpose())?; + + self.metadata_writer.update(Arc::new(logs)).await?; + Ok(()) + } + + /// Adds a new log if it doesn't exist. + #[instrument(level = "debug", skip(self), err)] + async fn add_log( + &self, + log_id: LogId, + provider: ProviderKind, + params: LogletParams, + ) -> Result<()> { + self.bifrost.inner.fail_if_shutting_down()?; + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + let logs = retry_on_network_error(retry_policy, || { + self.metadata_store_client.read_modify_write::<_, _, Error>( + BIFROST_CONFIG_KEY.clone(), + |logs: Option| { + // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, + // provisioning the empty logs metadata is not our responsibility. + let logs = logs.ok_or(Error::LogsMetadataNotProvisioned)?; + + let mut builder = logs.into_builder(); + builder + .add_log(log_id, Chain::new(provider, params.clone())) + .map_err(AdminError::from)?; + Ok(builder.build()) + }, + ) + }) + .await + .map_err(|e| e.transpose())?; + + self.metadata_writer.update(Arc::new(logs)).await?; + Ok(()) + } + + /// Creates empty metadata if none exists for bifrost and publishes it to metadata + /// manager. + pub async fn init_metadata(&self) -> Result<(), Error> { + let retry_policy = Configuration::pinned() + .common + .network_error_retry_policy + .clone(); + + let logs = retry_on_network_error(retry_policy, || { + self.metadata_store_client + .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { + Logs::from_configuration(&Configuration::pinned()) + }) + }) + .await?; self.metadata_writer.update(Arc::new(logs)).await?; Ok(()) diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index 7ee8b6961..78520c6a8 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -12,6 +12,7 @@ use std::sync::Arc; use restate_core::{ShutdownError, SyncError}; use restate_types::errors::MaybeRetryableError; +use restate_types::logs::builder::BuilderError; use restate_types::logs::metadata::SegmentIndex; use restate_types::logs::{LogId, Lsn}; @@ -22,6 +23,8 @@ pub type Result = std::result::Result; #[derive(thiserror::Error, Debug)] pub enum Error { + #[error("metadata store doesn't have an entry for log metadata")] + LogsMetadataNotProvisioned, #[error("log '{0}' is sealed")] LogSealed(LogId), #[error("unknown log '{0}'")] @@ -53,6 +56,8 @@ pub enum EnqueueError { #[derive(Debug, thiserror::Error)] pub enum AdminError { + #[error("log {0} already exists")] + LogAlreadyExists(LogId), #[error("segment conflicts with existing segment with base_lsn={0}")] SegmentConflict(Lsn), #[error("segment index found in metadata does not match expected {expected}!={found}")] @@ -60,6 +65,8 @@ pub enum AdminError { expected: SegmentIndex, found: SegmentIndex, }, + #[error("loglet params could not be deserialized: {0}")] + ParamsSerde(#[from] serde_json::Error), } impl From for Error { @@ -70,3 +77,13 @@ impl From for Error { } } } + +impl From for AdminError { + fn from(value: BuilderError) -> Self { + match value { + BuilderError::LogAlreadyExists(log_id) => AdminError::LogAlreadyExists(log_id), + BuilderError::ParamsSerde(error) => AdminError::ParamsSerde(error), + BuilderError::SegmentConflict(lsn) => AdminError::SegmentConflict(lsn), + } + } +} diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs index e0b397f27..dbf4cf806 100644 --- a/crates/bifrost/src/loglet/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -12,7 +12,9 @@ use std::sync::Arc; use async_trait::async_trait; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; use restate_types::logs::LogId; use super::{Loglet, OperationError}; @@ -37,6 +39,19 @@ pub trait LogletProvider: Send + Sync { params: &LogletParams, ) -> Result>; + /// Returns a proposed `LogletParams` given this log_id, chain, and defaults. + /// + /// This will not perform any updates, it just statically generates a valid + /// configuration for a potentially new loglet. + /// + /// if `chain` is None, this means we no chain exists already for this log. + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + defaults: &ProviderConfiguration, + ) -> Result; + /// A hook that's called after provider is started. async fn post_start(&self) {} diff --git a/crates/bifrost/src/providers/local_loglet/provider.rs b/crates/bifrost/src/providers/local_loglet/provider.rs index 2d19cd4f0..02d372138 100644 --- a/crates/bifrost/src/providers/local_loglet/provider.rs +++ b/crates/bifrost/src/providers/local_loglet/provider.rs @@ -17,8 +17,10 @@ use tracing::debug; use restate_types::config::{LocalLogletOptions, RocksDbOptions}; use restate_types::live::BoxedLiveLoad; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::LogId; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; +use restate_types::logs::{LogId, LogletId}; use super::log_store::RocksDbLogStore; use super::log_store_writer::RocksDbLogWriterHandle; @@ -105,6 +107,20 @@ impl LogletProvider for LocalLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + _defaults: &ProviderConfiguration, + ) -> Result { + let new_segment_index = chain + .map(|c| c.tail_index().next()) + .unwrap_or(SegmentIndex::OLDEST); + Ok(LogletParams::from( + LogletId::new(log_id, new_segment_index).to_string(), + )) + } + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } diff --git a/crates/bifrost/src/providers/memory_loglet.rs b/crates/bifrost/src/providers/memory_loglet.rs index a1bb9ae0f..dd9bbf6ae 100644 --- a/crates/bifrost/src/providers/memory_loglet.rs +++ b/crates/bifrost/src/providers/memory_loglet.rs @@ -22,9 +22,11 @@ use tokio::sync::Mutex as AsyncMutex; use tracing::{debug, info}; use restate_core::ShutdownError; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; use restate_types::logs::{ - KeyFilter, LogId, LogletOffset, MatchKeyQuery, Record, SequenceNumber, TailState, + KeyFilter, LogId, LogletId, LogletOffset, MatchKeyQuery, Record, SequenceNumber, TailState, }; use crate::loglet::util::TailOffsetWatch; @@ -100,6 +102,20 @@ impl LogletProvider for MemoryLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + _defaults: &ProviderConfiguration, + ) -> Result { + let new_segment_index = chain + .map(|c| c.tail_index().next()) + .unwrap_or(SegmentIndex::OLDEST); + Ok(LogletParams::from( + LogletId::new(log_id, new_segment_index).to_string(), + )) + } + async fn shutdown(&self) -> Result<(), OperationError> { info!("Shutting down in-memory loglet provider"); Ok(()) diff --git a/crates/bifrost/src/providers/replicated_loglet/mod.rs b/crates/bifrost/src/providers/replicated_loglet/mod.rs index dc7b92f0b..4d3f89bc0 100644 --- a/crates/bifrost/src/providers/replicated_loglet/mod.rs +++ b/crates/bifrost/src/providers/replicated_loglet/mod.rs @@ -13,6 +13,7 @@ mod log_server_manager; mod loglet; pub(crate) mod metric_definitions; mod network; +mod nodeset_selector; mod provider; mod read_path; mod remote_sequencer; diff --git a/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs b/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs new file mode 100644 index 000000000..4b48b541e --- /dev/null +++ b/crates/bifrost/src/providers/replicated_loglet/nodeset_selector.rs @@ -0,0 +1,515 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::cmp::{max, Ordering}; + +use itertools::Itertools; +use rand::prelude::IteratorRandom; +use rand::Rng; +use tracing::trace; + +use restate_types::logs::metadata::NodeSetSelectionStrategy; +use restate_types::nodes_config::NodesConfiguration; +use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty}; +use restate_types::NodeId; + +/// TEMPORARY UNTIL NODE-LEVEL CLUSTER STATE IS ACTUALLY IMPLEMENTED +#[derive(Clone, Debug)] +pub struct ObservedClusterState; + +impl ObservedClusterState { + pub fn is_node_alive(&self, _node_id: impl Into) -> bool { + // assume all nodes are alive + true + } +} + +/// Nodeset selector for picking a set of storage nodes for a replicated loglet out of a broader +/// pool of available nodes. +/// +/// This selector can be reused once constructed to make multiple decisions in a single scheduling +/// iteration, if the node configuration and the replication settings are not changing. +#[derive(Clone)] +pub struct NodeSetSelector<'a> { + nodes_config: &'a NodesConfiguration, + cluster_state: &'a ObservedClusterState, +} + +impl<'a> NodeSetSelector<'a> { + pub fn new( + nodes_config: &'a NodesConfiguration, + cluster_state: &'a ObservedClusterState, + ) -> NodeSetSelector<'a> { + Self { + nodes_config, + cluster_state, + } + } + + /// Determines if a nodeset can be improved by adding or replacing members. Does NOT consider + /// sealability of the current configuration when making decisions! + #[allow(unused)] + pub fn can_improve( + &self, + nodeset: &NodeSet, + strategy: NodeSetSelectionStrategy, + replication_property: &ReplicationProperty, + ) -> bool { + let writable_nodeset = WritableNodeSet::from(self.nodes_config); + let alive_nodeset = writable_nodeset.alive(self.cluster_state); + let current_alive = alive_nodeset.intersect(nodeset); + + let nodeset_size = + nodeset_size_range(&strategy, replication_property, writable_nodeset.len()); + + if current_alive.len() == nodeset_size.target_size { + return false; + } + + // todo: we should check the current segment for sealability, otherwise we might propose + // reconfiguration when we are virtually certain to get stuck! + alive_nodeset.len() >= nodeset_size.minimum_size + && alive_nodeset.len() > current_alive.len() + } + + /// Picks a set of storage nodes for a replicated loglet out of the available pool. Only alive, + /// writable storage nodes will be used. Returns a proposed new nodeset that meets the + /// requirements of the supplied selection strategy and replication, or an explicit error. + pub fn select( + &self, + strategy: NodeSetSelectionStrategy, + replication_property: &ReplicationProperty, + rng: &mut R, + preferred_nodes: &NodeSet, + ) -> Result { + if replication_property.at_greatest_scope().0 != &LocationScope::Node { + // todo: add support for other location scopes + unimplemented!("only node-scoped replication is currently supported"); + } + + let writable_nodeset = WritableNodeSet::from(self.nodes_config); + // Only consider alive, writable storage nodes. + let alive_nodeset = writable_nodeset.alive(self.cluster_state); + + let nodeset_size = + nodeset_size_range(&strategy, replication_property, writable_nodeset.len()); + + if writable_nodeset.len() < nodeset_size.fault_tolerant_size { + trace!( + nodes_count = %writable_nodeset.len(), + ?nodeset_size.minimum_size, + ?nodeset_size.fault_tolerant_size, + cluster_state = ?self.cluster_state, + nodes_config = ?self.nodes_config, + "Not enough nodes to meet the fault tolerant replication requirements" + ); + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + let nodeset = match strategy { + NodeSetSelectionStrategy::StrictFaultTolerantGreedy => { + let mut nodes = preferred_nodes + .iter() + .copied() + .filter(|node_id| alive_nodeset.contains(node_id)) + .choose_multiple(rng, nodeset_size.target_size); + + if nodes.len() < nodeset_size.target_size { + let remaining = nodeset_size.target_size - nodes.len(); + nodes.extend( + alive_nodeset + .iter() + .filter(|node_id| !preferred_nodes.contains(node_id)) + .choose_multiple(rng, remaining), + ); + } + + if nodes.len() < nodeset_size.minimum_size { + trace!( + "Failed to place replicated loglet: insufficient alive nodes to meet minimum size requirement {} < {}", + nodes.len(), + nodeset_size.minimum_size, + ); + + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + // last possibility is if the selected nodeset is still + // smaller than fault tolerant size we try to extend from the full nodeset + // which includes possibly dead nodes + if nodes.len() < nodeset_size.fault_tolerant_size { + // greedy approach: Every other node that is not + // already in the set. + let remaining = nodeset_size.fault_tolerant_size - nodes.len(); + + let extension = writable_nodeset + .iter() + .filter(|node_id| !alive_nodeset.contains(node_id)) + .cloned() + .sorted_by(|l, r| { + // sorting nodes by "preferred" nodes. Preferred nodes comes first. + match (preferred_nodes.contains(l), preferred_nodes.contains(r)) { + (true, true) | (false, false) => Ordering::Equal, + (true, false) => Ordering::Less, + (false, true) => Ordering::Greater, + } + }) + .take(remaining); + + nodes.extend(extension); + } + + let nodes_len = nodes.len(); + let nodeset = NodeSet::from_iter(nodes); + assert_eq!( + nodeset.len(), + nodes_len, + "We have accidentally chosen duplicate candidates during nodeset selection" + ); + nodeset + } + }; + + // even with all possible dead node we still can't reach the fault tolerant + // nodeset size. This means there are not enough nodes in the cluster + // so we still return an error. + + // todo: implement location scope-aware selection + if nodeset.len() < nodeset_size.fault_tolerant_size { + trace!( + "Failed to place replicated loglet: insufficient writeable nodes to meet fault tolerant size requirement {} < {}", + nodeset.len(), + nodeset_size.fault_tolerant_size, + ); + return Err(NodeSelectionError::InsufficientWriteableNodes); + } + + Ok(nodeset) + } +} + +#[derive(Debug)] +struct NodeSetSizeRange { + /// Minimum number of nodes required to maintain write availability; + /// dropping below this threshold will result in loss of write availability. + minimum_size: usize, + /// The minimum number of nodes to satisfy replication + /// property with fault tolerance + /// + /// calculated as (minimum_size - 1) * 2 + 1 + fault_tolerant_size: usize, + /// The proposed number of nodes to use if possible + target_size: usize, +} + +fn nodeset_size_range( + strategy: &NodeSetSelectionStrategy, + replication_property: &ReplicationProperty, + writable_nodes_size: usize, +) -> NodeSetSizeRange { + let min_copies = replication_property.num_copies(); + + // ReplicationFactor(f+1) implies a minimum of 2f+1 nodes. At this point we are only + // calculating the nodeset floor size, the actual size will be determined by the specific + // strategy in use. + assert!( + min_copies < u8::MAX >> 1, + "The replication factor implies a cluster size that exceeds the maximum supported size" + ); + + let fault_tolerant_size = (usize::from(min_copies) - 1) * 2 + 1; + assert!( + fault_tolerant_size >= usize::from(min_copies), + "The calculated minimum nodeset size can not be less than the replication factor" + ); + + let (fault_tolerant_size, nodeset_target_size) = match strategy { + // writable_nodes_size includes any writable node (log-server) dead or alive. + // in the greedy strategy we take the max(fault_tolerant, writable_nodes_size) as + // our target size + NodeSetSelectionStrategy::StrictFaultTolerantGreedy => ( + fault_tolerant_size, + max(fault_tolerant_size, writable_nodes_size), + ), + }; + + NodeSetSizeRange { + minimum_size: min_copies.into(), + fault_tolerant_size, + target_size: nodeset_target_size, + } +} + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum NodeSelectionError { + #[error("Insufficient writeable nodes in the nodeset")] + InsufficientWriteableNodes, +} + +/// Set of all log-server nodeset, regardless of the state +#[derive(Debug, Clone, Eq, PartialEq, derive_more::Into, derive_more::Deref)] +struct WritableNodeSet(NodeSet); + +impl WritableNodeSet { + fn alive(&self, state: &ObservedClusterState) -> AliveNodeSet { + self.iter() + .cloned() + .filter(|id| state.is_node_alive(*id)) + .collect::() + .into() + } +} + +impl From<&NodesConfiguration> for WritableNodeSet { + fn from(value: &NodesConfiguration) -> Self { + Self( + value + .iter() + .filter_map(|(node_id, config)| { + if config.log_server_config.storage_state.can_write_to() { + Some(node_id) + } else { + None + } + }) + .collect(), + ) + } +} + +/// A subset of WritableNodeset that is known to be alive at the time of creation. +#[derive(Debug, Clone, Eq, PartialEq, derive_more::Into, derive_more::Deref, derive_more::From)] +struct AliveNodeSet(NodeSet); + +#[cfg(test)] +pub mod tests { + // ** NOTE ** + // THESE TESTS ARE TEMPORARY DISABLED AND WILL ENABLED AFTER CLUSTER STATE IS IMPLEMENTED + // THIS IS A TRANSITIONAL STATE + + // use std::collections::HashSet; + // + // use enumset::enum_set; + // use rand::thread_rng; + // + // use restate_types::nodes_config::{NodesConfiguration, Role, StorageState}; + // use restate_types::replicated_loglet::{LocationScope, NodeSet, ReplicationProperty}; + // use restate_types::PlainNodeId; + // + // use super::*; + // use crate::cluster_controller::logs_controller::tests::{node, MockNodes}; + // use crate::cluster_controller::observed_cluster_state::ObservedClusterState; + // + // #[test] + // #[should_panic( + // expected = "not implemented: only node-scoped replication is currently supported" + // )] + // fn test_select_log_servers_rejects_unsupported_replication_scope() { + // let replication = + // ReplicationProperty::with_scope(LocationScope::Zone, 1.try_into().unwrap()); + // + // let nodes_config = NodesConfiguration::default(); + // let observed_state = ObservedClusterState::default(); + // + // let preferred_nodes = NodeSet::empty(); + // NodeSetSelector::new(&nodes_config, &observed_state) + // .select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ) + // .unwrap(); // panics + // } + // + // #[test] + // fn test_select_log_servers_insufficient_capacity() { + // let nodes: Vec = vec![1.into(), 2.into(), 3.into()]; + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // let mut nodes_config = NodesConfiguration::default(); + // nodes_config.upsert_node(node(0, enum_set!(Role::Admin), StorageState::Disabled)); + // nodes_config.upsert_node(node( + // 1, + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::Provisioning, + // )); + // nodes_config.upsert_node(node( + // 2, + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::ReadWrite, + // )); + // nodes_config.upsert_node(node( + // 3, + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::ReadOnly, + // )); + // nodes_config.upsert_node(node(4, enum_set!(Role::Worker), StorageState::Disabled)); + // + // let observed_state = ObservedClusterState { + // alive_nodes: nodes + // .iter() + // .copied() + // .map(|id| (id, id.with_generation(1))) + // .collect(), + // dead_nodes: HashSet::default(), + // ..Default::default() + // }; + // + // let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy; + // let preferred_nodes = NodeSet::empty(); + // let selection = NodeSetSelector::new(&nodes_config, &observed_state).select( + // strategy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ); + // + // assert_eq!( + // selection, + // Err(NodeSelectionError::InsufficientWriteableNodes) + // ); + // } + // + // /// Replicated loglets should work just fine in single-node clusters, with the FT strategy inferring that f=0, + // /// as long as the replication factor is set to 1. + // #[test] + // fn test_select_log_servers_single_node_cluster() { + // let nodes = MockNodes::builder().with_mixed_server_nodes([1]).build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 1.try_into().unwrap()); + // + // let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy; + // let preferred_nodes = NodeSet::empty(); + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // strategy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ); + // + // assert_eq!( + // selection.unwrap(), + // NodeSet::from([1]), + // "A single-node cluster is possible with replication factor of 0" + // ); + // } + // + // /// In this test we have a cluster with 3 nodes and replication factor is 2. The strict FT + // /// strategy can bootstrap a loglet using all 3 nodes but won't choose a new nodeset when only 2 + // /// are alive, as that puts the loglet at risk. The assumption is that the previous nodeset will + // /// carry on in its original configuration - it is the data plane's problem to work around + // /// partial node availability. When an additional log server becomes available, the selector can + // /// reconfigure the loglet to use it. + // #[test] + // fn test_select_log_servers_respects_replication_factor() { + // let mut nodes = MockNodes::builder() + // .with_mixed_server_nodes([1, 2, 3]) + // .build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // // initial selection - no prior preferences + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &NodeSet::empty(), + // ); + // assert!(selection.is_ok()); + // let initial_nodeset = selection.unwrap(); + // assert_eq!(initial_nodeset, NodeSet::from([1, 2, 3])); + // + // nodes.kill_node(1); + // + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &initial_nodeset, // preferred nodes + // ); + // + // // while one node is dead, the selector can still satisfy a write quorum + // // based on supplied replication property. The dead node will be included + // // in the nodeset. + // assert!(selection.is_ok()); + // let initial_nodeset = selection.unwrap(); + // assert_eq!(initial_nodeset, NodeSet::from([1, 2, 3])); + // + // nodes.add_dedicated_log_server_node(4); + // + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &initial_nodeset, // preferred nodes + // ); + // assert_eq!(selection.unwrap(), NodeSet::from([2, 3, 4])); + // } + // + // #[test] + // fn test_select_log_servers_respects_replication_factor_not_enough_nodes() { + // let nodes = MockNodes::builder().with_mixed_server_nodes([1, 2]).build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // // initial selection - no prior preferences + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // NodeSetSelectionStrategy::StrictFaultTolerantGreedy, + // &replication, + // &mut thread_rng(), + // &NodeSet::empty(), + // ); + // + // // in this case, the entire cluster does not have enough nodes for an optimal + // // nodeset. + // assert_eq!( + // selection, + // Err(NodeSelectionError::InsufficientWriteableNodes), + // "The strict FT strategy does not compromise on the minimum 2f+1 nodeset size" + // ); + // } + // + // #[test] + // fn test_select_log_servers_insufficient_fault_tolerant_capacity() { + // // while we only have 2 alive node, the algorithm will still + // // prefer to use a dead node instead of failing as long as + // // we have write availability + // + // let nodes = MockNodes::builder() + // .with_nodes( + // [1, 2, 3], + // enum_set!(Role::LogServer | Role::Worker), + // StorageState::ReadWrite, + // ) + // .dead_nodes([3]) + // .build(); + // + // let replication = + // ReplicationProperty::with_scope(LocationScope::Node, 2.try_into().unwrap()); + // + // let strategy = NodeSetSelectionStrategy::StrictFaultTolerantGreedy; + // let preferred_nodes = NodeSet::empty(); + // let selection = NodeSetSelector::new(&nodes.nodes_config, &nodes.observed_state).select( + // strategy, + // &replication, + // &mut thread_rng(), + // &preferred_nodes, + // ); + // + // assert!(selection.is_ok()); + // let selection = selection.unwrap(); + // assert!(selection.contains(&PlainNodeId::from(3))); + // } +} diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index d164ccbfa..7ccb48e48 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -16,16 +16,19 @@ use dashmap::DashMap; use tracing::debug; use restate_core::network::{MessageRouterBuilder, Networking, TransportConnect}; -use restate_core::{TaskCenter, TaskKind}; +use restate_core::{my_node_id, Metadata, TaskCenter, TaskKind}; use restate_metadata_store::MetadataStoreClient; use restate_types::config::Configuration; -use restate_types::logs::metadata::{LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::{LogId, RecordCache}; +use restate_types::logs::metadata::{ + Chain, LogletParams, ProviderConfiguration, ProviderKind, SegmentIndex, +}; +use restate_types::logs::{LogId, LogletId, RecordCache}; use restate_types::replicated_loglet::ReplicatedLogletParams; use super::loglet::ReplicatedLoglet; use super::metric_definitions; use super::network::RequestPump; +use super::nodeset_selector::{NodeSelectionError, NodeSetSelector, ObservedClusterState}; use super::rpc_routers::{LogServersRpc, SequencersRpc}; use crate::loglet::{Loglet, LogletProvider, LogletProviderFactory, OperationError}; use crate::providers::replicated_loglet::error::ReplicatedLogletError; @@ -201,6 +204,76 @@ impl LogletProvider for ReplicatedLogletProvider { Ok(loglet as Arc) } + fn propose_new_loglet_params( + &self, + log_id: LogId, + chain: Option<&Chain>, + defaults: &ProviderConfiguration, + ) -> Result { + let ProviderConfiguration::Replicated(defaults) = defaults else { + panic!("ProviderConfiguration::Replicated is expected"); + }; + + let new_segment_index = chain + .map(|c| c.tail_index().next()) + .unwrap_or(SegmentIndex::OLDEST); + + let loglet_id = LogletId::new(log_id, new_segment_index); + + let mut rng = rand::thread_rng(); + + let replication = defaults.replication_property.clone(); + let strategy = defaults.nodeset_selection_strategy; + + // if the last loglet in the chain is of the same provider kind, we can use this to + // influence the nodeset selector. + let previous_params = chain.and_then(|chain| { + let tail_config = chain.tail().config; + match tail_config.kind { + ProviderKind::Replicated => Some( + ReplicatedLogletParams::deserialize_from(tail_config.params.as_bytes()) + .expect("params serde must be infallible"), + ), + // Another kind, we don't care about its config + _ => None, + } + }); + + let preferred_nodes = previous_params + .map(|p| p.nodeset.clone()) + .unwrap_or_default(); + let nodes_config = Metadata::with_current(|m| m.nodes_config_ref()); + + let selection = NodeSetSelector::new(&nodes_config, &ObservedClusterState).select( + strategy, + &replication, + &mut rng, + &preferred_nodes, + ); + + match selection { + Ok(nodeset) => Ok(LogletParams::from( + ReplicatedLogletParams { + loglet_id, + // We choose ourselves to be the sequencer for this loglet + sequencer: my_node_id(), + replication, + nodeset, + } + .serialize() + .expect("params serde must be infallible"), + )), + Err(e @ NodeSelectionError::InsufficientWriteableNodes) => { + debug!( + ?loglet_id, + "Insufficient writeable nodes to select new nodeset for replicated loglet" + ); + + Err(OperationError::terminal(e)) + } + } + } + async fn shutdown(&self) -> Result<(), OperationError> { Ok(()) } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs index 068ee3b6f..49398117c 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/check_seal.rs @@ -94,6 +94,7 @@ impl CheckSealTask { loglet_id = %my_params.loglet_id, status = %nodeset_checker, effective_nodeset = %effective_nodeset, + replication = %my_params.replication, "Insufficient nodes responded to GetLogletInfo requests, we cannot determine seal status, we'll assume it's unsealed for now", ); return Ok(CheckSealOutcome::ProbablyOpen); diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index 5607da03b..3c9773f99 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::any::type_name; use std::sync::Arc; use arc_swap::ArcSwap; @@ -428,11 +429,18 @@ impl MetadataManager { let mut maybe_new_version = new_value.version(); if new_value.version() > current_value.version() { + trace!( + "Updating {} from {} to {}", + type_name::(), + current_value.version(), + new_value.version(), + ); container.store(new_value); } else { /* Do nothing, current is already newer */ trace!( - "Ignoring update {} because we are at {}", + "Ignoring update of {} to {} because we are already at {}", + type_name::(), new_value.version(), current_value.version(), ); diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 178c05d1c..412353f1f 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -233,7 +233,7 @@ pub struct CommonOptions { /// # Network error retry policy /// - /// The retry policy for node network error + /// The retry policy for network related errors pub network_error_retry_policy: RetryPolicy, } diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index 1668910b5..7eaecb526 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -194,7 +194,7 @@ impl TryFrom for NodeSetSelectionStrategy { #[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "kebab-case")] -pub enum DefaultProvider { +pub enum ProviderConfiguration { #[cfg(any(test, feature = "memory-loglet"))] InMemory, #[default] @@ -202,8 +202,8 @@ pub enum DefaultProvider { Replicated(ReplicatedLogletConfig), } -impl DefaultProvider { - pub fn as_provider_kind(&self) -> ProviderKind { +impl ProviderConfiguration { + pub fn kind(&self) -> ProviderKind { match self { #[cfg(any(test, feature = "memory-loglet"))] Self::InMemory => ProviderKind::InMemory, @@ -213,17 +213,17 @@ impl DefaultProvider { } } -impl From for crate::protobuf::cluster::DefaultProvider { - fn from(value: DefaultProvider) -> Self { +impl From for crate::protobuf::cluster::DefaultProvider { + fn from(value: ProviderConfiguration) -> Self { use crate::protobuf::cluster; let mut result = crate::protobuf::cluster::DefaultProvider::default(); match value { - DefaultProvider::Local => result.provider = ProviderKind::Local.to_string(), + ProviderConfiguration::Local => result.provider = ProviderKind::Local.to_string(), #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => result.provider = ProviderKind::InMemory.to_string(), - DefaultProvider::Replicated(config) => { + ProviderConfiguration::InMemory => result.provider = ProviderKind::InMemory.to_string(), + ProviderConfiguration::Replicated(config) => { result.provider = ProviderKind::Replicated.to_string(); result.replicated_config = Some(cluster::ReplicatedProviderConfig { replication_property: config.replication_property.to_string(), @@ -236,7 +236,7 @@ impl From for crate::protobuf::cluster::DefaultProvider { } } -impl TryFrom for DefaultProvider { +impl TryFrom for ProviderConfiguration { type Error = anyhow::Error; fn try_from(value: crate::protobuf::cluster::DefaultProvider) -> Result { let provider_kind: ProviderKind = value.provider.parse()?; @@ -272,7 +272,7 @@ pub struct ReplicatedLogletConfig { #[derive(Debug, Clone, Eq, PartialEq, Default, serde::Serialize, serde::Deserialize)] pub struct LogsConfiguration { - pub default_provider: DefaultProvider, + pub default_provider: ProviderConfiguration, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -325,12 +325,14 @@ impl TryFrom for Logs { // this means we are migrating from an older setup that had replication-property // hardcoded to {node:2} config = Some(LogsConfiguration { - default_provider: DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(2).expect("2 is not 0"), - ), - }), + default_provider: ProviderConfiguration::Replicated( + ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(2).expect("2 is not 0"), + ), + }, + ), }) } } @@ -495,14 +497,16 @@ impl Logs { Self::with_logs_configuration(LogsConfiguration { default_provider: match config.bifrost.default_provider { #[cfg(any(test, feature = "memory-loglet"))] - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, - ProviderKind::Replicated => DefaultProvider::Replicated(ReplicatedLogletConfig { - nodeset_selection_strategy: NodeSetSelectionStrategy::default(), - replication_property: ReplicationProperty::new( - NonZeroU8::new(1).expect("1 is not zero"), - ), - }), + ProviderKind::InMemory => ProviderConfiguration::InMemory, + ProviderKind::Local => ProviderConfiguration::Local, + ProviderKind::Replicated => { + ProviderConfiguration::Replicated(ReplicatedLogletConfig { + nodeset_selection_strategy: NodeSetSelectionStrategy::default(), + replication_property: ReplicationProperty::new( + NonZeroU8::new(1).expect("1 is not zero"), + ), + }) + } }, }) } @@ -510,6 +514,7 @@ impl Logs { pub fn with_logs_configuration(logs_configuration: LogsConfiguration) -> Self { Logs { config: logs_configuration, + version: Version::MIN, ..Default::default() } } diff --git a/tools/restatectl/src/commands/cluster/config.rs b/tools/restatectl/src/commands/cluster/config.rs index 620a2f488..b985b1278 100644 --- a/tools/restatectl/src/commands/cluster/config.rs +++ b/tools/restatectl/src/commands/cluster/config.rs @@ -16,7 +16,7 @@ use std::fmt::{self, Display, Write}; use cling::prelude::*; use restate_types::{ - logs::metadata::DefaultProvider, partition_table::ReplicationStrategy, + logs::metadata::ProviderConfiguration, partition_table::ReplicationStrategy, protobuf::cluster::ClusterConfiguration, }; @@ -43,7 +43,7 @@ fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result write_leaf(&mut w, 1, false, "Bifrost replication strategy", strategy)?; - let provider: DefaultProvider = config.default_provider.unwrap_or_default().try_into()?; + let provider: ProviderConfiguration = config.default_provider.unwrap_or_default().try_into()?; write_default_provider(&mut w, 1, provider)?; Ok(w) @@ -52,19 +52,19 @@ fn cluster_config_string(config: ClusterConfiguration) -> anyhow::Result fn write_default_provider( w: &mut W, depth: usize, - provider: DefaultProvider, + provider: ProviderConfiguration, ) -> Result<(), fmt::Error> { let title = "Bifrost Provider"; match provider { #[cfg(any(test, feature = "memory-loglet"))] - DefaultProvider::InMemory => { + ProviderConfiguration::InMemory => { write_leaf(w, depth, true, title, "in-memory")?; } - DefaultProvider::Local => { + ProviderConfiguration::Local => { write_leaf(w, depth, true, title, "local")?; } #[cfg(feature = "replicated-loglet")] - DefaultProvider::Replicated(config) => { + ProviderConfiguration::Replicated(config) => { write_leaf(w, depth, true, title, "replicated")?; let depth = depth + 1; write_leaf( diff --git a/tools/restatectl/src/commands/cluster/config/set.rs b/tools/restatectl/src/commands/cluster/config/set.rs index 5c23c01f2..abdece88c 100644 --- a/tools/restatectl/src/commands/cluster/config/set.rs +++ b/tools/restatectl/src/commands/cluster/config/set.rs @@ -23,7 +23,7 @@ use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::{confirm_or_exit, StyledTable}; use restate_types::logs::metadata::{ - DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, + NodeSetSelectionStrategy, ProviderConfiguration, ProviderKind, ReplicatedLogletConfig, }; use restate_types::partition_table::ReplicationStrategy; use restate_types::replicated_loglet::ReplicationProperty; @@ -89,8 +89,8 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an if let Some(provider) = set_opts.bifrost_provider { let default_provider = match provider { - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, + ProviderKind::InMemory => ProviderConfiguration::InMemory, + ProviderKind::Local => ProviderConfiguration::Local, ProviderKind::Replicated => { let config = ReplicatedLogletConfig { replication_property: set_opts @@ -101,7 +101,7 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an .nodeset_selection_strategy .unwrap_or_default(), }; - DefaultProvider::Replicated(config) + ProviderConfiguration::Replicated(config) } }; diff --git a/tools/restatectl/src/commands/log/list_logs.rs b/tools/restatectl/src/commands/log/list_logs.rs index a92668959..1b48e7b44 100644 --- a/tools/restatectl/src/commands/log/list_logs.rs +++ b/tools/restatectl/src/commands/log/list_logs.rs @@ -55,6 +55,11 @@ pub async fn list_logs(connection: &ConnectionInfo, _opts: &ListLogsOpts) -> any c_println!("Log Configuration ({})", logs.version()); + c_println!( + "Default Provider Config: {:?}", + logs.configuration().default_provider + ); + // sort by log-id for display let logs: BTreeMap = logs.iter().map(|(id, chain)| (*id, chain)).collect(); From 52a62f105f9bb52f4fa2f4d4cff21b27550b13ed Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 7 Jan 2025 13:50:07 +0000 Subject: [PATCH 2/2] [Core] MetadataWriter access to MetadataStore and Bifrost access to MetadataWriter This PR introduces two key changes: 1. Bifrost now can provide `bifrost.admin()` to get easy access to bifrost's admin interface withouto passing metadata writer and metadata store client explicitly. 2. MetadataWriter now owns MetadataStoreClient. This means that you can always access metadata store client directly if you have metadata_writer which used to be a pair of types we _always_ pass together. This also opens a path for a future where MetadataWriter wraps the metadata store client and automatically update metadata manager on successful writes but that's beyond the scope of this PR. What this PR provides is a direct access to the underlying metadata_store_client via an accessor function. --- .../cluster_controller/grpc_svc_handler.rs | 25 ++--- .../src/cluster_controller/logs_controller.rs | 34 ++---- .../admin/src/cluster_controller/service.rs | 61 ++++------- .../src/cluster_controller/service/state.rs | 22 +--- crates/admin/src/schema_registry/mod.rs | 23 ++-- crates/admin/src/service.rs | 3 - crates/bifrost/src/bifrost.rs | 70 ++++++------ crates/bifrost/src/bifrost_admin.rs | 101 +++++++++--------- crates/bifrost/src/loglet/provider.rs | 3 +- crates/bifrost/src/read_stream.rs | 32 +++--- crates/bifrost/src/service.rs | 14 +-- crates/core/src/metadata.rs | 19 +++- crates/core/src/metadata/manager.rs | 6 +- crates/core/src/metadata_store.rs | 1 - crates/node/src/lib.rs | 4 +- crates/node/src/roles/admin.rs | 4 - crates/worker/src/partition/cleaner.rs | 4 +- crates/worker/src/partition/leadership/mod.rs | 4 +- crates/worker/src/partition/shuffle.rs | 2 +- .../worker/src/partition_processor_manager.rs | 3 +- server/tests/common/replicated_loglet.rs | 12 +-- server/tests/replicated_loglet.rs | 15 +-- tools/bifrost-benchpress/src/main.rs | 2 +- tools/restatectl/src/commands/log/dump_log.rs | 4 +- tools/xtask/src/main.rs | 3 +- 25 files changed, 200 insertions(+), 271 deletions(-) diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs index f9c820239..c685a102b 100644 --- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs +++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs @@ -16,9 +16,8 @@ use restate_types::protobuf::cluster::ClusterConfiguration; use tonic::{async_trait, Request, Response, Status}; use tracing::info; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BiforstError}; +use restate_bifrost::{Bifrost, Error as BiforstError}; use restate_core::{Metadata, MetadataWriter}; -use restate_metadata_store::MetadataStoreClient; use restate_types::identifiers::PartitionId; use restate_types::logs::metadata::{Logs, SegmentIndex}; use restate_types::logs::{LogId, Lsn, SequenceNumber}; @@ -44,7 +43,6 @@ use super::service::ChainExtension; use super::ClusterControllerHandle; pub(crate) struct ClusterCtrlSvcHandler { - metadata_store_client: MetadataStoreClient, controller_handle: ClusterControllerHandle, bifrost: Bifrost, metadata_writer: MetadataWriter, @@ -53,20 +51,19 @@ pub(crate) struct ClusterCtrlSvcHandler { impl ClusterCtrlSvcHandler { pub fn new( controller_handle: ClusterControllerHandle, - metadata_store_client: MetadataStoreClient, bifrost: Bifrost, metadata_writer: MetadataWriter, ) -> Self { Self { controller_handle, - metadata_store_client, bifrost, metadata_writer, } } async fn get_logs(&self) -> Result { - self.metadata_store_client + self.metadata_writer + .metadata_store_client() .get::(BIFROST_CONFIG_KEY.clone()) .await .map_err(|error| Status::unknown(format!("Failed to get log metadata: {error:?}")))? @@ -120,7 +117,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { let (trim_point, nodes_config) = tokio::join!( self.bifrost.get_trim_point(log_id), - self.metadata_store_client + self.metadata_writer + .metadata_store_client() .get::(NODES_CONFIG_KEY.clone()), ); @@ -151,7 +149,8 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { _request: Request, ) -> Result, Status> { let nodes_config = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .get::(NODES_CONFIG_KEY.clone()) .await .map_err(|error| { @@ -261,13 +260,9 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler { let request = request.into_inner(); let log_id: LogId = request.log_id.into(); - let admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - - let writable_loglet = admin + let writable_loglet = self + .bifrost + .admin() .writeable_loglet(log_id) .await .map_err(|err| match err { diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 4f1f7db6e..5935181af 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -25,9 +25,9 @@ use tokio::task::JoinSet; use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; -use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; +use restate_bifrost::{Bifrost, Error as BifrostError}; use restate_core::metadata_store::{ - retry_on_network_error, MetadataStoreClient, Precondition, ReadWriteError, WriteError, + retry_on_network_error, Precondition, ReadWriteError, WriteError, }; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; use restate_types::errors::GenericError; @@ -924,7 +924,6 @@ pub struct LogsController { effects: Option>, inner: LogsControllerInner, bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, async_operations: JoinSet, find_logs_tail_semaphore: Arc, @@ -934,16 +933,17 @@ impl LogsController { pub async fn init( configuration: &Configuration, bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, ) -> Result { // obtain the latest logs or init it with an empty logs variant let logs = retry_on_network_error( configuration.common.network_error_retry_policy.clone(), || { - metadata_store_client.get_or_insert(BIFROST_CONFIG_KEY.clone(), || { - Logs::from_configuration(configuration) - }) + metadata_writer + .metadata_store_client() + .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { + Logs::from_configuration(configuration) + }) }, ) .await?; @@ -965,7 +965,6 @@ impl LogsController { retry_policy, ), bifrost, - metadata_store_client, metadata_writer, async_operations: JoinSet::default(), find_logs_tail_semaphore: Arc::new(Semaphore::new(1)), @@ -984,17 +983,12 @@ impl LogsController { let logs = Arc::clone(&self.inner.current_logs); let bifrost = self.bifrost.clone(); - let metadata_store_client = self.metadata_store_client.clone(); - let metadata_writer = self.metadata_writer.clone(); let find_tail = async move { - let bifrost_admin = - BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client); - let mut updates = LogsTailUpdates::default(); for (log_id, chain) in logs.iter() { let tail_segment = chain.tail(); - let writable_loglet = match bifrost_admin.writeable_loglet(*log_id).await { + let writable_loglet = match bifrost.admin().writeable_loglet(*log_id).await { Ok(loglet) => loglet, Err(BifrostError::Shutdown(_)) => break, Err(err) => { @@ -1098,7 +1092,6 @@ impl LogsController { logs: Arc, mut debounce: Option>, ) { - let metadata_store_client = self.metadata_store_client.clone(); let metadata_writer = self.metadata_writer.clone(); self.async_operations.spawn(async move { @@ -1108,7 +1101,7 @@ impl LogsController { tokio::time::sleep(delay).await; } - if let Err(err) = metadata_store_client + if let Err(err) = metadata_writer.metadata_store_client() .put( BIFROST_CONFIG_KEY.clone(), logs.deref(), @@ -1120,7 +1113,7 @@ impl LogsController { WriteError::FailedPrecondition(_) => { debug!("Detected a concurrent modification of logs. Fetching the latest logs now."); // There was a concurrent modification of the logs. Fetch the latest version. - match metadata_store_client + match metadata_writer.metadata_store_client() .get::(BIFROST_CONFIG_KEY.clone()) .await { @@ -1166,8 +1159,6 @@ impl LogsController { mut debounce: Option>, ) { let bifrost = self.bifrost.clone(); - let metadata_store_client = self.metadata_store_client.clone(); - let metadata_writer = self.metadata_writer.clone(); self.async_operations.spawn( async move { @@ -1177,10 +1168,7 @@ impl LogsController { tokio::time::sleep(delay).await; } - let bifrost_admin = - BifrostAdmin::new(&bifrost, &metadata_writer, &metadata_store_client); - - match bifrost_admin.seal(log_id, segment_index).await { + match bifrost.admin().seal(log_id, segment_index).await { Ok(sealed_segment) => { if sealed_segment.tail.is_sealed() { Event::SealSucceeded { diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 3acbc22f5..9c68aab81 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -36,8 +36,8 @@ use restate_types::partition_table::{ }; use restate_types::replicated_loglet::ReplicatedLogletParams; -use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; -use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; +use restate_bifrost::{Bifrost, SealedSegment}; +use restate_core::metadata_store::retry_on_network_error; use restate_core::network::rpc_router::RpcRouter; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ @@ -79,7 +79,6 @@ pub struct Service { cluster_state_refresher: ClusterStateRefresher, configuration: Live, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, processor_manager_client: PartitionProcessorManagerClient>, command_tx: mpsc::Sender, @@ -102,7 +101,6 @@ where router_builder: &mut MessageRouterBuilder, server_builder: &mut NetworkServerBuilder, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, ) -> Self { let (command_tx, command_rx) = mpsc::channel(2); @@ -122,7 +120,6 @@ where ClusterControllerHandle { tx: command_tx.clone(), }, - metadata_store_client.clone(), bifrost.clone(), metadata_writer.clone(), )) @@ -140,7 +137,6 @@ where bifrost, cluster_state_refresher, metadata_writer, - metadata_store_client, processor_manager_client, command_tx, command_rx, @@ -309,12 +305,6 @@ impl Service { let mut shutdown = std::pin::pin!(cancellation_watcher()); - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let mut state: ClusterControllerState = ClusterControllerState::Follower; self.health_status.update(AdminStatus::Ready); @@ -333,7 +323,7 @@ impl Service { } Some(cmd) = self.command_rx.recv() => { // it is still safe to handle cluster commands as a follower - self.on_cluster_cmd(cmd, bifrost_admin).await; + self.on_cluster_cmd(cmd).await; } _ = config_watcher.changed() => { debug!("Updating the cluster controller settings."); @@ -360,8 +350,9 @@ impl Service { let partition_table = retry_on_network_error( configuration.common.network_error_retry_policy.clone(), || { - self.metadata_store_client - .get_or_insert(PARTITION_TABLE_KEY.clone(), || { + self.metadata_writer.metadata_store_client().get_or_insert( + PARTITION_TABLE_KEY.clone(), + || { let partition_table = PartitionTable::with_equally_sized_partitions( Version::MIN, configuration.common.bootstrap_num_partitions.get(), @@ -370,7 +361,8 @@ impl Service { debug!("Initializing the partition table with '{partition_table:?}'"); partition_table - }) + }, + ) }, ) .await?; @@ -442,7 +434,8 @@ impl Service { default_provider: ProviderConfiguration, ) -> anyhow::Result<()> { let logs = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write(BIFROST_CONFIG_KEY.clone(), |current: Option| { let logs = match current { Some(logs) => logs, @@ -493,7 +486,8 @@ impl Service { }; let partition_table = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( PARTITION_TABLE_KEY.clone(), |current: Option| { @@ -557,7 +551,6 @@ impl Service { extension, min_version, bifrost: self.bifrost.clone(), - metadata_store_client: self.metadata_store_client.clone(), metadata_writer: self.metadata_writer.clone(), observed_cluster_state: self.observed_cluster_state.clone(), }; @@ -569,11 +562,7 @@ impl Service { }); } - async fn on_cluster_cmd( - &self, - command: ClusterControllerCommand, - bifrost_admin: BifrostAdmin<'_>, - ) { + async fn on_cluster_cmd(&self, command: ClusterControllerCommand) { match command { ClusterControllerCommand::GetClusterState(tx) => { let _ = tx.send(self.cluster_state_refresher.get_cluster_state()); @@ -587,7 +576,7 @@ impl Service { ?log_id, trim_point_inclusive = ?trim_point, "Manual trim log command received"); - let result = bifrost_admin.trim(log_id, trim_point).await; + let result = self.bifrost.admin().trim(log_id, trim_point).await; let _ = response_tx.send(result.map_err(Into::into)); } ClusterControllerCommand::CreateSnapshot { @@ -715,7 +704,6 @@ struct SealAndExtendTask { extension: Option, bifrost: Bifrost, metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, observed_cluster_state: ObservedClusterState, } @@ -726,18 +714,14 @@ impl SealAndExtendTask { .as_ref() .and_then(|ext| ext.segment_index_to_seal); - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let (provider, params) = match self.extension.take() { Some(extension) => (extension.provider_kind, extension.params), None => self.next_segment().await?, }; - let sealed_segment = bifrost_admin + let sealed_segment = self + .bifrost + .admin() .seal_and_extend_chain( self.log_id, last_segment_index, @@ -796,7 +780,8 @@ impl SealAndExtendTask { #[cfg(feature = "replicated-loglet")] ProviderConfiguration::Replicated(config) => { let schedule_plan = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .get::(SCHEDULING_PLAN_KEY.clone()) .await?; @@ -858,7 +843,8 @@ mod tests { async fn manual_log_trim() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let svc = Service::new( @@ -869,7 +855,6 @@ mod tests { &mut builder.router_builder, &mut NetworkServerBuilder::default(), builder.metadata_writer.clone(), - builder.metadata_store_client.clone(), ); let svc_handle = svc.handle(); @@ -1141,7 +1126,8 @@ mod tests { { restate_types::config::set_current_config(config); let mut builder = TestCoreEnvBuilder::with_incoming_only_connector(); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let mut server_builder = NetworkServerBuilder::default(); @@ -1154,7 +1140,6 @@ mod tests { &mut builder.router_builder, &mut server_builder, builder.metadata_writer.clone(), - builder.metadata_store_client.clone(), ); let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..430ff8124 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -17,10 +17,9 @@ use tokio::time; use tokio::time::{Interval, MissedTickBehavior}; use tracing::{debug, info, warn}; -use restate_bifrost::{Bifrost, BifrostAdmin}; -use restate_core::metadata_store::MetadataStoreClient; +use restate_bifrost::Bifrost; use restate_core::network::TransportConnect; -use restate_core::{my_node_id, Metadata, MetadataWriter}; +use restate_core::{my_node_id, Metadata}; use restate_types::cluster::cluster_state::{AliveNode, NodeState}; use restate_types::config::{AdminOptions, Configuration}; use restate_types::identifiers::PartitionId; @@ -135,8 +134,6 @@ pub enum LeaderEvent { pub struct Leader { bifrost: Bifrost, - metadata_store_client: MetadataStoreClient, - metadata_writer: MetadataWriter, logs_watcher: watch::Receiver, partition_table_watcher: watch::Receiver, find_logs_tail_interval: Interval, @@ -156,7 +153,7 @@ where let scheduler = Scheduler::init( &configuration, - service.metadata_store_client.clone(), + service.metadata_writer.metadata_store_client().clone(), service.networking.clone(), ) .await?; @@ -164,7 +161,6 @@ where let logs_controller = LogsController::init( &configuration, service.bifrost.clone(), - service.metadata_store_client.clone(), service.metadata_writer.clone(), ) .await?; @@ -179,8 +175,6 @@ where let metadata = Metadata::current(); let mut leader = Self { bifrost: service.bifrost.clone(), - metadata_store_client: service.metadata_store_client.clone(), - metadata_writer: service.metadata_writer.clone(), logs_watcher: metadata.watch(MetadataKind::Logs), partition_table_watcher: metadata.watch(MetadataKind::PartitionTable), cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(), @@ -296,12 +290,6 @@ where } async fn trim_logs_inner(&self) -> Result<(), restate_bifrost::Error> { - let bifrost_admin = BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ); - let cluster_state = self.cluster_state_watcher.current(); let mut persisted_lsns_per_partition: BTreeMap< @@ -342,13 +330,13 @@ where if persisted_lsns.len() >= cluster_state.nodes.len() { let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID); // trim point is before the oldest record - let current_trim_point = bifrost_admin.get_trim_point(log_id).await?; + let current_trim_point = self.bifrost.get_trim_point(log_id).await?; if min_persisted_lsn >= current_trim_point + self.log_trim_threshold { debug!( "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'" ); - bifrost_admin.trim(log_id, min_persisted_lsn).await? + self.bifrost.admin().trim(log_id, min_persisted_lsn).await? } } else { warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log."); diff --git a/crates/admin/src/schema_registry/mod.rs b/crates/admin/src/schema_registry/mod.rs index cd043be8d..39481e6d2 100644 --- a/crates/admin/src/schema_registry/mod.rs +++ b/crates/admin/src/schema_registry/mod.rs @@ -11,16 +11,15 @@ pub mod error; mod updater; -use http::Uri; - use std::borrow::Borrow; use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use std::time::Duration; + +use http::Uri; use tracing::subscriber::NoSubscriber; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::{Metadata, MetadataWriter}; use restate_service_protocol::discovery::{DiscoverEndpoint, DiscoveredEndpoint, ServiceDiscovery}; use restate_types::identifiers::{DeploymentId, ServiceRevision, SubscriptionId}; @@ -77,7 +76,6 @@ pub enum ModifyServiceChange { /// new deployments. #[derive(Clone)] pub struct SchemaRegistry { - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, service_discovery: ServiceDiscovery, subscription_validator: V, @@ -87,7 +85,6 @@ pub struct SchemaRegistry { impl SchemaRegistry { pub fn new( - metadata_store_client: MetadataStoreClient, metadata_writer: MetadataWriter, service_discovery: ServiceDiscovery, subscription_validator: V, @@ -95,7 +92,6 @@ impl SchemaRegistry { ) -> Self { Self { metadata_writer, - metadata_store_client, service_discovery, subscription_validator, experimental_feature_kafka_ingress_next, @@ -155,7 +151,8 @@ impl SchemaRegistry { } else { let mut new_deployment_id = None; let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -195,7 +192,8 @@ impl SchemaRegistry { deployment_id: DeploymentId, ) -> Result<(), SchemaRegistryError> { let schema_registry = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_registry: Option| { @@ -229,7 +227,8 @@ impl SchemaRegistry { changes: Vec, ) -> Result { let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -270,7 +269,8 @@ impl SchemaRegistry { subscription_id: SubscriptionId, ) -> Result<(), SchemaRegistryError> { let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { @@ -370,7 +370,8 @@ where let mut subscription_id = None; let schema_information = self - .metadata_store_client + .metadata_writer + .metadata_store_client() .read_modify_write( SCHEMA_INFORMATION_KEY.clone(), |schema_information: Option| { diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index b3c5ecef8..3f3020a62 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -17,7 +17,6 @@ use restate_types::config::AdminOptions; use restate_types::live::LiveLoad; use tower::ServiceBuilder; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::net_util; use restate_core::MetadataWriter; use restate_service_protocol::discovery::ServiceDiscovery; @@ -44,7 +43,6 @@ where { pub fn new( metadata_writer: MetadataWriter, - metadata_store_client: MetadataStoreClient, bifrost: Bifrost, subscription_validator: V, service_discovery: ServiceDiscovery, @@ -54,7 +52,6 @@ where Self { bifrost, schema_registry: SchemaRegistry::new( - metadata_store_client, metadata_writer, service_discovery, subscription_validator, diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 06b86ff76..b0da0b240 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -15,7 +15,7 @@ use std::sync::OnceLock; use enum_map::EnumMap; use tracing::instrument; -use restate_core::{Metadata, MetadataKind, TargetVersion}; +use restate_core::{Metadata, MetadataKind, MetadataWriter, TargetVersion}; use restate_types::logs::metadata::{MaybeSegment, ProviderKind, Segment}; use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber, TailState}; use restate_types::storage::StorageEncode; @@ -25,7 +25,7 @@ use crate::background_appender::BackgroundAppender; use crate::loglet::LogletProvider; use crate::loglet_wrapper::LogletWrapper; use crate::watchdog::WatchdogSender; -use crate::{Error, InputRecord, LogReadStream, Result}; +use crate::{BifrostAdmin, Error, InputRecord, LogReadStream, Result}; /// The strategy to use when bifrost fails to append or when it observes /// a sealed loglet while it's tailing a log. @@ -77,20 +77,20 @@ impl Bifrost { } #[cfg(any(test, feature = "test-util"))] - pub async fn init_in_memory() -> Self { + pub async fn init_in_memory(metadata_writer: MetadataWriter) -> Self { use crate::providers::memory_loglet; - Self::init_with_factory(memory_loglet::Factory::default()).await + Self::init_with_factory(metadata_writer, memory_loglet::Factory::default()).await } #[cfg(any(test, feature = "test-util"))] - pub async fn init_local() -> Self { + pub async fn init_local(metadata_writer: MetadataWriter) -> Self { use restate_types::config::Configuration; use crate::BifrostService; let config = Configuration::updateable(); - let bifrost_svc = BifrostService::new().enable_local_loglet(&config); + let bifrost_svc = BifrostService::new(metadata_writer).enable_local_loglet(&config); let bifrost = bifrost_svc.handle(); // start bifrost service in the background @@ -102,10 +102,13 @@ impl Bifrost { } #[cfg(any(test, feature = "test-util"))] - pub async fn init_with_factory(factory: impl crate::loglet::LogletProviderFactory) -> Self { + pub async fn init_with_factory( + metadata_writer: MetadataWriter, + factory: impl crate::loglet::LogletProviderFactory, + ) -> Self { use crate::BifrostService; - let bifrost_svc = BifrostService::new().with_factory(factory); + let bifrost_svc = BifrostService::new(metadata_writer).with_factory(factory); let bifrost = bifrost_svc.handle(); // start bifrost service in the background @@ -116,6 +119,11 @@ impl Bifrost { bifrost } + /// Admin operations of bifrost + pub fn admin(&self) -> BifrostAdmin<'_> { + BifrostAdmin::new(self) + } + /// Appends a single record to a log. The log id must exist, otherwise the /// operation fails with [`Error::UnknownLogId`] /// @@ -302,15 +310,17 @@ static_assertions::assert_impl_all!(Bifrost: Send, Sync, Clone); pub struct BifrostInner { #[allow(unused)] watchdog: WatchdogSender, + pub(crate) metadata_writer: MetadataWriter, // Initialized after BifrostService::start completes. pub(crate) providers: OnceLock>>>, shutting_down: AtomicBool, } impl BifrostInner { - pub fn new(watchdog: WatchdogSender) -> Self { + pub fn new(watchdog: WatchdogSender, metadata_writer: MetadataWriter) -> Self { Self { watchdog, + metadata_writer, providers: Default::default(), shutting_down: AtomicBool::new(false), } @@ -558,13 +568,12 @@ mod tests { use restate_types::{Version, Versioned}; use crate::providers::memory_loglet::{self}; - use crate::BifrostAdmin; #[restate_core::test] #[traced_test] async fn test_append_smoke() -> googletest::Result<()> { let num_partitions = 5; - let _ = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, num_partitions, @@ -572,7 +581,7 @@ mod tests { .build() .await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let clean_bifrost_clone = bifrost.clone(); @@ -637,12 +646,12 @@ mod tests { #[restate_core::test(start_paused = true)] async fn test_lazy_initialization() -> googletest::Result<()> { - let _ = TestCoreEnv::create_with_single_node(1, 1).await; + let env = TestCoreEnv::create_with_single_node(1, 1).await; let delay = Duration::from_secs(5); // This memory provider adds a delay to its loglet initialization, we want // to ensure that appends do not fail while waiting for the loglet; let factory = memory_loglet::Factory::with_init_delay(delay); - let bifrost = Bifrost::init_with_factory(factory).await; + let bifrost = Bifrost::init_with_factory(env.metadata_writer, factory).await; let start = tokio::time::Instant::now(); let lsn = bifrost @@ -664,12 +673,7 @@ mod tests { .await; RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_local().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_local(node_env.metadata_writer).await; assert_eq!(Lsn::OLDEST, bifrost.find_tail(LOG_ID).await?.offset()); @@ -681,7 +685,7 @@ mod tests { appender.append("").await?; } - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?; let tail = bifrost.find_tail(LOG_ID).await?; assert_eq!(tail.offset(), Lsn::from(11)); @@ -703,7 +707,7 @@ mod tests { } // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::MAX).await?; + bifrost.admin().trim(LOG_ID, Lsn::MAX).await?; assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); let new_trim_point = bifrost.get_trim_point(LOG_ID).await?; @@ -737,12 +741,7 @@ mod tests { )) .build() .await; - let bifrost = Bifrost::init_in_memory().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_in_memory(node_env.metadata_writer).await; let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; // Lsns [1..5] @@ -765,7 +764,8 @@ mod tests { .unwrap(); // seal the segment - bifrost_admin + bifrost + .admin() .seal(LOG_ID, segment_1.segment_index()) .await?; @@ -925,12 +925,7 @@ mod tests { .build() .await; RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_local().await; - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); + let bifrost = Bifrost::init_local(node_env.metadata_writer).await; // create an appender let stop_signal = Arc::new(AtomicBool::default()); @@ -976,7 +971,7 @@ mod tests { } // seal and don't extend the chain. - let _ = bifrost_admin.seal(LOG_ID, SegmentIndex::from(0)).await?; + let _ = bifrost.admin().seal(LOG_ID, SegmentIndex::from(0)).await?; // appends should stall! tokio::time::sleep(Duration::from_millis(100)).await; @@ -998,7 +993,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(500)).await; // seal the loglet and extend with an in-memory one let new_segment_params = new_single_node_loglet_params(ProviderKind::Local); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( LOG_ID, None, diff --git a/crates/bifrost/src/bifrost_admin.rs b/crates/bifrost/src/bifrost_admin.rs index 6e84d52fb..b447f3652 100644 --- a/crates/bifrost/src/bifrost_admin.rs +++ b/crates/bifrost/src/bifrost_admin.rs @@ -8,14 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::ops::Deref; use std::sync::Arc; -use restate_core::metadata_store::retry_on_network_error; use tracing::{debug, info, instrument}; -use restate_core::{Metadata, MetadataKind, MetadataWriter}; -use restate_metadata_store::MetadataStoreClient; +use restate_core::metadata_store::retry_on_network_error; +use restate_core::{Metadata, MetadataKind}; use restate_types::config::Configuration; use restate_types::logs::metadata::{Chain, LogletParams, Logs, ProviderKind, SegmentIndex}; use restate_types::logs::{LogId, Lsn, TailState}; @@ -30,21 +28,6 @@ use crate::{Bifrost, Error, Result}; #[derive(Clone, Copy)] pub struct BifrostAdmin<'a> { bifrost: &'a Bifrost, - metadata_writer: &'a MetadataWriter, - metadata_store_client: &'a MetadataStoreClient, -} - -impl<'a> AsRef for BifrostAdmin<'a> { - fn as_ref(&self) -> &Bifrost { - self.bifrost - } -} - -impl<'a> Deref for BifrostAdmin<'a> { - type Target = Bifrost; - fn deref(&self) -> &Self::Target { - self.bifrost - } } #[derive(Debug)] @@ -56,16 +39,8 @@ pub struct SealedSegment { } impl<'a> BifrostAdmin<'a> { - pub fn new( - bifrost: &'a Bifrost, - metadata_writer: &'a MetadataWriter, - metadata_store_client: &'a MetadataStoreClient, - ) -> Self { - Self { - bifrost, - metadata_writer, - metadata_store_client, - } + pub fn new(bifrost: &'a Bifrost) -> Self { + Self { bifrost } } /// Trim the log prefix up to and including the `trim_point`. /// Set `trim_point` to the value returned from `find_tail()` or `Lsn::MAX` to @@ -179,7 +154,7 @@ impl<'a> BifrostAdmin<'a> { } pub async fn writeable_loglet(&self, log_id: LogId) -> Result { - self.inner.writeable_loglet(log_id).await + self.bifrost.inner.writeable_loglet(log_id).await } #[instrument(level = "debug", skip(self), err)] @@ -247,9 +222,11 @@ impl<'a> BifrostAdmin<'a> { .network_error_retry_policy .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client.read_modify_write( - BIFROST_CONFIG_KEY.clone(), - |logs: Option| { + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .read_modify_write(BIFROST_CONFIG_KEY.clone(), |logs: Option| { let logs = logs.ok_or(Error::UnknownLogId(log_id))?; let mut builder = logs.into_builder(); @@ -268,13 +245,16 @@ impl<'a> BifrostAdmin<'a> { .append_segment(base_lsn, provider, params.clone()) .map_err(AdminError::from)?; Ok(builder.build()) - }, - ) + }) }) .await .map_err(|e| e.transpose())?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } @@ -292,25 +272,33 @@ impl<'a> BifrostAdmin<'a> { .network_error_retry_policy .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client.read_modify_write::<_, _, Error>( - BIFROST_CONFIG_KEY.clone(), - |logs: Option| { - // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, - // provisioning the empty logs metadata is not our responsibility. - let logs = logs.ok_or(Error::LogsMetadataNotProvisioned)?; - - let mut builder = logs.into_builder(); - builder - .add_log(log_id, Chain::new(provider, params.clone())) - .map_err(AdminError::from)?; - Ok(builder.build()) - }, - ) + self.bifrost + .inner + .metadata_writer + .metadata_store_client() + .read_modify_write::<_, _, Error>( + BIFROST_CONFIG_KEY.clone(), + |logs: Option| { + // We assume that we'll always see a value set in metadata for BIFROST_CONFIG_KEY, + // provisioning the empty logs metadata is not our responsibility. + let logs = logs.ok_or(Error::LogsMetadataNotProvisioned)?; + + let mut builder = logs.into_builder(); + builder + .add_log(log_id, Chain::new(provider, params.clone())) + .map_err(AdminError::from)?; + Ok(builder.build()) + }, + ) }) .await .map_err(|e| e.transpose())?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } @@ -323,14 +311,21 @@ impl<'a> BifrostAdmin<'a> { .clone(); let logs = retry_on_network_error(retry_policy, || { - self.metadata_store_client + self.bifrost + .inner + .metadata_writer + .metadata_store_client() .get_or_insert(BIFROST_CONFIG_KEY.clone(), || { Logs::from_configuration(&Configuration::pinned()) }) }) .await?; - self.metadata_writer.update(Arc::new(logs)).await?; + self.bifrost + .inner + .metadata_writer + .update(Arc::new(logs)) + .await?; Ok(()) } } diff --git a/crates/bifrost/src/loglet/provider.rs b/crates/bifrost/src/loglet/provider.rs index dbf4cf806..3d84d6c49 100644 --- a/crates/bifrost/src/loglet/provider.rs +++ b/crates/bifrost/src/loglet/provider.rs @@ -44,7 +44,8 @@ pub trait LogletProvider: Send + Sync { /// This will not perform any updates, it just statically generates a valid /// configuration for a potentially new loglet. /// - /// if `chain` is None, this means we no chain exists already for this log. + /// if `chain` is None, the provider should assume that no chain exists already + /// for this log. fn propose_new_loglet_params( &self, log_id: LogId, diff --git a/crates/bifrost/src/read_stream.rs b/crates/bifrost/src/read_stream.rs index 8ac81c3ee..11c32f212 100644 --- a/crates/bifrost/src/read_stream.rs +++ b/crates/bifrost/src/read_stream.rs @@ -454,14 +454,14 @@ mod tests { use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::Versioned; - use crate::{BifrostAdmin, BifrostService, ErrorRecoveryStrategy}; + use crate::{BifrostService, ErrorRecoveryStrategy}; #[restate_core::test(flavor = "multi_thread", worker_threads = 2)] #[traced_test] async fn test_readstream_one_loglet() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); - let _ = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_provider_kind(ProviderKind::Local) .build() .await; @@ -471,7 +471,7 @@ mod tests { let config = Live::from_value(Configuration::default()); RocksDbManager::init(Constant::new(CommonOptions::default())); - let svc = BifrostService::new().enable_local_loglet(&config); + let svc = BifrostService::new(env.metadata_writer).enable_local_loglet(&config); let bifrost = svc.handle(); svc.start().await.expect("loglet must start"); @@ -548,14 +548,10 @@ mod tests { let config = Live::from_value(Configuration::default()); RocksDbManager::init(Constant::new(CommonOptions::default())); - let svc = BifrostService::new().enable_local_loglet(&config); + let svc = + BifrostService::new(node_env.metadata_writer.clone()).enable_local_loglet(&config); let bifrost = svc.handle(); - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); svc.start().await.expect("loglet must start"); let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; @@ -569,7 +565,7 @@ mod tests { } // [1..5] trimmed. trim_point = 5 - bifrost_admin.trim(LOG_ID, Lsn::from(5)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(5)).await?; assert_eq!(Lsn::from(11), bifrost.find_tail(LOG_ID).await?.offset()); assert_eq!(Lsn::from(5), bifrost.get_trim_point(LOG_ID).await?); @@ -590,7 +586,7 @@ mod tests { let tail = bifrost.find_tail(LOG_ID).await?.offset(); // trimming beyond the release point will fall back to the release point - bifrost_admin.trim(LOG_ID, Lsn::from(u64::MAX)).await?; + bifrost.admin().trim(LOG_ID, Lsn::from(u64::MAX)).await?; let trim_point = bifrost.get_trim_point(LOG_ID).await?; assert_eq!(Lsn::from(10), bifrost.get_trim_point(LOG_ID).await?); // trim point becomes the point before the next slot available for writes (aka. the @@ -643,7 +639,7 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer.clone()) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); @@ -799,15 +795,10 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); - let bifrost_admin = BifrostAdmin::new( - &bifrost, - &node_env.metadata_writer, - &node_env.metadata_store_client, - ); svc.start().await.expect("loglet must start"); let mut appender = bifrost.create_appender(LOG_ID, ErrorRecoveryStrategy::Wait)?; @@ -825,7 +816,8 @@ mod tests { // seal the loglet and extend with an in-memory one let new_segment_params = new_single_node_loglet_params(ProviderKind::InMemory); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( LOG_ID, None, @@ -917,7 +909,7 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); // enable both in-memory and local loglet types - let svc = BifrostService::new() + let svc = BifrostService::new(node_env.metadata_writer) .enable_local_loglet(&config) .enable_in_memory_loglet(); let bifrost = svc.handle(); diff --git a/crates/bifrost/src/service.rs b/crates/bifrost/src/service.rs index 3222cc5c2..ec6718aa7 100644 --- a/crates/bifrost/src/service.rs +++ b/crates/bifrost/src/service.rs @@ -15,7 +15,9 @@ use anyhow::Context; use enum_map::EnumMap; use tracing::{debug, error, trace}; -use restate_core::{cancellation_watcher, TaskCenter, TaskCenterFutureExt, TaskKind}; +use restate_core::{ + cancellation_watcher, MetadataWriter, TaskCenter, TaskCenterFutureExt, TaskKind, +}; use restate_types::config::Configuration; use restate_types::live::Live; use restate_types::logs::metadata::ProviderKind; @@ -34,16 +36,10 @@ pub struct BifrostService { factories: HashMap>, } -impl Default for BifrostService { - fn default() -> Self { - Self::new() - } -} - impl BifrostService { - pub fn new() -> Self { + pub fn new(metadata_writer: MetadataWriter) -> Self { let (watchdog_sender, watchdog_receiver) = tokio::sync::mpsc::unbounded_channel(); - let inner = Arc::new(BifrostInner::new(watchdog_sender.clone())); + let inner = Arc::new(BifrostInner::new(watchdog_sender.clone(), metadata_writer)); let bifrost = Bifrost::new(inner.clone()); let watchdog = Watchdog::new(inner.clone(), watchdog_sender, watchdog_receiver); Self { diff --git a/crates/core/src/metadata.rs b/crates/core/src/metadata.rs index 2244c0ef3..08fde62e1 100644 --- a/crates/core/src/metadata.rs +++ b/crates/core/src/metadata.rs @@ -29,7 +29,7 @@ use restate_types::schema::Schema; use restate_types::{GenerationalNodeId, Version, Versioned}; use crate::metadata::manager::Command; -use crate::metadata_store::ReadError; +use crate::metadata_store::{MetadataStoreClient, ReadError}; use crate::network::WeakConnection; use crate::{ShutdownError, TaskCenter, TaskId, TaskKind}; @@ -335,6 +335,7 @@ struct MetadataInner { /// so it's safe to call update_* without checking the current version. #[derive(Clone)] pub struct MetadataWriter { + metadata_store_client: MetadataStoreClient, sender: manager::CommandSender, /// strictly used to set my node id. Do not use this to update metadata /// directly to avoid race conditions. @@ -342,8 +343,20 @@ pub struct MetadataWriter { } impl MetadataWriter { - fn new(sender: manager::CommandSender, inner: Arc) -> Self { - Self { sender, inner } + fn new( + sender: manager::CommandSender, + metadata_store_client: MetadataStoreClient, + inner: Arc, + ) -> Self { + Self { + metadata_store_client, + sender, + inner, + } + } + + pub fn metadata_store_client(&self) -> &MetadataStoreClient { + &self.metadata_store_client } // Returns when the nodes configuration update is performed. diff --git a/crates/core/src/metadata/manager.rs b/crates/core/src/metadata/manager.rs index 3c9773f99..baf248d3d 100644 --- a/crates/core/src/metadata/manager.rs +++ b/crates/core/src/metadata/manager.rs @@ -258,7 +258,11 @@ impl MetadataManager { } pub fn writer(&self) -> MetadataWriter { - MetadataWriter::new(self.metadata.sender.clone(), self.metadata.inner.clone()) + MetadataWriter::new( + self.metadata.sender.clone(), + self.metadata_store_client.clone(), + self.metadata.inner.clone(), + ) } /// Start and wait for shutdown signal. diff --git a/crates/core/src/metadata_store.rs b/crates/core/src/metadata_store.rs index 3542226f5..891e19d6d 100644 --- a/crates/core/src/metadata_store.rs +++ b/crates/core/src/metadata_store.rs @@ -105,7 +105,6 @@ pub trait MetadataStore { /// Metadata store client which allows storing [`Versioned`] values into a [`MetadataStore`]. #[derive(Clone)] pub struct MetadataStoreClient { - // premature optimization? Maybe introduce trait object once we have multiple implementations? inner: Arc, backoff_policy: Option, } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a..7dbc273a0 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -184,7 +184,8 @@ impl Node { record_cache.clone(), &mut router_builder, ); - let bifrost_svc = BifrostService::new().enable_local_loglet(&updateable_config); + let bifrost_svc = + BifrostService::new(metadata_manager.writer()).enable_local_loglet(&updateable_config); #[cfg(feature = "replicated-loglet")] let bifrost_svc = bifrost_svc.with_factory(replicated_loglet_factory); @@ -271,7 +272,6 @@ impl Node { metadata_manager.writer(), &mut server_builder, &mut router_builder, - metadata_store_client.clone(), worker_role .as_ref() .map(|worker_role| worker_role.storage_query_context().clone()), diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index b06ed3eec..89cedd8af 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -14,7 +14,6 @@ use codederror::CodedError; use restate_admin::cluster_controller; use restate_admin::service::AdminService; use restate_bifrost::Bifrost; -use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::MessageRouterBuilder; use restate_core::network::NetworkServerBuilder; use restate_core::network::Networking; @@ -70,7 +69,6 @@ impl AdminRole { metadata_writer: MetadataWriter, server_builder: &mut NetworkServerBuilder, router_builder: &mut MessageRouterBuilder, - metadata_store_client: MetadataStoreClient, local_query_context: Option, ) -> Result { health_status.update(AdminStatus::StartingUp); @@ -104,7 +102,6 @@ impl AdminRole { let admin = AdminService::new( metadata_writer.clone(), - metadata_store_client.clone(), bifrost.clone(), config.ingress.clone(), service_discovery, @@ -121,7 +118,6 @@ impl AdminRole { router_builder, server_builder, metadata_writer, - metadata_store_client, )) } else { None diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index 884a22866..ecb513c56 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -213,14 +213,14 @@ mod tests { // Start paused makes sure the timer is immediately fired #[test(restate_core::test(start_paused = true))] pub async fn cleanup_works() { - let _env = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, 1, )) .build() .await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let expired_invocation = InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 105e5a6a4..6d8dc4842 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -637,12 +637,12 @@ mod tests { #[test(restate_core::test)] async fn become_leader_then_step_down() -> googletest::Result<()> { - let _env = TestCoreEnv::create_with_single_node(0, 0).await; + let env = TestCoreEnv::create_with_single_node(0, 0).await; let storage_options = StorageOptions::default(); let rocksdb_options = RocksDbOptions::default(); RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; let partition_store_manager = PartitionStoreManager::create( Constant::new(storage_options.clone()).boxed(), diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs index e4ac810d0..6b378b33e 100644 --- a/crates/worker/src/partition/shuffle.rs +++ b/crates/worker/src/partition/shuffle.rs @@ -628,7 +628,7 @@ mod tests { let (truncation_tx, _truncation_rx) = mpsc::channel(1); - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer.clone()).await; let shuffle = Shuffle::new(metadata, outbox_reader, truncation_tx, 1, bifrost.clone()); ShuffleEnv { diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index 01a994163..5a394716d 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -1006,7 +1006,8 @@ mod tests { RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost_svc = BifrostService::new(env_builder.metadata_writer.clone()) + .with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); let partition_store_manager = PartitionStoreManager::create( diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index 628707454..c11c9a1a0 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -15,7 +15,7 @@ use enumset::{enum_set, EnumSet}; use googletest::internal::test_outcome::TestAssertionFailure; use googletest::IntoTestResult; -use restate_bifrost::{loglet::Loglet, Bifrost, BifrostAdmin}; +use restate_bifrost::{loglet::Loglet, Bifrost}; use restate_core::metadata_store::Precondition; use restate_core::TaskCenter; use restate_core::{metadata_store::MetadataStoreClient, MetadataWriter}; @@ -92,16 +92,6 @@ pub struct TestEnv { pub cluster: StartedCluster, } -impl TestEnv { - pub fn bifrost_admin(&self) -> BifrostAdmin<'_> { - BifrostAdmin::new( - &self.bifrost, - &self.metadata_writer, - &self.metadata_store_client, - ) - } -} - pub async fn run_in_test_env( mut base_config: Configuration, sequencer: GenerationalNodeId, diff --git a/server/tests/replicated_loglet.rs b/server/tests/replicated_loglet.rs index 8fa79c7f2..f3656d2c9 100644 --- a/server/tests/replicated_loglet.rs +++ b/server/tests/replicated_loglet.rs @@ -248,23 +248,13 @@ mod tests { } let mut sealer_handle: JoinHandle> = tokio::task::spawn({ - let (bifrost, metadata_writer, metadata_store_client) = ( - test_env.bifrost.clone(), - test_env.metadata_writer.clone(), - test_env.metadata_store_client.clone() - ); + let bifrost = test_env.bifrost.clone(); async move { let cancellation_token = cancellation_token(); let mut chain = metadata.updateable_logs_metadata().map(|logs| logs.chain(&log_id).expect("a chain to exist")); - let bifrost_admin = restate_bifrost::BifrostAdmin::new( - &bifrost, - &metadata_writer, - &metadata_store_client, - ); - let mut last_loglet_id = None; while !cancellation_token.is_cancelled() { @@ -280,7 +270,8 @@ mod tests { eprintln!("Sealing loglet {} and creating new loglet {}", params.loglet_id, params.loglet_id.next()); params.loglet_id = params.loglet_id.next(); - bifrost_admin + bifrost + .admin() .seal_and_extend_chain( log_id, None, diff --git a/tools/bifrost-benchpress/src/main.rs b/tools/bifrost-benchpress/src/main.rs index 33d5f8d21..f9512f964 100644 --- a/tools/bifrost-benchpress/src/main.rs +++ b/tools/bifrost-benchpress/src/main.rs @@ -176,7 +176,7 @@ fn spawn_environment(config: Live, num_logs: u16) -> (task_center metadata_writer.submit(Arc::new(logs)); spawn_metadata_manager(metadata_manager).expect("metadata manager starts"); - let bifrost_svc = BifrostService::new() + let bifrost_svc = BifrostService::new(metadata_writer) .enable_in_memory_loglet() .enable_local_loglet(&config); let bifrost = bifrost_svc.handle(); diff --git a/tools/restatectl/src/commands/log/dump_log.rs b/tools/restatectl/src/commands/log/dump_log.rs index cb4219513..213e328fa 100644 --- a/tools/restatectl/src/commands/log/dump_log.rs +++ b/tools/restatectl/src/commands/log/dump_log.rs @@ -86,6 +86,7 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { let metadata_manager = MetadataManager::new(metadata_builder, metadata_store_client.clone()); + let metadata_writer = metadata_manager.writer(); let mut router_builder = MessageRouterBuilder::default(); metadata_manager.register_in_message_router(&mut router_builder); @@ -95,7 +96,8 @@ async fn dump_log(opts: &DumpLogOpts) -> anyhow::Result<()> { metadata_manager.run(), )?; - let bifrost_svc = BifrostService::new().enable_local_loglet(&Configuration::updateable()); + let bifrost_svc = + BifrostService::new(metadata_writer).enable_local_loglet(&Configuration::updateable()); let bifrost = bifrost_svc.handle(); // Ensures bifrost has initial metadata synced up before starting the worker. diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index ee702beec..e556aa82c 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -103,11 +103,10 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> { // We start the Meta service, then download the openapi schema generated let node_env = TestCoreEnv::create_with_single_node(1, 1).await; - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory(node_env.metadata_writer.clone()).await; let admin_service = AdminService::new( node_env.metadata_writer.clone(), - node_env.metadata_store_client.clone(), bifrost, Mock, ServiceDiscovery::new(