Handle stale metadata version when starting a CC leader
Summary:
In some situations, when a Cluster Controller leader is starting again,
some or all of the nodes in the cluster can still hold an outdated version
of the metadata that does not yet contain the new leader's generational id.

This causes some of the "get state" requests to fail, which makes the CC
believe those nodes are dead and triggers an unnecessary reschedule of
partition processors (PPs).

To avoid this, we introduce a third node state, "Suspect", which is
neither confirmed dead nor alive until we can be sure.
muhamadazmy committed Nov 20, 2024
1 parent 0e00527 commit bf0685d
Showing 9 changed files with 173 additions and 86 deletions.
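The idea of the change, as a minimal self-contained sketch. The types and names below (`GetStateOutcome`, `classify`) are simplified illustrations, not code from this commit; the real `NodeState`, `AliveNode`, `SuspectNode`, and `DeadNode` types live in `restate_types::cluster::cluster_state` and the real logic is in the diffs that follow.

```rust
#![allow(dead_code)]

use std::time::Instant;

// Simplified stand-ins for the real AliveNode / SuspectNode / DeadNode states.
enum NodeState {
    Alive { last_heartbeat_at: Instant },
    // Neither confirmed alive nor dead: the peer answered, but rejected the
    // request because its metadata does not yet know the new generational id.
    Suspect { last_attempt: Instant },
    Dead { last_seen_alive: Option<Instant> },
}

// Outcome of a single "get state" RPC, reduced to the cases that matter here.
enum GetStateOutcome {
    Ok,
    // e.g. a tonic FAILED_PRECONDITION reply, surfaced as RemoteVersionMismatch
    RemoteVersionMismatch,
    OtherError,
}

fn classify(outcome: GetStateOutcome, last_seen_alive: Option<Instant>) -> NodeState {
    match outcome {
        GetStateOutcome::Ok => NodeState::Alive {
            last_heartbeat_at: Instant::now(),
        },
        // The key change: a version-mismatch reply no longer marks the node
        // dead; it only becomes "Suspect" until its metadata catches up.
        GetStateOutcome::RemoteVersionMismatch => NodeState::Suspect {
            last_attempt: Instant::now(),
        },
        GetStateOutcome::OtherError => NodeState::Dead { last_seen_alive },
    }
}

fn main() {
    // A freshly restarted CC leader gets a version-mismatch reply from a peer:
    // the peer is marked Suspect rather than Dead, so no partition processors
    // get rescheduled prematurely.
    let state = classify(GetStateOutcome::RemoteVersionMismatch, None);
    assert!(matches!(state, NodeState::Suspect { .. }));
}
```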
62 changes: 43 additions & 19 deletions crates/admin/src/cluster_controller/cluster_state_refresher.rs
@@ -13,11 +13,16 @@ use std::sync::Arc;
use std::time::Instant;

use tokio::sync::watch;
use tracing::{debug, trace};

use restate_core::network::rpc_router::RpcRouter;
use restate_core::network::{MessageRouterBuilder, Networking, Outgoing, TransportConnect};
use restate_core::network::{
MessageRouterBuilder, NetworkError, Networking, Outgoing, TransportConnect,
};
use restate_core::{Metadata, ShutdownError, TaskCenter, TaskHandle};
use restate_types::cluster::cluster_state::{AliveNode, ClusterState, DeadNode, NodeState};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, SuspectNode,
};
use restate_types::net::node::GetNodeState;
use restate_types::time::MillisSinceEpoch;
use restate_types::Version;
@@ -129,7 +134,8 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {

let mut nodes = BTreeMap::new();
let mut join_set = tokio::task::JoinSet::new();
for (node_id, _) in nodes_config.iter() {
for (_, node_config) in nodes_config.iter() {
let node_id = node_config.current_generation;
let rpc_router = get_state_router.clone();
let tc = tc.clone();
let network_sender = network_sender.clone();
@@ -166,32 +172,50 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
let peer = response.peer();
let msg = response.into_body();
nodes.insert(
node_id,
node_id.as_plain(),
NodeState::Alive(AliveNode {
last_heartbeat_at: MillisSinceEpoch::now(),
generational_node_id: peer,
partitions: msg.partition_processor_state.unwrap_or_default(),
}),
);
}
_ => {
Err(NetworkError::RemoteVersionMismatch(msg)) => {
// When **this** node has just started, other peers might not have
// learned about the new metadata version yet and may therefore
// return a RemoteVersionMismatch error.
// In this case we are not sure about the peer's state, but it is
// definitely not dead!
// Hence we mark it as a Suspect node. This gives it enough time to
// update its metadata before we learn its exact state.
debug!("Node {node_id} is marked as Suspect: {msg}");
nodes.insert(
node_id.as_plain(),
NodeState::Suspect(SuspectNode {
generational_node_id: node_id,
last_attempt: MillisSinceEpoch::now(),
}),
);
}
Err(err) => {
// todo: implement a more robust failure detector
// This is a naive mechanism for failure detection and is just a stop-gap measure.
// A single connection error or timeout will cause a node to be marked as dead.
let last_seen_alive =
last_state
.nodes
.get(&node_id)
.and_then(|state| match state {
NodeState::Alive(AliveNode {
last_heartbeat_at, ..
}) => Some(*last_heartbeat_at),
NodeState::Dead(DeadNode { last_seen_alive }) => {
*last_seen_alive
}
});

nodes.insert(node_id, NodeState::Dead(DeadNode { last_seen_alive }));
trace!("Node {node_id} is marked dead {node_id}: {err}");
let last_seen_alive = last_state.nodes.get(&node_id.as_plain()).and_then(
|state| match state {
NodeState::Alive(AliveNode {
last_heartbeat_at, ..
}) => Some(*last_heartbeat_at),
NodeState::Dead(DeadNode { last_seen_alive }) => *last_seen_alive,
NodeState::Suspect(_) => None,
},
);

nodes.insert(
node_id.as_plain(),
NodeState::Dead(DeadNode { last_seen_alive }),
);
}
};
}
1 change: 0 additions & 1 deletion crates/admin/src/cluster_controller/mod.rs
@@ -8,7 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod cluster_controller_state;
pub mod cluster_state_refresher;
pub mod grpc_svc_handler;
mod logs_controller;
5 changes: 5 additions & 0 deletions crates/admin/src/cluster_controller/observed_cluster_state.rs
@@ -52,6 +52,11 @@ impl ObservedClusterState {
self.alive_nodes
.insert(*node_id, alive_node.generational_node_id);
}
NodeState::Suspect(maybe_node) => {
self.dead_nodes.remove(node_id);
self.alive_nodes
.insert(*node_id, maybe_node.generational_node_id);
}
NodeState::Dead(_) => {
self.alive_nodes.remove(node_id);
self.dead_nodes.insert(*node_id);
65 changes: 12 additions & 53 deletions crates/admin/src/cluster_controller/service.rs
@@ -8,12 +8,13 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod state;

use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use codederror::CodedError;
use itertools::Itertools;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tokio::time::{Instant, Interval, MissedTickBehavior};
@@ -43,12 +44,11 @@ use restate_types::partition_table::PartitionTable;
use restate_types::protobuf::common::AdminStatus;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_controller_state::ClusterControllerState;
use super::cluster_state_refresher::ClusterStateRefresher;
use super::grpc_svc_handler::ClusterCtrlSvcHandler;
use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer;
use crate::cluster_controller::cluster_controller_state::Leader;
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use state::ClusterControllerState;

#[derive(Debug, thiserror::Error, CodedError)]
pub enum Error {
@@ -58,14 +58,14 @@ pub enum Error {
}

pub struct Service<T> {
pub(crate) task_center: TaskCenter,
pub(crate) metadata: Metadata,
pub(crate) networking: Networking<T>,
pub(crate) bifrost: Bifrost,
pub(crate) cluster_state_refresher: ClusterStateRefresher<T>,
pub(crate) configuration: Live<Configuration>,
pub(crate) metadata_writer: MetadataWriter,
pub(crate) metadata_store_client: MetadataStoreClient,
task_center: TaskCenter,
metadata: Metadata,
networking: Networking<T>,
bifrost: Bifrost,
cluster_state_refresher: ClusterStateRefresher<T>,
configuration: Live<Configuration>,
metadata_writer: MetadataWriter,
metadata_store_client: MetadataStoreClient,

processor_manager_client: PartitionProcessorManagerClient<Networking<T>>,
command_tx: mpsc::Sender<ClusterControllerCommand>,
@@ -224,47 +224,6 @@ impl<T: TransportConnect> Service<T> {
}
}

async fn next_cluster_state(
&self,
state: &mut ClusterControllerState<T>,
) -> anyhow::Result<()> {
let nodes_config = self.metadata.nodes_config_ref();
let maybe_leader = nodes_config
.get_admin_nodes()
.filter(|node| {
self.observed_cluster_state
.is_node_alive(node.current_generation)
})
.map(|node| node.current_generation)
.sorted()
.next();

// A Cluster Controller is a leader if the node holds the smallest PlainNodeID
// If no other node was found to take leadership, we assume leadership

let is_leader = match maybe_leader {
None => true,
Some(leader) => leader == self.metadata.my_node_id(),
};

match (is_leader, &state) {
(true, ClusterControllerState::Leader(_))
| (false, ClusterControllerState::Follower) => {
// nothing to do
}
(true, ClusterControllerState::Follower) => {
info!("Cluster controller switching to leader mode");
*state = ClusterControllerState::Leader(Leader::from_service(self).await?);
}
(false, ClusterControllerState::Leader(_)) => {
info!("Cluster controller switching to follower mode");
*state = ClusterControllerState::Follower;
}
};

Ok(())
}

pub async fn run(mut self) -> anyhow::Result<()> {
self.init_partition_table().await?;

@@ -298,7 +257,7 @@ impl<T: TransportConnect> Service<T> {
},
Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => {
self.observed_cluster_state.update(&cluster_state);
self.next_cluster_state(&mut state).await?;
state.update(&self).await?;

state.on_observed_cluster_state(&self.observed_cluster_state).await?;
}
Original file line number Diff line number Diff line change
@@ -11,12 +11,11 @@
use std::collections::BTreeMap;

use futures::future::OptionFuture;
use restate_types::logs::metadata::Logs;
use restate_types::nodes_config::NodesConfiguration;
use itertools::Itertools;
use tokio::sync::watch;
use tokio::time;
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{debug, warn};
use tracing::{debug, info, warn};

use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
@@ -26,24 +25,70 @@ use restate_types::cluster::cluster_state::{AliveNode, NodeState};
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
use restate_types::live::Live;
use restate_types::logs::metadata::Logs;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::metadata::MetadataKind;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::partition_table::PartitionTable;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_state_refresher::ClusterStateWatcher;
use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher;
use crate::cluster_controller::logs_controller::{
LogsBasedPartitionProcessorPlacementHints, LogsController,
};
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::{Scheduler, SchedulingPlanNodeSetSelectorHints};
use crate::cluster_controller::service::Service;

pub(crate) enum ClusterControllerState<T> {
pub enum ClusterControllerState<T> {
Follower,
Leader(Leader<T>),
}

impl<T> ClusterControllerState<T>
where
T: TransportConnect,
{
pub async fn update(&mut self, service: &Service<T>) -> anyhow::Result<()> {
let nodes_config = service.metadata.nodes_config_ref();
let maybe_leader = nodes_config
.get_admin_nodes()
.filter(|node| {
service
.observed_cluster_state
.is_node_alive(node.current_generation)
})
.map(|node| node.current_generation)
.sorted()
.next();

// A Cluster Controller is a leader if the node holds the smallest PlainNodeID
// If no other node was found to take leadership, we assume leadership

let is_leader = match maybe_leader {
None => true,
Some(leader) => leader == service.metadata.my_node_id(),
};

match (is_leader, &self) {
(true, ClusterControllerState::Leader(_))
| (false, ClusterControllerState::Follower) => {
// nothing to do
}
(true, ClusterControllerState::Follower) => {
info!("Cluster controller switching to leader mode");
*self = ClusterControllerState::Leader(Leader::from_service(service).await?);
}
(false, ClusterControllerState::Leader(_)) => {
info!("Cluster controller switching to follower mode");
*self = ClusterControllerState::Follower;
}
};

Ok(())
}
}

impl<T> ClusterControllerState<T>
where
T: TransportConnect,
@@ -80,7 +125,7 @@ where
}
}

pub(crate) struct Leader<T> {
pub struct Leader<T> {
metadata: Metadata,
bifrost: Bifrost,
metadata_store_client: MetadataStoreClient,
@@ -102,7 +147,7 @@ impl<T> Leader<T>
where
T: TransportConnect,
{
pub async fn from_service(service: &Service<T>) -> anyhow::Result<Leader<T>> {
async fn from_service(service: &Service<T>) -> anyhow::Result<Leader<T>> {
let configuration = service.configuration.pinned();

let scheduler = Scheduler::init(
@@ -245,7 +290,7 @@ where
.insert(*generational_node_id, lsn);
}
}
NodeState::Dead(_) => {
NodeState::Dead(_) | NodeState::Suspect(_) => {
// nothing to do
}
}
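The leadership rule used by the new `ClusterControllerState::update` above, as a tiny worked example. The helper and the plain `(plain_id, generation)` tuples below are illustrative stand-ins for `GenerationalNodeId`, not part of the commit.

```rust
// Illustrative only: among the admin nodes observed alive, the one with the
// smallest plain node id leads; if none is observed alive, we assume leadership.
fn choose_leader(alive_admin_nodes: &[(u32, u32)], my_id: (u32, u32)) -> bool {
    match alive_admin_nodes.iter().min() {
        None => true,
        Some(&leader) => leader == my_id,
    }
}

fn main() {
    // N1 (gen 3) and N3 (gen 5) are alive admin nodes; N1 has the smallest plain id.
    assert!(choose_leader(&[(1, 3), (3, 5)], (1, 3)));
    assert!(!choose_leader(&[(1, 3), (3, 5)], (3, 5)));
    // No alive admin node observed: assume leadership ourselves.
    assert!(choose_leader(&[], (3, 5)));
}
```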
20 changes: 19 additions & 1 deletion crates/core/src/network/error.rs
@@ -10,6 +10,8 @@

use std::time::Duration;

use tonic::Code;

use restate_types::net::{CodecError, MIN_SUPPORTED_PROTOCOL_VERSION};
use restate_types::nodes_config::NodesConfigError;
use restate_types::GenerationalNodeId;
@@ -49,7 +51,7 @@ pub enum NetworkError {
#[error("protocol error: {0}")]
ProtocolError(#[from] ProtocolError),
#[error("cannot connect: {} {}", tonic::Status::code(.0), tonic::Status::message(.0))]
ConnectError(#[from] tonic::Status),
ConnectError(tonic::Status),
#[error("new node generation exists: {0}")]
OldPeerGeneration(String),
#[error("connection lost to peer {0}")]
@@ -58,10 +60,23 @@
Unavailable(String),
#[error("failed syncing metadata: {0}")]
Metadata(#[from] SyncError),
#[error("remote metadata version mismatch: {0}")]
// todo(azmy): A temporary error that should be removed
// after relaxing the restrictions on node ids in upcoming change
RemoteVersionMismatch(String),
#[error("network channel is full and sending would block")]
Full,
}

impl From<tonic::Status> for NetworkError {
fn from(value: tonic::Status) -> Self {
if value.code() == Code::FailedPrecondition {
Self::RemoteVersionMismatch(value.message().into())
} else {
Self::ConnectError(value)
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
#[error("handshake failed: {0}")]
@@ -104,6 +119,9 @@ impl From<NetworkError> for tonic::Status {
NetworkError::Timeout(_) => tonic::Status::deadline_exceeded(value.to_string()),
NetworkError::OldPeerGeneration(e) => tonic::Status::already_exists(e),
NetworkError::ConnectError(s) => s,
NetworkError::UnknownNode(err @ NodesConfigError::GenerationMismatch { .. }) => {
tonic::Status::failed_precondition(err.to_string())
}
e => tonic::Status::internal(e.to_string()),
}
}
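How the two ends of the change line up, as a hedged usage sketch: a peer whose nodes configuration lags returns FAILED_PRECONDITION, the conversion above turns that into `RemoteVersionMismatch`, and the refresher (first hunk in this commit) maps that error to Suspect instead of Dead. The `classify_outcome` helper is illustrative and assumes the `NetworkError` re-export shown in the cluster_state_refresher hunk.

```rust
use restate_core::network::NetworkError;

// Illustrative helper, not part of the commit.
fn classify_outcome(status: tonic::Status) -> &'static str {
    match NetworkError::from(status) {
        // New behaviour: a stale-metadata peer is only "suspect".
        NetworkError::RemoteVersionMismatch(_) => "suspect",
        // Everything else keeps the old semantics.
        NetworkError::ConnectError(_) => "connect error",
        _ => "other network error",
    }
}

fn main() {
    let stale = tonic::Status::failed_precondition("generation mismatch for node N1");
    assert_eq!(classify_outcome(stale), "suspect");

    let refused = tonic::Status::unavailable("connection refused");
    assert_eq!(classify_outcome(refused), "connect error");
}
```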