diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index 3a014e7c8..0cc38a25c 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -14,7 +14,9 @@ mod network_server;
 mod roles;

 use anyhow::Context;
-use tonic::{Code, IntoRequest};
+use bytestring::ByteString;
+use prost_dto::IntoProto;
+use std::num::NonZeroU16;
 use tracing::{debug, error, info, trace, warn};

 use crate::cluster_marker::ClusterValidationError;
@@ -23,31 +25,35 @@ use crate::network_server::NetworkServer;
 use crate::roles::{AdminRole, BaseRole, IngressRole, WorkerRole};
 use codederror::CodedError;
 use restate_bifrost::BifrostService;
-use restate_core::metadata_store::ReadWriteError;
-use restate_core::network::net_util::create_tonic_channel_from_advertised_address;
+use restate_core::metadata_store::{
+    retry_on_network_error, Precondition, ReadWriteError, WriteError,
+};
 use restate_core::network::{
     GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking,
 };
 use restate_core::partitions::{spawn_partition_routing_refresher, PartitionRoutingRefresher};
-use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient;
-use restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest;
 use restate_core::{cancellation_watcher, Metadata, TaskKind};
 use restate_core::{spawn_metadata_manager, MetadataBuilder, MetadataManager, TaskCenter};
 #[cfg(feature = "replicated-loglet")]
 use restate_log_server::LogServerService;
 use restate_metadata_store::local::LocalMetadataStoreService;
 use restate_metadata_store::MetadataStoreClient;
-use restate_types::config::Configuration;
+use restate_types::config::{CommonOptions, Configuration};
 use restate_types::errors::GenericError;
 use restate_types::health::Health;
 use restate_types::live::Live;
+use restate_types::logs::metadata::{DefaultProvider, Logs, LogsConfiguration};
 #[cfg(feature = "replicated-loglet")]
 use restate_types::logs::RecordCache;
-use restate_types::nodes_config::Role;
+use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
+use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
+use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy};
 use restate_types::protobuf::common::{
     AdminStatus, IngressStatus, LogServerStatus, MetadataServerStatus, NodeRpcStatus, NodeStatus,
     WorkerStatus,
 };
+use restate_types::storage::StorageEncode;
+use restate_types::{GenerationalNodeId, Version, Versioned};

 #[derive(Debug, thiserror::Error, CodedError)]
 pub enum Error {
@@ -359,45 +365,28 @@ impl Node {
         if config.common.allow_bootstrap {
             TaskCenter::spawn(TaskKind::SystemBoot, "auto-provision-cluster", {
-                let channel = create_tonic_channel_from_advertised_address(
-                    config.common.advertised_address.clone(),
-                    &config.networking,
-                );
-                let client = NodeCtlSvcClient::new(channel);
-                let retry_policy = config.common.network_error_retry_policy.clone();
+                let cluster_configuration = ClusterConfiguration::from_configuration(&config);
+                let metadata_store_client = self.metadata_store_client.clone();
+                let common_opts = config.common.clone();

                 async move {
-                    let response = retry_policy
-                        .retry_if(
-                            || {
-                                let mut client = client.clone();
-                                async move {
-                                    client
-                                        .provision_cluster(
-                                            ProvisionClusterRequest {
-                                                dry_run: false,
-                                                ..Default::default()
-                                            }
-                                            .into_request(),
-                                        )
-                                        .await
-                                }
-                            },
-                            |status| status.code() == Code::Unavailable,
-                        )
-                        .await;
+                    let response = provision_cluster_metadata(
+                        &metadata_store_client,
+                        &common_opts,
+                        &cluster_configuration,
+                    )
+                    .await;

                     match response {
-                        Ok(response) => {
-                            let response = response.into_inner();
-                            debug_assert!(!response.dry_run, "Provision w/o dry run");
-                        }
-                        Err(err) => {
-                            if err.code() == Code::AlreadyExists {
-                                debug!("The cluster is already provisioned.")
+                        Ok(provisioned) => {
+                            if provisioned {
+                                info!("Auto provisioned cluster.");
                             } else {
-                                warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}");
+                                debug!("The cluster is already provisioned.");
                             }
                         }
+                        Err(err) => {
+                            warn!("Failed to auto provision the cluster. In order to continue you have to provision the cluster manually: {err}");
+                        }
                     }

                     Ok(())
@@ -551,6 +540,135 @@ impl Node {
     }
 }

+#[derive(Clone, Debug, IntoProto)]
+#[proto(target = "restate_types::protobuf::cluster::ClusterConfiguration")]
+pub struct ClusterConfiguration {
+    #[into_proto(map = "num_partitions_to_u32")]
+    pub num_partitions: NonZeroU16,
+    #[proto(required)]
+    pub replication_strategy: ReplicationStrategy,
+    #[proto(required)]
+    pub default_provider: DefaultProvider,
+}
+
+fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 {
+    u32::from(num_partitions.get())
+}
+
+impl ClusterConfiguration {
+    pub fn from_configuration(configuration: &Configuration) -> Self {
+        ClusterConfiguration {
+            num_partitions: configuration.common.bootstrap_num_partitions,
+            replication_strategy: ReplicationStrategy::default(),
+            default_provider: DefaultProvider::from_configuration(configuration),
+        }
+    }
+}
+
+/// Provision the cluster metadata. Returns `true` if the cluster was newly provisioned. Returns
+/// `false` if the cluster is already provisioned.
+///
+/// This method returns an error if any of the initial metadata couldn't be written to the
+/// metadata store. In this case, the method does not try to clean the already written metadata
+/// up. Instead, the caller can retry to complete the provisioning.
+async fn provision_cluster_metadata(
+    metadata_store_client: &MetadataStoreClient,
+    common_opts: &CommonOptions,
+    cluster_configuration: &ClusterConfiguration,
+) -> anyhow::Result<bool> {
+    let (initial_nodes_configuration, initial_partition_table, initial_logs) =
+        generate_initial_metadata(common_opts, cluster_configuration);
+
+    let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
+        metadata_store_client.provision(&initial_nodes_configuration)
+    })
+    .await?;
+
+    retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
+        write_initial_value_dont_fail_if_it_exists(
+            metadata_store_client,
+            PARTITION_TABLE_KEY.clone(),
+            &initial_partition_table,
+        )
+    })
+    .await
+    .context("failed provisioning the initial partition table")?;
+
+    retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
+        write_initial_value_dont_fail_if_it_exists(
+            metadata_store_client,
+            BIFROST_CONFIG_KEY.clone(),
+            &initial_logs,
+        )
+    })
+    .await
+    .context("failed provisioning the initial logs configuration")?;
+
+    Ok(result)
+}
+
+fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration {
+    let mut initial_nodes_configuration =
+        NodesConfiguration::new(Version::MIN, common_opts.cluster_name().to_owned());
+    let node_config = NodeConfig::new(
+        common_opts.node_name().to_owned(),
+        common_opts
+            .force_node_id
+            .map(|force_node_id| force_node_id.with_generation(1))
+            .unwrap_or(GenerationalNodeId::INITIAL_NODE_ID),
+        common_opts.advertised_address.clone(),
+        common_opts.roles,
+        LogServerConfig::default(),
+    );
+    initial_nodes_configuration.upsert_node(node_config);
+    initial_nodes_configuration
+}
+
+fn generate_initial_metadata(
+    common_opts: &CommonOptions,
+    cluster_configuration: &ClusterConfiguration,
+) -> (NodesConfiguration, PartitionTable, Logs) {
+    let mut initial_partition_table_builder = PartitionTableBuilder::default();
+    initial_partition_table_builder
+        .with_equally_sized_partitions(cluster_configuration.num_partitions.get())
+        .expect("Empty partition table should not have conflicts");
+    initial_partition_table_builder
+        .set_replication_strategy(cluster_configuration.replication_strategy);
+    let initial_partition_table = initial_partition_table_builder.build();
+
+    let mut logs_builder = Logs::default().into_builder();
+    logs_builder.set_configuration(LogsConfiguration::from(
+        cluster_configuration.default_provider.clone(),
+    ));
+    let initial_logs = logs_builder.build();
+
+    let initial_nodes_configuration = create_initial_nodes_configuration(common_opts);
+
+    (
+        initial_nodes_configuration,
+        initial_partition_table,
+        initial_logs,
+    )
+}
+
+async fn write_initial_value_dont_fail_if_it_exists<T: Versioned + StorageEncode>(
+    metadata_store_client: &MetadataStoreClient,
+    key: ByteString,
+    initial_value: &T,
+) -> Result<(), WriteError> {
+    match metadata_store_client
+        .put(key, initial_value, Precondition::DoesNotExist)
+        .await
+    {
+        Ok(_) => Ok(()),
+        Err(WriteError::FailedPrecondition(_)) => {
+            // we might have failed on a previous attempt after writing this value; so let's continue
+            Ok(())
+        }
+        Err(err) => Err(err),
+    }
+}
+
 #[cfg(not(feature = "replicated-loglet"))]
 fn warn_if_log_store_left_artifacts(config: &Configuration) {
     if config.log_server.data_dir().exists() {
diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs
index 28d5cf3d8..f413d3196 100644
--- a/crates/node/src/network_server/grpc_svc_handler.rs
+++ b/crates/node/src/network_server/grpc_svc_handler.rs
@@ -8,15 +8,12 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

+use crate::{provision_cluster_metadata, ClusterConfiguration};
 use anyhow::Context;
 use bytes::BytesMut;
-use bytestring::ByteString;
 use enumset::EnumSet;
 use futures::stream::BoxStream;
-use prost_dto::IntoProto;
-use restate_core::metadata_store::{
-    retry_on_network_error, MetadataStoreClient, Precondition, WriteError,
-};
+use restate_core::metadata_store::MetadataStoreClient;
 use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc;
 use restate_core::network::{ConnectionManager, ProtocolError, TransportConnect};
 use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
@@ -26,16 +23,14 @@ use restate_core::protobuf::node_ctl_svc::{
 };
 use restate_core::task_center::TaskCenterMonitoring;
 use restate_core::{task_center, Metadata, MetadataKind, TargetVersion};
-use restate_types::config::{CommonOptions, Configuration};
+use restate_types::config::Configuration;
 use restate_types::health::Health;
-use restate_types::logs::metadata::{DefaultProvider, Logs, LogsConfiguration};
-use restate_types::metadata_store::keys::{BIFROST_CONFIG_KEY, PARTITION_TABLE_KEY};
-use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
-use restate_types::partition_table::{PartitionTable, PartitionTableBuilder, ReplicationStrategy};
+use restate_types::logs::metadata::DefaultProvider;
+use restate_types::nodes_config::Role;
+use restate_types::partition_table::ReplicationStrategy;
 use restate_types::protobuf::cluster::ClusterConfiguration as ProtoClusterConfiguration;
 use restate_types::protobuf::node::Message;
-use restate_types::storage::{StorageCodec, StorageEncode};
-use restate_types::{GenerationalNodeId, Version, Versioned};
+use restate_types::storage::StorageCodec;
 use std::num::NonZeroU16;
 use tokio_stream::StreamExt;
 use tonic::{Request, Response, Status, Streaming};
@@ -65,106 +60,6 @@ impl NodeCtlSvcHandler {
         }
     }

-    /// Provision the cluster metadata. Returns `true` if the cluster was newly provisioned. Returns
-    /// `false` if the cluster is already provisioned.
-    async fn provision_cluster_metadata(
-        &self,
-        common_opts: &CommonOptions,
-        cluster_configuration: &ClusterConfiguration,
-    ) -> anyhow::Result<bool> {
-        let (initial_nodes_configuration, initial_partition_table, initial_logs) =
-            Self::generate_initial_metadata(common_opts, cluster_configuration);
-
-        let result = retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
-            self.metadata_store_client
-                .provision(&initial_nodes_configuration)
-        })
-        .await?;
-
-        retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
-            self.write_initial_value_dont_fail_if_it_exists(
-                PARTITION_TABLE_KEY.clone(),
-                &initial_partition_table,
-            )
-        })
-        .await
-        .context("failed provisioning the initial partition table")?;
-
-        retry_on_network_error(common_opts.network_error_retry_policy.clone(), || {
-            self.write_initial_value_dont_fail_if_it_exists(
-                BIFROST_CONFIG_KEY.clone(),
-                &initial_logs,
-            )
-        })
-        .await
-        .context("failed provisioning the initial logs configuration")?;
-
-        Ok(result)
-    }
-
-    pub fn create_initial_nodes_configuration(common_opts: &CommonOptions) -> NodesConfiguration {
-        let mut initial_nodes_configuration =
-            NodesConfiguration::new(Version::MIN, common_opts.cluster_name().to_owned());
-        let node_config = NodeConfig::new(
-            common_opts.node_name().to_owned(),
-            common_opts
-                .force_node_id
-                .map(|force_node_id| force_node_id.with_generation(1))
-                .unwrap_or(GenerationalNodeId::INITIAL_NODE_ID),
-            common_opts.advertised_address.clone(),
-            common_opts.roles,
-            LogServerConfig::default(),
-        );
-        initial_nodes_configuration.upsert_node(node_config);
-        initial_nodes_configuration
-    }
-
-    fn generate_initial_metadata(
-        common_opts: &CommonOptions,
-        cluster_configuration: &ClusterConfiguration,
-    ) -> (NodesConfiguration, PartitionTable, Logs) {
-        let mut initial_partition_table_builder = PartitionTableBuilder::default();
-        initial_partition_table_builder
-            .with_equally_sized_partitions(cluster_configuration.num_partitions.get())
-            .expect("Empty partition table should not have conflicts");
-        initial_partition_table_builder
-            .set_replication_strategy(cluster_configuration.replication_strategy);
-        let initial_partition_table = initial_partition_table_builder.build();
-
-        let mut logs_builder = Logs::default().into_builder();
-        logs_builder.set_configuration(LogsConfiguration::from(
-            cluster_configuration.default_provider.clone(),
-        ));
-        let initial_logs = logs_builder.build();
-
-        let initial_nodes_configuration = Self::create_initial_nodes_configuration(common_opts);
-
-        (
-            initial_nodes_configuration,
-            initial_partition_table,
-            initial_logs,
-        )
-    }
-
-    async fn write_initial_value_dont_fail_if_it_exists<T: Versioned + StorageEncode>(
-        &self,
-        key: ByteString,
-        initial_value: &T,
-    ) -> Result<(), WriteError> {
-        match self
-            .metadata_store_client
-            .put(key, initial_value, Precondition::DoesNotExist)
-            .await
-        {
-            Ok(_) => Ok(()),
-            Err(WriteError::FailedPrecondition(_)) => {
-                // we might have failed on a previous attempt after writing this value; so let's continue
-                Ok(())
-            }
-            Err(err) => Err(err),
-        }
-    }
-
     fn resolve_cluster_configuration(
         config: &Configuration,
         request: ProvisionClusterRequest,
@@ -280,10 +175,13 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
             )));
         }

-        let newly_provisioned = self
-            .provision_cluster_metadata(&config.common, &cluster_configuration)
-            .await
-            .map_err(|err| Status::internal(err.to_string()))?;
+        let newly_provisioned = provision_cluster_metadata(
+            &self.metadata_store_client,
+            &config.common,
+            &cluster_configuration,
+        )
+        .await
+        .map_err(|err| Status::internal(err.to_string()))?;

         if !newly_provisioned {
             return Err(Status::already_exists(
@@ -297,21 +195,6 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
     }
 }

-#[derive(Clone, Debug, IntoProto)]
-#[proto(target = "restate_types::protobuf::cluster::ClusterConfiguration")]
-pub struct ClusterConfiguration {
-    #[into_proto(map = "num_partitions_to_u32")]
-    pub num_partitions: NonZeroU16,
-    #[proto(required)]
-    pub replication_strategy: ReplicationStrategy,
-    #[proto(required)]
-    pub default_provider: DefaultProvider,
-}
-
-fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 {
-    u32::from(num_partitions.get())
-}
-
 pub struct CoreNodeSvcHandler {
     connections: ConnectionManager,
 }
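
Note on the provisioning flow moved into `lib.rs`: the reason `write_initial_value_dont_fail_if_it_exists` swallows `WriteError::FailedPrecondition` is that provisioning must stay idempotent, so a retried attempt that finds a key already written by an earlier, partially completed attempt simply continues. Below is a minimal, self-contained sketch of that pattern; the `KvStore`, `put_if_absent`, and error variants are illustrative stand-ins, not Restate APIs.

```rust
use std::collections::HashMap;

/// Illustrative stand-in for the metadata store's write error; only the
/// `FailedPrecondition` case matters for this sketch.
#[derive(Debug)]
#[allow(dead_code)]
enum WriteError {
    FailedPrecondition(String),
    Network(String),
}

/// Illustrative in-memory stand-in for the metadata store.
#[derive(Default)]
struct KvStore {
    entries: HashMap<String, String>,
}

impl KvStore {
    /// Emulates a `put(key, value, Precondition::DoesNotExist)` call.
    fn put_if_absent(&mut self, key: &str, value: &str) -> Result<(), WriteError> {
        if self.entries.contains_key(key) {
            return Err(WriteError::FailedPrecondition(format!("{key} already exists")));
        }
        self.entries.insert(key.to_owned(), value.to_owned());
        Ok(())
    }
}

/// Same shape as `write_initial_value_dont_fail_if_it_exists`: a failed
/// `DoesNotExist` precondition means a previous attempt already wrote the
/// value, so the retry treats it as success instead of an error.
fn write_initial_value_idempotent(
    store: &mut KvStore,
    key: &str,
    value: &str,
) -> Result<(), WriteError> {
    match store.put_if_absent(key, value) {
        Ok(()) => Ok(()),
        Err(WriteError::FailedPrecondition(_)) => Ok(()),
        Err(err) => Err(err),
    }
}

fn main() {
    let mut store = KvStore::default();
    // First attempt writes the value; a retried attempt is a no-op success.
    write_initial_value_idempotent(&mut store, "partition_table", "v1").unwrap();
    write_initial_value_idempotent(&mut store, "partition_table", "v1").unwrap();
    assert_eq!(
        store.entries.get("partition_table").map(String::as_str),
        Some("v1")
    );
}
```

The real `provision_cluster_metadata` applies this rule to `PARTITION_TABLE_KEY` and `BIFROST_CONFIG_KEY` inside `retry_on_network_error`, so only genuine write failures surface to the caller.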