diff --git a/crates/bifrost/src/background_appender.rs b/crates/bifrost/src/background_appender.rs index 9776e3154..6ac7921eb 100644 --- a/crates/bifrost/src/background_appender.rs +++ b/crates/bifrost/src/background_appender.rs @@ -60,13 +60,12 @@ where /// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate. pub fn start( self, - task_center: TaskCenter, name: &'static str, partition_id: Option, ) -> Result, ShutdownError> { let (tx, rx) = tokio::sync::mpsc::channel(self.queue_capacity); - let handle = task_center.spawn_unmanaged( + let handle = TaskCenter::current().spawn_unmanaged( restate_core::TaskKind::BifrostAppender, name, partition_id, diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index c4664e6ca..feec06619 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use bytes::{Bytes, BytesMut}; use restate_bifrost::Bifrost; -use restate_core::{metadata, ShutdownError}; +use restate_core::{Metadata, ShutdownError}; use restate_storage_api::deduplication_table::DedupInformation; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; use restate_types::invocation::{ @@ -237,7 +237,9 @@ pub async fn append_envelope_to_bifrost( ) -> Result<(LogId, Lsn), Error> { let partition_id = { // make sure we drop pinned partition table before awaiting - let partition_table = metadata().wait_for_partition_table(Version::MIN).await?; + let partition_table = Metadata::current() + .wait_for_partition_table(Version::MIN) + .await?; partition_table.find_partition_id(envelope.partition_key())? }; diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 416fb4784..7d47536d7 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -137,10 +137,8 @@ impl Worker { .await?; let partition_processor_manager = PartitionProcessorManager::new( - task_center(), health_status, updateable_config.clone(), - metadata.clone(), metadata_store_client, partition_store_manager.clone(), router_builder, diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index 0d4485df8..c8ca2c4c8 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -18,14 +18,13 @@ use tokio::time::MissedTickBehavior; use tracing::{debug, instrument, warn}; use restate_bifrost::Bifrost; -use restate_core::cancellation_watcher; +use restate_core::{cancellation_watcher, Metadata}; use restate_storage_api::invocation_status_table::{ InvocationStatus, ReadOnlyInvocationStatusTable, }; use restate_types::identifiers::WithPartitionKey; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; use restate_types::invocation::PurgeInvocationRequest; -use restate_types::GenerationalNodeId; use restate_wal_protocol::{ append_envelope_to_bifrost, Command, Destination, Envelope, Header, Source, }; @@ -33,7 +32,6 @@ use restate_wal_protocol::{ pub(super) struct Cleaner { partition_id: PartitionId, leader_epoch: LeaderEpoch, - node_id: GenerationalNodeId, partition_key_range: RangeInclusive, storage: Storage, bifrost: Bifrost, @@ -47,7 +45,6 @@ where pub(super) fn new( partition_id: PartitionId, leader_epoch: LeaderEpoch, - node_id: GenerationalNodeId, storage: Storage, bifrost: Bifrost, partition_key_range: RangeInclusive, @@ -56,7 +53,6 @@ where Self { partition_id, leader_epoch, - node_id, partition_key_range, storage, bifrost, @@ -64,12 +60,11 @@ where } } - #[instrument(skip_all, 
fields(restate.node = %self.node_id, restate.partition.id = %self.partition_id))] + #[instrument(skip_all, fields(restate.partition.id = %self.partition_id))] pub(super) async fn run(self) -> anyhow::Result<()> { let Self { partition_id, leader_epoch, - node_id, partition_key_range, storage, bifrost, @@ -77,12 +72,13 @@ where } = self; debug!("Running cleaner"); + let my_node_id = Metadata::with_current(|m| m.my_node_id()); let bifrost_envelope_source = Source::Processor { partition_id, partition_key: None, leader_epoch, - node_id: node_id.as_plain(), - generational_node_id: Some(node_id), + node_id: my_node_id.as_plain(), + generational_node_id: Some(my_node_id), }; let mut interval = tokio::time::interval(cleanup_interval); @@ -172,7 +168,7 @@ mod tests { use futures::{stream, Stream}; use googletest::prelude::*; - use restate_core::{TaskCenter, TaskCenterFutureExt, TaskKind, TestCoreEnvBuilder}; + use restate_core::{Metadata, TaskCenter, TaskKind, TestCoreEnvBuilder2}; use restate_storage_api::invocation_status_table::{ CompletedInvocation, InFlightInvocationMetadata, InvocationStatus, }; @@ -215,97 +211,92 @@ mod tests { } // Start paused makes sure the timer is immediately fired - #[test(tokio::test(start_paused = true))] + #[test(restate_core::test(start_paused = true))] pub async fn cleanup_works() { - let env = TestCoreEnvBuilder::with_incoming_only_connector() + let _env = TestCoreEnvBuilder2::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, 1, )) .build() .await; - async { - let bifrost = Bifrost::init_in_memory().await; + let bifrost = Bifrost::init_in_memory().await; - let expired_invocation = - InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); - let not_expired_invocation_1 = - InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); - let not_expired_invocation_2 = - InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); - let not_completed_invocation = - InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); + let expired_invocation = + InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); + let not_expired_invocation_1 = + InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); + let not_expired_invocation_2 = + InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); + let not_completed_invocation = + InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random()); - let mock_storage = MockInvocationStatusReader(vec![ - ( - expired_invocation, - InvocationStatus::Completed(CompletedInvocation { - completion_retention_duration: Duration::ZERO, - ..CompletedInvocation::mock_neo() - }), - ), - ( - not_expired_invocation_1, - InvocationStatus::Completed(CompletedInvocation { - completion_retention_duration: Duration::MAX, - ..CompletedInvocation::mock_neo() - }), - ), - ( - not_expired_invocation_2, - // Old status invocations are still processed with the cleanup timer in the PP - InvocationStatus::Completed(CompletedInvocation::mock_old()), - ), - ( - not_completed_invocation, - InvocationStatus::Invoked(InFlightInvocationMetadata::mock()), - ), - ]); + let mock_storage = MockInvocationStatusReader(vec![ + ( + expired_invocation, + InvocationStatus::Completed(CompletedInvocation { + completion_retention_duration: Duration::ZERO, + ..CompletedInvocation::mock_neo() + }), + ), + ( + not_expired_invocation_1, + InvocationStatus::Completed(CompletedInvocation { 
+ completion_retention_duration: Duration::MAX, + ..CompletedInvocation::mock_neo() + }), + ), + ( + not_expired_invocation_2, + // Old status invocations are still processed with the cleanup timer in the PP + InvocationStatus::Completed(CompletedInvocation::mock_old()), + ), + ( + not_completed_invocation, + InvocationStatus::Invoked(InFlightInvocationMetadata::mock()), + ), + ]); - TaskCenter::current() - .spawn( - TaskKind::Cleaner, - "cleaner", - Some(PartitionId::MIN), - Cleaner::new( - PartitionId::MIN, - LeaderEpoch::INITIAL, - GenerationalNodeId::new(1, 1), - mock_storage, - bifrost.clone(), - RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX), - Duration::from_secs(1), - ) - .run(), + TaskCenter::current() + .spawn( + TaskKind::Cleaner, + "cleaner", + Some(PartitionId::MIN), + Cleaner::new( + PartitionId::MIN, + LeaderEpoch::INITIAL, + mock_storage, + bifrost.clone(), + RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX), + Duration::from_secs(1), ) - .unwrap(); + .run(), + ) + .unwrap(); - // By yielding once we let the cleaner task run, and perform the cleanup - tokio::task::yield_now().await; + // By yielding once we let the cleaner task run, and perform the cleanup + tokio::task::yield_now().await; - // All the invocation ids were created with same partition keys, hence same partition id. - let partition_id = env - .metadata - .partition_table_snapshot() + // All the invocation ids were created with same partition keys, hence same partition id. + let partition_id = Metadata::with_current(|m| { + m.partition_table_snapshot() .find_partition_id(expired_invocation.partition_key()) - .unwrap(); + }) + .unwrap(); - let mut log_entries = bifrost.read_all(partition_id.into()).await.unwrap(); - let bifrost_message = log_entries - .remove(0) - .try_decode::() - .unwrap() - .unwrap(); + let mut log_entries = bifrost.read_all(partition_id.into()).await.unwrap(); + let bifrost_message = log_entries + .remove(0) + .try_decode::() + .unwrap() + .unwrap(); - assert_that!( - bifrost_message.command, - pat!(Command::PurgeInvocation(pat!(PurgeInvocationRequest { - invocation_id: eq(expired_invocation) - }))) - ); - assert_that!(log_entries, empty()); - } - .in_tc(&env.tc) - .await; + assert_that!( + bifrost_message.command, + pat!(Command::PurgeInvocation(pat!(PurgeInvocationRequest { + invocation_id: eq(expired_invocation) + }))) + ); + assert_that!(log_entries, empty()); } } diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index fff443ea6..efba7ed07 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -52,7 +52,6 @@ pub struct LeaderState { // only needed for proposing TruncateOutbox to ourselves own_partition_key: PartitionKey, action_effects_counter: Counter, - task_center: TaskCenter, pub shuffle_hint_tx: HintSender, shuffle_task_id: TaskId, @@ -74,7 +73,6 @@ pub struct LeaderState { impl LeaderState { #[allow(clippy::too_many_arguments)] pub fn new( - task_center: TaskCenter, partition_id: PartitionId, leader_epoch: LeaderEpoch, own_partition_key: PartitionKey, @@ -87,7 +85,6 @@ impl LeaderState { shuffle_rx: tokio::sync::mpsc::Receiver, ) -> Self { LeaderState { - task_center, partition_id, leader_epoch, own_partition_key, @@ -163,8 +160,10 @@ impl LeaderState { InvokerStorageReader, >, ) { - let shuffle_handle = OptionFuture::from(self.task_center.cancel_task(self.shuffle_task_id)); - let cleaner_handle = 
OptionFuture::from(self.task_center.cancel_task(self.cleaner_task_id)); + let shuffle_handle = + OptionFuture::from(TaskCenter::current().cancel_task(self.shuffle_task_id)); + let cleaner_handle = + OptionFuture::from(TaskCenter::current().cancel_task(self.cleaner_task_id)); // It's ok to not check the abort_result because either it succeeded or the invoker // is not running. If the invoker is not running, and we are not shutting down, then @@ -189,7 +188,6 @@ impl LeaderState { "Failing rpc because I lost leadership", ); respond_to_rpc( - &self.task_center, reciprocal.prepare(Err(PartitionProcessorRpcError::LostLeadership( self.partition_id, ))), @@ -265,7 +263,6 @@ impl LeaderState { let old_reciprocal = o.remove(); trace!(%request_id, "Replacing rpc with newer request"); respond_to_rpc( - &self.task_center, old_reciprocal.prepare(Err(PartitionProcessorRpcError::Internal( "retried".to_string(), ))), @@ -276,7 +273,6 @@ impl LeaderState { // In this case, no one proposed this command yet, let's try to propose it if let Err(e) = self.self_proposer.propose(partition_key, cmd).await { respond_to_rpc( - &self.task_center, reciprocal .prepare(Err(PartitionProcessorRpcError::Internal(e.to_string()))), ); @@ -299,15 +295,11 @@ impl LeaderState { .await { Ok(commit_token) => { - self.awaiting_rpc_self_propose.push(SelfAppendFuture::new( - self.task_center.clone(), - commit_token, - reciprocal, - )); + self.awaiting_rpc_self_propose + .push(SelfAppendFuture::new(commit_token, reciprocal)); } Err(e) => { respond_to_rpc( - &self.task_center, reciprocal.prepare(Err(PartitionProcessorRpcError::Internal(e.to_string()))), ); } @@ -391,7 +383,6 @@ impl LeaderState { } => { if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) { respond_to_rpc( - &self.task_center, response_tx.prepare(Ok(PartitionProcessorRpcResponse::Output( InvocationOutput { request_id, @@ -411,15 +402,12 @@ impl LeaderState { .. 
} => { if let Some(response_tx) = self.awaiting_rpc_actions.remove(&request_id) { - respond_to_rpc( - &self.task_center, - response_tx.prepare(Ok(PartitionProcessorRpcResponse::Submitted( - SubmittedInvocationNotification { - request_id, - is_new_invocation, - }, - ))), - ); + respond_to_rpc(response_tx.prepare(Ok( + PartitionProcessorRpcResponse::Submitted(SubmittedInvocationNotification { + request_id, + is_new_invocation, + }), + ))); } } Action::ScheduleInvocationStatusCleanup { @@ -436,19 +424,16 @@ impl LeaderState { } struct SelfAppendFuture { - task_center: TaskCenter, commit_token: CommitToken, response: Option>>, } impl SelfAppendFuture { fn new( - task_center: TaskCenter, commit_token: CommitToken, response: Reciprocal>, ) -> Self { Self { - task_center, commit_token, response: Some(response), } @@ -456,19 +441,15 @@ impl SelfAppendFuture { fn fail_with_internal(&mut self) { if let Some(reciprocal) = self.response.take() { - respond_to_rpc( - &self.task_center, - reciprocal.prepare(Err(PartitionProcessorRpcError::Internal( - "error when proposing to bifrost".to_string(), - ))), - ); + respond_to_rpc(reciprocal.prepare(Err(PartitionProcessorRpcError::Internal( + "error when proposing to bifrost".to_string(), + )))); } } fn fail_with_lost_leadership(&mut self, this_partition_id: PartitionId) { if let Some(reciprocal) = self.response.take() { respond_to_rpc( - &self.task_center, reciprocal.prepare(Err(PartitionProcessorRpcError::LostLeadership( this_partition_id, ))), @@ -478,10 +459,7 @@ impl SelfAppendFuture { fn succeed_with_appended(&mut self) { if let Some(reciprocal) = self.response.take() { - respond_to_rpc( - &self.task_center, - reciprocal.prepare(Ok(PartitionProcessorRpcResponse::Appended)), - ); + respond_to_rpc(reciprocal.prepare(Ok(PartitionProcessorRpcResponse::Appended))); } } } diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 28f5af663..f25176e63 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -23,7 +23,7 @@ use tracing::{debug, instrument, warn}; use restate_bifrost::Bifrost; use restate_core::network::Reciprocal; -use restate_core::{metadata, ShutdownError, TaskCenter, TaskKind}; +use restate_core::{metadata, my_node_id, ShutdownError, TaskCenter, TaskKind}; use restate_errors::NotRunningError; use restate_invoker_api::InvokeInputJournal; use restate_partition_store::PartitionStore; @@ -40,7 +40,6 @@ use restate_types::net::partition_processor::{ PartitionProcessorRpcError, PartitionProcessorRpcResponse, }; use restate_types::storage::StorageEncodeError; -use restate_types::GenerationalNodeId; use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::timer::TimerKeyValue; use restate_wal_protocol::Command; @@ -132,19 +131,16 @@ impl State { } pub struct PartitionProcessorMetadata { - node_id: GenerationalNodeId, partition_id: PartitionId, partition_key_range: RangeInclusive, } impl PartitionProcessorMetadata { pub const fn new( - node_id: GenerationalNodeId, partition_id: PartitionId, partition_key_range: RangeInclusive, ) -> Self { Self { - node_id, partition_id, partition_key_range, } @@ -161,7 +157,6 @@ pub(crate) struct LeadershipState { channel_size: usize, invoker_tx: I, bifrost: Bifrost, - task_center: TaskCenter, } impl LeadershipState @@ -170,7 +165,6 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - task_center: TaskCenter, partition_processor_metadata: PartitionProcessorMetadata, 
num_timers_in_memory_limit: Option, cleanup_interval: Duration, @@ -180,7 +174,6 @@ where last_seen_leader_epoch: Option, ) -> Self { Self { - task_center, state: State::Follower, partition_processor_metadata, num_timers_in_memory_limit, @@ -221,7 +214,7 @@ where let announce_leader = Command::AnnounceLeader(AnnounceLeader { // todo: Still need to write generational id for supporting rolling back, can be removed // with the next release. - node_id: Some(self.partition_processor_metadata.node_id), + node_id: Some(my_node_id()), leader_epoch, partition_key_range: Some( self.partition_processor_metadata @@ -342,7 +335,6 @@ where ShuffleMetadata::new( self.partition_processor_metadata.partition_id, *leader_epoch, - self.partition_processor_metadata.node_id, ), OutboxReader::from(partition_store.clone()), shuffle_tx, @@ -358,7 +350,6 @@ where let cleaner = Cleaner::new( self.partition_processor_metadata.partition_id, *leader_epoch, - self.partition_processor_metadata.node_id, partition_store.clone(), self.bifrost.clone(), self.partition_processor_metadata @@ -371,7 +362,6 @@ where TaskCenter::spawn_child(TaskKind::Cleaner, "cleaner", cleaner.run())?; self.state = State::Leader(LeaderState::new( - self.task_center.clone(), self.partition_processor_metadata.partition_id, *leader_epoch, *self @@ -515,7 +505,6 @@ where State::Follower | State::Candidate { .. } => { // Just fail the rpc respond_to_rpc( - &self.task_center, reciprocal.prepare(Err(PartitionProcessorRpcError::NotLeader( self.partition_processor_metadata.partition_id, ))), @@ -537,12 +526,11 @@ where reciprocal: Reciprocal>, ) { match &mut self.state { - State::Follower | State::Candidate { .. } => respond_to_rpc( - &self.task_center, - reciprocal.prepare(Err(PartitionProcessorRpcError::NotLeader( + State::Follower | State::Candidate { .. 
} => respond_to_rpc(reciprocal.prepare(Err( + PartitionProcessorRpcError::NotLeader( self.partition_processor_metadata.partition_id, - ))), - ), + ), + ))), State::Leader(leader_state) => { leader_state .self_propose_and_respond_asynchronously(partition_key, cmd, reciprocal) @@ -603,7 +591,7 @@ mod tests { use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata, State}; use assert2::let_assert; use restate_bifrost::Bifrost; - use restate_core::{task_center, TestCoreEnv}; + use restate_core::{TaskCenter, TestCoreEnv2}; use restate_invoker_api::test_util::MockInvokerHandle; use restate_partition_store::{OpenMode, PartitionStoreManager}; use restate_rocksdb::RocksDbManager; @@ -623,90 +611,82 @@ mod tests { const NODE_ID: GenerationalNodeId = GenerationalNodeId::new(0, 0); const PARTITION_KEY_RANGE: RangeInclusive = PartitionKey::MIN..=PartitionKey::MAX; const PARTITION_PROCESSOR_METADATA: PartitionProcessorMetadata = - PartitionProcessorMetadata::new(NODE_ID, PARTITION_ID, PARTITION_KEY_RANGE); + PartitionProcessorMetadata::new(PARTITION_ID, PARTITION_KEY_RANGE); - #[test(tokio::test)] + #[test(restate_core::test)] async fn become_leader_then_step_down() -> googletest::Result<()> { - let env = TestCoreEnv::create_with_single_node(0, 0).await; - let tc = env.tc.clone(); + let _env = TestCoreEnv2::create_with_single_node(0, 0).await; let storage_options = StorageOptions::default(); let rocksdb_options = RocksDbOptions::default(); - tc.run_in_scope_sync(|| RocksDbManager::init(Constant::new(CommonOptions::default()))); - - let bifrost = tc - .run_in_scope("init bifrost", None, Bifrost::init_in_memory()) - .await; - - tc.run_in_scope("test", None, async { - let partition_store_manager = PartitionStoreManager::create( - Constant::new(storage_options.clone()).boxed(), - Constant::new(rocksdb_options.clone()).boxed(), - &[(PARTITION_ID, PARTITION_KEY_RANGE)], - ) - .await?; + RocksDbManager::init(Constant::new(CommonOptions::default())); + let bifrost = Bifrost::init_in_memory().await; - let invoker_tx = MockInvokerHandle::default(); - let mut state = LeadershipState::new( - task_center(), - PARTITION_PROCESSOR_METADATA, - None, - Duration::from_secs(60 * 60), - 42, - invoker_tx, - bifrost.clone(), - None, - ); + let partition_store_manager = PartitionStoreManager::create( + Constant::new(storage_options.clone()).boxed(), + Constant::new(rocksdb_options.clone()).boxed(), + &[(PARTITION_ID, PARTITION_KEY_RANGE)], + ) + .await?; - assert!(matches!(state.state, State::Follower)); + let invoker_tx = MockInvokerHandle::default(); + let mut state = LeadershipState::new( + PARTITION_PROCESSOR_METADATA, + None, + Duration::from_secs(60 * 60), + 42, + invoker_tx, + bifrost.clone(), + None, + ); - let leader_epoch = LeaderEpoch::from(1); - state.run_for_leader(leader_epoch).await?; + assert!(matches!(state.state, State::Follower)); - assert!(matches!(state.state, State::Candidate { .. })); + let leader_epoch = LeaderEpoch::from(1); + state.run_for_leader(leader_epoch).await?; - let record = bifrost - .create_reader(PARTITION_ID.into(), KeyFilter::Any, Lsn::OLDEST, Lsn::MAX) - .expect("valid reader") - .next() - .await - .unwrap()?; + assert!(matches!(state.state, State::Candidate { .. 
})); - let envelope = record.try_decode::().unwrap()?; - - let_assert!(Command::AnnounceLeader(announce_leader) = envelope.command); - assert_eq!( - announce_leader, - AnnounceLeader { - node_id: Some(NODE_ID), - leader_epoch, - partition_key_range: Some(PARTITION_KEY_RANGE), - } - ); + let record = bifrost + .create_reader(PARTITION_ID.into(), KeyFilter::Any, Lsn::OLDEST, Lsn::MAX) + .expect("valid reader") + .next() + .await + .unwrap()?; - let mut partition_store = partition_store_manager - .open_partition_store( - PARTITION_ID, - PARTITION_KEY_RANGE, - OpenMode::CreateIfMissing, - &rocksdb_options, - ) - .await?; - state - .on_announce_leader(announce_leader, &mut partition_store) - .await?; + let envelope = record.try_decode::().unwrap()?; - assert!(matches!(state.state, State::Leader(_))); + let_assert!(Command::AnnounceLeader(announce_leader) = envelope.command); + assert_eq!( + announce_leader, + AnnounceLeader { + node_id: Some(NODE_ID), + leader_epoch, + partition_key_range: Some(PARTITION_KEY_RANGE), + } + ); + + let mut partition_store = partition_store_manager + .open_partition_store( + PARTITION_ID, + PARTITION_KEY_RANGE, + OpenMode::CreateIfMissing, + &rocksdb_options, + ) + .await?; + state + .on_announce_leader(announce_leader, &mut partition_store) + .await?; - state.step_down().await; + assert!(matches!(state.state, State::Leader(_))); - assert!(matches!(state.state, State::Follower)); + state.step_down().await; - googletest::Result::Ok(()) - }) - .await?; + assert!(matches!(state.state, State::Follower)); - tc.shutdown_node("test_completed", 0).await; + TaskCenter::current() + .shutdown_node("test_completed", 0) + .await; RocksDbManager::get().shutdown().await; Ok(()) } diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs index 883f65b2d..91e24512a 100644 --- a/crates/worker/src/partition/leadership/self_proposer.rs +++ b/crates/worker/src/partition/leadership/self_proposer.rs @@ -11,7 +11,7 @@ use crate::partition::leadership::Error; use futures::never::Never; use restate_bifrost::{Bifrost, CommitToken}; -use restate_core::{task_center, Metadata}; +use restate_core::Metadata; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::logs::LogId; @@ -49,7 +49,7 @@ impl SelfProposer { BIFROST_QUEUE_SIZE, MAX_BIFROST_APPEND_BATCH, )? 
- .start(task_center(), "self-appender", Some(partition_id))?; + .start("self-appender", Some(partition_id))?; Ok(Self { partition_id, diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index a5448e7e9..38d5aafa1 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -44,6 +44,7 @@ use restate_types::config::WorkerOptions; use restate_types::identifiers::{ LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; +use restate_types::invocation; use restate_types::invocation::{ AttachInvocationRequest, InvocationQuery, InvocationTarget, InvocationTargetType, ResponseResult, ServiceInvocation, ServiceInvocationResponseSink, SubmitNotificationSink, @@ -58,7 +59,6 @@ use restate_types::net::partition_processor::{ PartitionProcessorRpcRequestInner, PartitionProcessorRpcResponse, }; use restate_types::time::MillisSinceEpoch; -use restate_types::{invocation, GenerationalNodeId}; use restate_wal_protocol::control::AnnounceLeader; use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; @@ -85,7 +85,6 @@ pub enum PartitionProcessorControlCommand { #[derive(Debug)] pub(super) struct PartitionProcessorBuilder { - node_id: GenerationalNodeId, pub partition_id: PartitionId, pub partition_key_range: RangeInclusive, @@ -109,7 +108,6 @@ where { #[allow(clippy::too_many_arguments)] pub(super) fn new( - node_id: GenerationalNodeId, partition_id: PartitionId, partition_key_range: RangeInclusive, status: PartitionProcessorStatus, @@ -120,7 +118,6 @@ where invoker_tx: InvokerInputSender, ) -> Self { Self { - node_id, partition_id, partition_key_range, status, @@ -138,7 +135,6 @@ where pub async fn build( self, - task_center: TaskCenter, bifrost: Bifrost, mut partition_store: PartitionStore, ) -> Result, StorageError> { @@ -177,12 +173,7 @@ where }); let leadership_state = LeadershipState::new( - task_center.clone(), - PartitionProcessorMetadata::new( - self.node_id, - partition_id, - partition_key_range.clone(), - ), + PartitionProcessorMetadata::new(partition_id, partition_key_range.clone()), num_timers_in_memory_limit, cleanup_interval, channel_size, @@ -192,7 +183,6 @@ where ); Ok(PartitionProcessor { - task_center, partition_id, partition_key_range, leadership_state, @@ -241,7 +231,6 @@ pub struct PartitionProcessor { rpc_rx: mpsc::Receiver>, status_watch_tx: watch::Sender, status: PartitionProcessorStatus, - task_center: TaskCenter, max_command_batch_size: usize, partition_store: PartitionStore, @@ -280,12 +269,9 @@ where // Drain rpc_rx self.rpc_rx.close(); while let Some(msg) = self.rpc_rx.recv().await { - respond_to_rpc( - &self.task_center, - msg.into_outgoing(Err(PartitionProcessorRpcError::NotLeader( - self.partition_id, - ))), - ); + respond_to_rpc(msg.into_outgoing(Err(PartitionProcessorRpcError::NotLeader( + self.partition_id, + )))); } res @@ -587,7 +573,7 @@ where ) .await { - respond_to_rpc(&self.task_center, response_tx.prepare(Ok(ready_result))); + respond_to_rpc(response_tx.prepare(Ok(ready_result))); return; } @@ -609,7 +595,6 @@ where GetInvocationOutputResponseMode::ReplyIfNotReady, ) => { respond_to_rpc( - &self.task_center, response_tx.prepare( self.handle_rpc_get_invocation_output( request_id, @@ -845,21 +830,19 @@ where } fn respond_to_rpc( - task_center: &TaskCenter, outgoing: Outgoing< Result, HasConnection, >, ) { // ignore shutdown errors - let _ = task_center.spawn( + let _ = TaskCenter::spawn_child( // Use RpcResponse kind to make sure that the response 
is sent on the default runtime and // not the partition processor runtime which might be dropped. Otherwise, we risk that the // response is never sent even though the connection is still open. If the default runtime is // dropped, then the process is shutting down which would also close all open connections. TaskKind::RpcResponse, "partition-processor-rpc-response", - None, - async move { outgoing.send().await.map_err(Into::into) }, + async { outgoing.send().await.map_err(Into::into) }, ); } diff --git a/crates/worker/src/partition/shuffle.rs b/crates/worker/src/partition/shuffle.rs index 08d2283c2..6a568addc 100644 --- a/crates/worker/src/partition/shuffle.rs +++ b/crates/worker/src/partition/shuffle.rs @@ -15,12 +15,11 @@ use tokio::sync::mpsc; use tracing::debug; use restate_bifrost::Bifrost; -use restate_core::cancellation_watcher; +use restate_core::{cancellation_watcher, Metadata}; use restate_storage_api::deduplication_table::DedupInformation; use restate_storage_api::outbox_table::OutboxMessage; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; use restate_types::message::MessageIndex; -use restate_types::GenerationalNodeId; use restate_wal_protocol::{append_envelope_to_bifrost, Destination, Envelope, Header, Source}; use crate::partition::shuffle::state_machine::StateMachine; @@ -70,13 +69,14 @@ fn create_header( seq_number: MessageIndex, shuffle_metadata: &ShuffleMetadata, ) -> Header { + let my_node_id = Metadata::with_current(|m| m.my_node_id()); Header { source: Source::Processor { partition_id: shuffle_metadata.partition_id, partition_key: None, leader_epoch: shuffle_metadata.leader_epoch, - node_id: shuffle_metadata.node_id.as_plain(), - generational_node_id: Some(shuffle_metadata.node_id), + node_id: my_node_id.as_plain(), + generational_node_id: Some(my_node_id), }, dest: Destination::Processor { partition_key: dest_partition_key, @@ -152,19 +152,13 @@ impl HintSender { pub(crate) struct ShuffleMetadata { partition_id: PartitionId, leader_epoch: LeaderEpoch, - node_id: GenerationalNodeId, } impl ShuffleMetadata { - pub(crate) fn new( - partition_id: PartitionId, - leader_epoch: LeaderEpoch, - node_id: GenerationalNodeId, - ) -> Self { + pub(crate) fn new(partition_id: PartitionId, leader_epoch: LeaderEpoch) -> Self { ShuffleMetadata { partition_id, leader_epoch, - node_id, } } } @@ -222,9 +216,9 @@ where .. 
} = self; - debug!(restate.node = %metadata.node_id, restate.partition.id = %metadata.partition_id, "Running shuffle"); + let node_id = Metadata::with_current(|m| m.my_node_id()); + debug!(restate.node = %node_id, restate.partition.id = %metadata.partition_id, "Running shuffle"); - let node_id = metadata.node_id; let state_machine = StateMachine::new( metadata, outbox_reader, @@ -460,9 +454,7 @@ mod tests { use restate_bifrost::{Bifrost, LogEntry}; use restate_core::network::FailingConnector; - use restate_core::{ - TaskCenter, TaskCenterFutureExt, TaskKind, TestCoreEnv, TestCoreEnvBuilder, - }; + use restate_core::{TaskCenter, TaskKind, TestCoreEnv2, TestCoreEnvBuilder2}; use restate_storage_api::outbox_table::OutboxMessage; use restate_storage_api::StorageError; use restate_types::identifiers::{InvocationId, LeaderEpoch, PartitionId}; @@ -470,7 +462,7 @@ mod tests { use restate_types::logs::{KeyFilter, LogId, Lsn, SequenceNumber}; use restate_types::message::MessageIndex; use restate_types::partition_table::PartitionTable; - use restate_types::{GenerationalNodeId, Version}; + use restate_types::Version; use restate_wal_protocol::{Command, Envelope}; use crate::partition::shuffle::{OutboxReader, OutboxReaderError, Shuffle, ShuffleMetadata}; @@ -629,7 +621,8 @@ mod tests { } struct ShuffleEnv { - env: TestCoreEnv, + #[allow(dead_code)] + env: TestCoreEnv2, bifrost: Bifrost, shuffle: Shuffle, } @@ -638,22 +631,18 @@ mod tests { outbox_reader: OR, ) -> ShuffleEnv { // set numbers of partitions to 1 to easily find all sent messages by the shuffle - let env = TestCoreEnvBuilder::with_incoming_only_connector() + let env = TestCoreEnvBuilder2::with_incoming_only_connector() .set_partition_table(PartitionTable::with_equally_sized_partitions( Version::MIN, 1, )) .build() .await; - let metadata = ShuffleMetadata::new( - PartitionId::from(0), - LeaderEpoch::from(0), - GenerationalNodeId::new(0, 0), - ); + let metadata = ShuffleMetadata::new(PartitionId::from(0), LeaderEpoch::from(0)); let (truncation_tx, _truncation_rx) = mpsc::channel(1); - let bifrost = Bifrost::init_in_memory().in_tc(&env.tc).await; + let bifrost = Bifrost::init_in_memory().await; let shuffle = Shuffle::new(metadata, outbox_reader, truncation_tx, 1, bifrost.clone()); ShuffleEnv { @@ -663,7 +652,7 @@ mod tests { } } - #[test(tokio::test)] + #[test(restate_core::test)] async fn shuffle_consecutive_outbox() -> anyhow::Result<()> { let expected_messages = iter::repeat_with(|| Some(ServiceInvocation::mock())) .take(10) @@ -679,29 +668,24 @@ mod tests { let outbox_reader = MockOutboxReader::new(42, expected_messages.clone()); let shuffle_env = create_shuffle_env(outbox_reader).await; - let tc = shuffle_env.env.tc.clone(); - async { - let partition_id = shuffle_env.shuffle.metadata.partition_id; - TaskCenter::spawn_child(TaskKind::Shuffle, "shuffle", shuffle_env.shuffle.run())?; - let reader = shuffle_env.bifrost.create_reader( - LogId::from(partition_id), - KeyFilter::Any, - Lsn::OLDEST, - Lsn::MAX, - )?; + let partition_id = shuffle_env.shuffle.metadata.partition_id; + TaskCenter::spawn_child(TaskKind::Shuffle, "shuffle", shuffle_env.shuffle.run())?; + let reader = shuffle_env.bifrost.create_reader( + LogId::from(partition_id), + KeyFilter::Any, + Lsn::OLDEST, + Lsn::MAX, + )?; - let messages = collect_invoke_commands_until(reader, last_invocation_id).await?; + let messages = collect_invoke_commands_until(reader, last_invocation_id).await?; - assert_received_invoke_commands(messages, expected_messages); + 
assert_received_invoke_commands(messages, expected_messages); - Ok::<(), anyhow::Error>(()) - } - .in_tc(&tc) - .await + Ok(()) } - #[test(tokio::test)] + #[test(restate_core::test)] async fn shuffle_holey_outbox() -> anyhow::Result<()> { let expected_messages = vec![ Some(ServiceInvocation::mock()), @@ -721,29 +705,24 @@ mod tests { let outbox_reader = MockOutboxReader::new(42, expected_messages.clone()); let shuffle_env = create_shuffle_env(outbox_reader).await; - let tc = shuffle_env.env.tc.clone(); - async { - let partition_id = shuffle_env.shuffle.metadata.partition_id; - TaskCenter::spawn_child(TaskKind::Shuffle, "shuffle", shuffle_env.shuffle.run())?; - let reader = shuffle_env.bifrost.create_reader( - LogId::from(partition_id), - KeyFilter::Any, - Lsn::OLDEST, - Lsn::MAX, - )?; + let partition_id = shuffle_env.shuffle.metadata.partition_id; + TaskCenter::spawn_child(TaskKind::Shuffle, "shuffle", shuffle_env.shuffle.run())?; + let reader = shuffle_env.bifrost.create_reader( + LogId::from(partition_id), + KeyFilter::Any, + Lsn::OLDEST, + Lsn::MAX, + )?; - let messages = collect_invoke_commands_until(reader, last_invocation_id).await?; + let messages = collect_invoke_commands_until(reader, last_invocation_id).await?; - assert_received_invoke_commands(messages, expected_messages); + assert_received_invoke_commands(messages, expected_messages); - Ok::<(), anyhow::Error>(()) - } - .in_tc(&tc) - .await + Ok(()) } - #[test(tokio::test)] + #[test(restate_core::test)] async fn shuffle_with_restarts() -> anyhow::Result<()> { let expected_messages: Vec<_> = iter::repeat_with(|| Some(ServiceInvocation::mock())) .take(100) @@ -759,20 +738,19 @@ mod tests { let mut outbox_reader = Arc::new(FailingOutboxReader::new(expected_messages.clone(), 10)); let shuffle_env = create_shuffle_env(Arc::clone(&outbox_reader)).await; - let tc = shuffle_env.env.tc.clone(); let total_restarts = Arc::new(AtomicUsize::new(0)); - let shuffle_task_id = async { - let partition_id = shuffle_env.shuffle.metadata.partition_id; - let reader = shuffle_env.bifrost.create_reader( - LogId::from(partition_id), - KeyFilter::Any, - Lsn::INVALID, - Lsn::MAX, - )?; - let total_restarts = Arc::clone(&total_restarts); + let partition_id = shuffle_env.shuffle.metadata.partition_id; + let reader = shuffle_env.bifrost.create_reader( + LogId::from(partition_id), + KeyFilter::Any, + Lsn::INVALID, + Lsn::MAX, + )?; - let shuffle_task = TaskCenter::spawn_child(TaskKind::Shuffle, "shuffle", async move { + let shuffle_task = TaskCenter::spawn_child(TaskKind::Shuffle, "shuffle", { + let total_restarts = Arc::clone(&total_restarts); + async move { let mut shuffle = shuffle_env.shuffle; let metadata = shuffle.metadata; let truncation_tx = shuffle.truncation_tx.clone(); @@ -809,18 +787,16 @@ mod tests { total_restarts.store(num_restarts, Ordering::Relaxed); Ok(()) - })?; - - let messages = collect_invoke_commands_until(reader, last_invocation_id).await?; + } + })?; - assert_received_invoke_commands(messages, expected_messages); + let messages = collect_invoke_commands_until(reader, last_invocation_id).await?; - Ok::<_, anyhow::Error>(shuffle_task) - } - .in_tc(&tc) - .await?; + assert_received_invoke_commands(messages, expected_messages); - let shuffle_task = tc.cancel_task(shuffle_task_id).expect("should exist"); + let shuffle_task = TaskCenter::current() + .cancel_task(shuffle_task) + .expect("should exist"); shuffle_task.await?; // make sure that we have restarted the shuffle diff --git a/crates/worker/src/partition_processor_manager/mod.rs 
b/crates/worker/src/partition_processor_manager/mod.rs index 0720d7af2..642c8145c 100644 --- a/crates/worker/src/partition_processor_manager/mod.rs +++ b/crates/worker/src/partition_processor_manager/mod.rs @@ -28,28 +28,15 @@ use tokio::task::JoinSet; use tokio::time::MissedTickBehavior; use tracing::{debug, error, info, instrument, warn}; -use crate::metric_definitions::NUM_ACTIVE_PARTITIONS; -use crate::metric_definitions::PARTITION_IS_ACTIVE; -use crate::metric_definitions::PARTITION_IS_EFFECTIVE_LEADER; -use crate::metric_definitions::PARTITION_LABEL; -use crate::metric_definitions::PARTITION_LAST_APPLIED_LOG_LSN; -use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN; -use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD; -use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; -use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler; -use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog; -use crate::partition_processor_manager::processor_state::{ - LeaderEpochToken, ProcessorState, StartedProcessor, -}; -use crate::partition_processor_manager::snapshot_task::SnapshotPartitionTask; -use crate::partition_processor_manager::spawn_processor_task::SpawnPartitionProcessorTask; use restate_bifrost::Bifrost; use restate_core::network::{Incoming, MessageRouterBuilder, MessageStream}; use restate_core::worker_api::{ ProcessorsManagerCommand, ProcessorsManagerHandle, SnapshotCreated, SnapshotError, SnapshotResult, }; -use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskHandle, TaskKind}; +use restate_core::{ + cancellation_watcher, Metadata, ShutdownError, TaskCenterFutureExt, TaskHandle, TaskKind, +}; use restate_core::{RuntimeRootTaskHandle, TaskCenter}; use restate_invoker_api::StatusHandle; use restate_invoker_impl::{BuildError, ChannelStatusReader}; @@ -76,14 +63,28 @@ use restate_types::partition_table::PartitionTable; use restate_types::protobuf::common::WorkerStatus; use restate_types::GenerationalNodeId; +use crate::metric_definitions::NUM_ACTIVE_PARTITIONS; +use crate::metric_definitions::PARTITION_IS_ACTIVE; +use crate::metric_definitions::PARTITION_IS_EFFECTIVE_LEADER; +use crate::metric_definitions::PARTITION_LABEL; +use crate::metric_definitions::PARTITION_LAST_APPLIED_LOG_LSN; +use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN; +use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD; +use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; +use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler; +use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog; +use crate::partition_processor_manager::processor_state::{ + LeaderEpochToken, ProcessorState, StartedProcessor, +}; +use crate::partition_processor_manager::snapshot_task::SnapshotPartitionTask; +use crate::partition_processor_manager::spawn_processor_task::SpawnPartitionProcessorTask; + pub struct PartitionProcessorManager { - task_center: TaskCenter, health_status: HealthStatus, updateable_config: Live, processor_states: BTreeMap, name_cache: BTreeMap, - metadata: Metadata, metadata_store_client: MetadataStoreClient, partition_store_manager: PartitionStoreManager, incoming_update_processors: MessageStream, @@ -166,10 +167,8 @@ impl StatusHandle for MultiplexedInvokerStatusReader { impl PartitionProcessorManager { #[allow(clippy::too_many_arguments)] pub fn new( - task_center: TaskCenter, 
health_status: HealthStatus, updateable_config: Live, - metadata: Metadata, metadata_store_client: MetadataStoreClient, partition_store_manager: PartitionStoreManager, router_builder: &mut MessageRouterBuilder, @@ -180,12 +179,10 @@ impl PartitionProcessorManager { let (tx, rx) = mpsc::channel(updateable_config.pinned().worker.internal_queue_length()); Self { - task_center, health_status, updateable_config, processor_states: BTreeMap::default(), name_cache: Default::default(), - metadata, metadata_store_client, partition_store_manager, incoming_update_processors, @@ -230,9 +227,10 @@ impl PartitionProcessorManager { persisted_lsns_tx, ); TaskCenter::spawn_child(TaskKind::Watchdog, "persisted-lsn-watchdog", watchdog.run())?; + let metadata = Metadata::current(); - let mut logs_version_watcher = self.metadata.watch(MetadataKind::Logs); - let mut partition_table_version_watcher = self.metadata.watch(MetadataKind::PartitionTable); + let mut logs_version_watcher = metadata.watch(MetadataKind::Logs); + let mut partition_table_version_watcher = metadata.watch(MetadataKind::PartitionTable); let mut latest_snapshot_check_interval = tokio::time::interval(Duration::from_secs(5)); latest_snapshot_check_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -295,7 +293,7 @@ impl PartitionProcessorManager { match self.processor_states.get(&partition_id) { None => { // ignore shutdown errors - let _ = self.task_center.spawn( + let _ = TaskCenter::current().spawn( TaskKind::Disposable, "partition-processor-rpc-response", None, @@ -311,11 +309,7 @@ impl PartitionProcessorManager { ); } Some(processor_state) => { - processor_state.try_send_rpc( - partition_id, - partition_processor_rpc, - &self.task_center, - ); + processor_state.try_send_rpc(partition_id, partition_processor_rpc); } } } @@ -350,9 +344,7 @@ impl PartitionProcessorManager { Self::obtain_new_leader_epoch( partition_id, leader_epoch_token, - self.task_center.clone(), self.metadata_store_client.clone(), - self.metadata.my_node_id(), &mut self.asynchronous_operations, ); } @@ -419,7 +411,7 @@ impl PartitionProcessorManager { partition_id, command: ProcessorCommand::from(restart_as), }, - &self.metadata.partition_table_ref(), + &Metadata::with_current(|m| m.partition_table_ref()), ); } } @@ -466,41 +458,33 @@ impl PartitionProcessorManager { partition_id: PartitionId, runtime_task_handle: RuntimeRootTaskHandle>, ) { - let tc = self.task_center.clone(); - self.asynchronous_operations.spawn(async move { - tc.run_in_scope("await-runtime-task-result", Some(partition_id), async { + self.asynchronous_operations.spawn( + async move { let result = runtime_task_handle.await; AsynchronousEvent { partition_id, inner: EventKind::Stopped(result), } - }) - .await - }); + } + .in_current_tc(), + ); } fn obtain_new_leader_epoch( partition_id: PartitionId, leader_epoch_token: LeaderEpochToken, - task_center: TaskCenter, metadata_store_client: MetadataStoreClient, - my_node_id: GenerationalNodeId, asynchronous_operations: &mut JoinSet, ) { - asynchronous_operations.spawn(async move { - task_center - .run_in_scope( - "obtain-new-leader-epoch", - Some(partition_id), - Self::obtain_new_leader_epoch_task( - leader_epoch_token, - partition_id, - metadata_store_client, - my_node_id, - ), - ) - .await - }); + asynchronous_operations.spawn( + Self::obtain_new_leader_epoch_task( + leader_epoch_token, + partition_id, + metadata_store_client, + Metadata::with_current(|m| m.my_node_id()), + ) + .in_current_tc(), + ); } fn get_state(&self) -> BTreeMap { @@ -579,20 
+563,22 @@ impl PartitionProcessorManager { } fn on_control_processors(&mut self) { + let (current_logs_version, current_partition_table_version) = + Metadata::with_current(|m| (m.logs_version(), m.partition_table_version())); if self .pending_control_processors .as_ref() .is_some_and(|control_processors| { - control_processors.min_logs_table_version <= self.metadata.logs_version() + control_processors.min_logs_table_version <= current_logs_version && control_processors.min_partition_table_version - <= self.metadata.partition_table_version() + <= current_partition_table_version }) { let control_processors = self .pending_control_processors .take() .expect("must be some"); - let partition_table = self.metadata.partition_table_snapshot(); + let partition_table = Metadata::with_current(|m| m.partition_table_snapshot()); for control_processor in control_processors.commands { self.on_control_processor(control_processor, &partition_table); @@ -623,9 +609,7 @@ impl PartitionProcessorManager { Self::obtain_new_leader_epoch( partition_id, leader_epoch_token, - self.task_center.clone(), self.metadata_store_client.clone(), - self.metadata.my_node_id(), &mut self.asynchronous_operations, ); } @@ -644,18 +628,21 @@ impl PartitionProcessorManager { // We spawn the partition processors start tasks on the blocking thread pool due to a macOS issue // where doing otherwise appears to starve the Tokio event loop, causing very slow startup. - let handle = self.task_center.spawn_blocking_unmanaged( + let handle = TaskCenter::current().spawn_blocking_unmanaged( "starting-partition-processor", starting_task.run(), ); - self.asynchronous_operations.spawn(async move { - let result = handle.await.expect("task must not panic"); - AsynchronousEvent { - partition_id, - inner: EventKind::Started(result), + self.asynchronous_operations.spawn( + async move { + let result = handle.await.expect("task must not panic"); + AsynchronousEvent { + partition_id, + inner: EventKind::Started(result), + } } - }); + .in_current_tc(), + ); self.processor_states.insert( partition_id, @@ -792,7 +779,7 @@ impl PartitionProcessorManager { node_name: config.common.node_name().into(), }; - let spawn_task_result = restate_core::task_center().spawn_unmanaged( + let spawn_task_result = TaskCenter::current().spawn_unmanaged( TaskKind::PartitionSnapshotProducer, "create-snapshot", Some(partition_id), @@ -845,11 +832,9 @@ impl PartitionProcessorManager { SpawnPartitionProcessorTask::new( task_name, - self.metadata.my_node_id(), partition_id, key_range, self.updateable_config.clone(), - self.metadata.clone(), self.bifrost.clone(), self.partition_store_manager.clone(), ) @@ -912,7 +897,7 @@ mod tests { use restate_bifrost::providers::memory_loglet; use restate_bifrost::BifrostService; use restate_core::network::MockPeerConnection; - use restate_core::{TaskCenter, TaskCenterFutureExt, TaskKind, TestCoreEnvBuilder}; + use restate_core::{TaskCenter, TaskKind, TestCoreEnvBuilder2}; use restate_partition_store::PartitionStoreManager; use restate_rocksdb::RocksDbManager; use restate_types::config::{CommonOptions, Configuration, RocksDbOptions, StorageOptions}; @@ -932,7 +917,7 @@ mod tests { /// This test ensures that the lifecycle of partition processors is properly managed by the /// [`PartitionProcessorManager`]. See https://github.com/restatedev/restate/issues/2258 for /// more details. 
- #[test(tokio::test)] + #[test(restate_core::test)] async fn proper_partition_processor_lifecycle() -> googletest::Result<()> { let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); let node_id = GenerationalNodeId::new(42, 42); @@ -946,109 +931,99 @@ mod tests { nodes_config.upsert_node(node_config); let mut env_builder = - TestCoreEnvBuilder::with_incoming_only_connector().set_nodes_config(nodes_config); - let tc = env_builder.tc.clone(); - async { - let health_status = HealthStatus::default(); + TestCoreEnvBuilder2::with_incoming_only_connector().set_nodes_config(nodes_config); + let health_status = HealthStatus::default(); - RocksDbManager::init(Constant::new(CommonOptions::default())); + RocksDbManager::init(Constant::new(CommonOptions::default())); - let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); - let bifrost = bifrost_svc.handle(); + let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); + let bifrost = bifrost_svc.handle(); - let partition_store_manager = PartitionStoreManager::create( - Constant::new(StorageOptions::default()), - Constant::new(RocksDbOptions::default()).boxed(), - &[(PartitionId::MIN, 0..=PartitionKey::MAX)], - ) - .await?; + let partition_store_manager = PartitionStoreManager::create( + Constant::new(StorageOptions::default()), + Constant::new(RocksDbOptions::default()).boxed(), + &[(PartitionId::MIN, 0..=PartitionKey::MAX)], + ) + .await?; - let partition_processor_manager = PartitionProcessorManager::new( - env_builder.tc.clone(), - health_status, - Live::from_value(Configuration::default()), - env_builder.metadata.clone(), - env_builder.metadata_store_client.clone(), - partition_store_manager, - &mut env_builder.router_builder, - bifrost, - ); + let partition_processor_manager = PartitionProcessorManager::new( + health_status, + Live::from_value(Configuration::default()), + env_builder.metadata_store_client.clone(), + partition_store_manager, + &mut env_builder.router_builder, + bifrost, + ); - let env = env_builder.build().await; - let processors_manager_handle = partition_processor_manager.handle(); - - bifrost_svc.start().await.into_test_result()?; - TaskCenter::current().spawn( - TaskKind::SystemService, - "partition-processor-manager", - None, - partition_processor_manager.run(), - )?; - - let connection = MockPeerConnection::connect( - node_id, - env.metadata.nodes_config_version(), - env.metadata.nodes_config_ref().cluster_name().to_owned(), - env.networking.connection_manager(), - 10, - ) - .await - .into_test_result()?; - - let start_processor_command = ControlProcessors { - min_logs_table_version: Version::MIN, - min_partition_table_version: Version::MIN, - commands: vec![ControlProcessor { - partition_id: PartitionId::MIN, - command: ProcessorCommand::Follower, - }], - }; - let stop_processor_command = ControlProcessors { - min_logs_table_version: Version::MIN, - min_partition_table_version: Version::MIN, - commands: vec![ControlProcessor { - partition_id: PartitionId::MIN, - command: ProcessorCommand::Stop, - }], - }; - - // let's check whether we can start and stop the partition processor multiple times - for i in 0..=10 { + let env = env_builder.build().await; + let processors_manager_handle = partition_processor_manager.handle(); + + bifrost_svc.start().await.into_test_result()?; + TaskCenter::current().spawn( + TaskKind::SystemService, + "partition-processor-manager", + None, + partition_processor_manager.run(), + )?; + + let 
connection = MockPeerConnection::connect( + node_id, + env.metadata.nodes_config_version(), + env.metadata.nodes_config_ref().cluster_name().to_owned(), + env.networking.connection_manager(), + 10, + ) + .await + .into_test_result()?; + + let start_processor_command = ControlProcessors { + min_logs_table_version: Version::MIN, + min_partition_table_version: Version::MIN, + commands: vec![ControlProcessor { + partition_id: PartitionId::MIN, + command: ProcessorCommand::Follower, + }], + }; + let stop_processor_command = ControlProcessors { + min_logs_table_version: Version::MIN, + min_partition_table_version: Version::MIN, + commands: vec![ControlProcessor { + partition_id: PartitionId::MIN, + command: ProcessorCommand::Stop, + }], + }; + + // let's check whether we can start and stop the partition processor multiple times + for i in 0..=10 { + connection + .send_raw( + if i % 2 == 0 { + start_processor_command.clone() + } else { + stop_processor_command.clone() + }, + Header::default(), + ) + .await + .into_test_result()?; + } + + loop { + let current_state = processors_manager_handle.get_state().await?; + + if current_state.contains_key(&PartitionId::MIN) { + // wait until we see the PartitionId::MIN partition processor running + break; + } else { + // make sure that we eventually start the partition processor connection - .send_raw( - if i % 2 == 0 { - start_processor_command.clone() - } else { - stop_processor_command.clone() - }, - Header::default(), - ) + .send_raw(start_processor_command.clone(), Header::default()) .await .into_test_result()?; + tokio::time::sleep(Duration::from_millis(50)).await; } - - loop { - let current_state = processors_manager_handle.get_state().await?; - - if current_state.contains_key(&PartitionId::MIN) { - // wait until we see the PartitionId::MIN partition processor running - break; - } else { - // make sure that we eventually start the partition processor - connection - .send_raw(start_processor_command.clone(), Header::default()) - .await - .into_test_result()?; - tokio::time::sleep(Duration::from_millis(50)).await; - } - } - - googletest::Result::Ok(()) } - .in_tc(&tc) - .await?; - tc.shutdown_node("test completed", 0).await; RocksDbManager::get().shutdown().await; Ok(()) } diff --git a/crates/worker/src/partition_processor_manager/processor_state.rs b/crates/worker/src/partition_processor_manager/processor_state.rs index 66ec72cf6..24eb61499 100644 --- a/crates/worker/src/partition_processor_manager/processor_state.rs +++ b/crates/worker/src/partition_processor_manager/processor_state.rs @@ -16,7 +16,6 @@ use tokio_util::sync::CancellationToken; use tracing::debug; use ulid::Ulid; -use crate::partition::PartitionProcessorControlCommand; use restate_core::network::Incoming; use restate_core::{TaskCenter, TaskKind}; use restate_invoker_impl::ChannelStatusReader; @@ -25,9 +24,10 @@ use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; use restate_types::net::partition_processor::{ PartitionProcessorRpcError, PartitionProcessorRpcRequest, }; - use restate_types::time::MillisSinceEpoch; +use crate::partition::PartitionProcessorControlCommand; + pub type LeaderEpochToken = Ulid; #[derive(Debug, thiserror::Error)] @@ -280,11 +280,10 @@ impl ProcessorState { &self, partition_id: PartitionId, partition_processor_rpc: Incoming, - task_center: &TaskCenter, ) { match self { ProcessorState::Starting { .. 
} => { - let _ = task_center.spawn( + let _ = TaskCenter::current().spawn( TaskKind::Disposable, "partition-processor-rpc", None, @@ -305,7 +304,7 @@ impl ProcessorState { { match err { TrySendError::Full(req) => { - let _ = task_center.spawn( + let _ = TaskCenter::current().spawn( TaskKind::Disposable, "partition-processor-rpc", None, @@ -318,7 +317,7 @@ impl ProcessorState { ); } TrySendError::Closed(req) => { - let _ = task_center.spawn( + let _ = TaskCenter::current().spawn( TaskKind::Disposable, "partition-processor-rpc", None, @@ -336,7 +335,7 @@ impl ProcessorState { } } ProcessorState::Stopping { .. } => { - let _ = task_center.spawn( + let _ = TaskCenter::current().spawn( TaskKind::Disposable, "partition-processor-rpc", None, diff --git a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs index 13a58d0a0..bb1b49403 100644 --- a/crates/worker/src/partition_processor_manager/spawn_processor_task.rs +++ b/crates/worker/src/partition_processor_manager/spawn_processor_task.rs @@ -23,7 +23,6 @@ use restate_types::config::Configuration; use restate_types::identifiers::{PartitionId, PartitionKey}; use restate_types::live::Live; use restate_types::schema::Schema; -use restate_types::GenerationalNodeId; use crate::invoker_integration::EntryEnricher; use crate::partition::invoker_storage_reader::InvokerStorageReader; @@ -32,11 +31,9 @@ use crate::PartitionProcessorBuilder; pub struct SpawnPartitionProcessorTask { task_name: &'static str, - node_id: GenerationalNodeId, partition_id: PartitionId, key_range: RangeInclusive, configuration: Live, - metadata: Metadata, bifrost: Bifrost, partition_store_manager: PartitionStoreManager, } @@ -45,21 +42,17 @@ impl SpawnPartitionProcessorTask { #[allow(clippy::too_many_arguments)] pub fn new( task_name: &'static str, - node_id: GenerationalNodeId, partition_id: PartitionId, key_range: RangeInclusive, configuration: Live, - metadata: Metadata, bifrost: Bifrost, partition_store_manager: PartitionStoreManager, ) -> Self { Self { task_name, - node_id, partition_id, key_range, configuration, - metadata, bifrost, partition_store_manager, } @@ -76,17 +69,15 @@ impl SpawnPartitionProcessorTask { ) -> anyhow::Result<(StartedProcessor, RuntimeRootTaskHandle>)> { let Self { task_name, - node_id, partition_id, key_range, configuration, - metadata, bifrost, partition_store_manager, } = self; let config = configuration.pinned(); - let schema = metadata.updateable_schema(); + let schema = Metadata::with_current(|m| m.updateable_schema()); let invoker: InvokerService< InvokerStorageReader, EntryEnricher, @@ -108,7 +99,6 @@ impl SpawnPartitionProcessorTask { let options = &configuration.pinned().worker; let pp_builder = PartitionProcessorBuilder::new( - node_id, partition_id, key_range.clone(), status, @@ -146,11 +136,7 @@ impl SpawnPartitionProcessorTask { )?; pp_builder - .build::( - TaskCenter::current(), - bifrost, - partition_store, - ) + .build::(bifrost, partition_store) .await? 
.run() .await diff --git a/tools/bifrost-benchpress/src/write_to_read.rs b/tools/bifrost-benchpress/src/write_to_read.rs index 8b391c543..ee2e13505 100644 --- a/tools/bifrost-benchpress/src/write_to_read.rs +++ b/tools/bifrost-benchpress/src/write_to_read.rs @@ -94,7 +94,6 @@ pub async fn run( tc.spawn_unmanaged(TaskKind::PartitionProcessor, "test-log-appender", None, { let clock = clock.clone(); let bifrost = bifrost.clone(); - let tc = tc.clone(); let args = args.clone(); let blob = BytesMut::zeroed(args.payload_size).freeze(); async move { @@ -105,7 +104,7 @@ pub async fn run( args.write_buffer_size, args.max_batch_size, )? - .start(tc, "writer", None)?; + .start("writer", None)?; let sender = appender_handle.sender(); let start = Instant::now(); for counter in 1..=args.num_records {
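
For reference, the sketch below distills the pattern this change set converges on: instead of threading TaskCenter, Metadata, and GenerationalNodeId handles through constructors, callers read the ambient task-local context where it is needed. It is a minimal illustration assuming only the restate_core APIs that already appear in the hunks above (TaskCenter::current(), TaskCenter::spawn_child, Metadata::with_current); the helper name and its signature are hypothetical, not taken from the codebase.

use restate_core::{Metadata, TaskCenter, TaskKind};

// Hypothetical helper in the post-refactor style: it accepts no TaskCenter or
// node-id parameters; both are read from the task-local context. Assumes it is
// called from within a TaskCenter-managed task (as Cleaner::run and
// LeadershipState are above), otherwise spawn_child has no parent to attach to.
fn spawn_cleanup_sketch(
    cleanup: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) -> anyhow::Result<()> {
    // Node identity comes from the ambient Metadata instead of a stored field,
    // mirroring Cleaner::run and shuffle::create_header in the hunks above.
    let my_node_id = Metadata::with_current(|m| m.my_node_id());
    tracing::debug!(restate.node = %my_node_id, "spawning cleanup task");

    // Spawning goes through the ambient TaskCenter; no handle is cloned into
    // the struct or passed down the call chain.
    TaskCenter::spawn_child(TaskKind::Cleaner, "cleaner", cleanup)?;
    Ok(())
}

In tests, the same ambient context is supplied by the restate_core::test macro together with TestCoreEnv2/TestCoreEnvBuilder2, which is why the explicit .in_tc(&env.tc) and run_in_scope wrappers disappear throughout the diff.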