diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs
index de996e7a7a..7265f25a58 100644
--- a/sn_node/src/node.rs
+++ b/sn_node/src/node.rs
@@ -173,15 +173,18 @@ impl NodeBuilder {
         let node_events_channel = NodeEventsChannel::default();
         let (node_cmds, _) = broadcast::channel(10);

-        let node = Node {
+        let node = NodeInner {
             network: network.clone(),
             events_channel: node_events_channel.clone(),
             node_cmds: node_cmds.clone(),
-            initial_peers: Arc::new(self.initial_peers),
-            reward_address: Arc::new(reward_address),
+            initial_peers: self.initial_peers,
+            reward_address,
             #[cfg(feature = "open-metrics")]
             node_metrics,
-            owner: self.owner.clone(),
+            owner: self.owner,
+        };
+        let node = Node {
+            inner: Arc::new(node),
         };
         let running_node = RunningNode {
             network,
@@ -205,27 +208,68 @@ pub enum NodeCmd {}
 /// storage, and broadcasts node-related events.
 #[derive(Clone)]
 pub(crate) struct Node {
-    pub(crate) network: Network,
-    pub(crate) events_channel: NodeEventsChannel,
-    // We keep a copy of the Sender which is clonable and we can obtain a receiver from.
-    node_cmds: broadcast::Sender<NodeCmd>,
+    inner: Arc<NodeInner>,
+}
+
+/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
+/// the Arc from the interface.
+struct NodeInner {
+    events_channel: NodeEventsChannel,
     // Peers that are dialed at startup of node.
-    initial_peers: Arc<Vec<Multiaddr>>,
-    reward_address: Arc<MainPubkey>,
+    initial_peers: Vec<Multiaddr>,
+    network: Network,
+    node_cmds: broadcast::Sender<NodeCmd>,
     #[cfg(feature = "open-metrics")]
-    pub(crate) node_metrics: Option<NodeMetrics>,
-    /// node owner's discord username, in readable format
-    /// if not set, there will be no payment forward to be undertaken
+    node_metrics: Option<NodeMetrics>,
+    /// Node owner's discord username, in readable format
+    /// If not set, there will be no payment forward to be undertaken
     owner: Option<String>,
+    reward_address: MainPubkey,
 }

 impl Node {
+    /// Returns the NodeEventsChannel
+    pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
+        &self.inner.events_channel
+    }
+
+    /// Returns the initial peers that the node will dial at startup
+    pub(crate) fn initial_peers(&self) -> &Vec<Multiaddr> {
+        &self.inner.initial_peers
+    }
+
+    /// Returns the instance of Network
+    pub(crate) fn network(&self) -> &Network {
+        &self.inner.network
+    }
+
+    /// Returns the NodeCmds channel
+    pub(crate) fn node_cmds(&self) -> &broadcast::Sender<NodeCmd> {
+        &self.inner.node_cmds
+    }
+
+    #[cfg(feature = "open-metrics")]
+    /// Returns a reference to the NodeMetrics if the `open-metrics` feature flag is enabled
+    pub(crate) fn node_metrics(&self) -> Option<&NodeMetrics> {
+        self.inner.node_metrics.as_ref()
+    }
+
+    /// Returns the owner of the node
+    pub(crate) fn owner(&self) -> Option<&String> {
+        self.inner.owner.as_ref()
+    }
+
+    /// Returns the reward address of the node
+    pub(crate) fn reward_address(&self) -> &MainPubkey {
+        &self.inner.reward_address
+    }
+
     /// Runs the provided `SwarmDriver` and spawns a task to process for `NetworkEvents`
     fn run(self, swarm_driver: SwarmDriver, mut network_event_receiver: Receiver<NetworkEvent>) {
         let mut rng = StdRng::from_entropy();

         let peers_connected = Arc::new(AtomicUsize::new(0));
-        let mut cmds_receiver = self.node_cmds.subscribe();
+        let mut cmds_receiver = self.node_cmds().subscribe();

         // read the forwarded balance from the file and set the metric.
         // This is done initially because reward forwarding takes a while to kick in
@@ -233,10 +277,10 @@ impl Node {
         let node_copy = self.clone();
         #[cfg(all(feature = "reward-forward", feature = "open-metrics"))]
         let _handle = spawn(async move {
-            let root_dir = node_copy.network.root_dir_path().clone();
+            let root_dir = node_copy.network().root_dir_path().clone();
             let balance = read_forwarded_balance_value(&root_dir);

-            if let Some(ref node_metrics) = node_copy.node_metrics {
+            if let Some(node_metrics) = node_copy.node_metrics() {
                 let _ = node_metrics.total_forwarded_rewards.set(balance as i64);
             }
         });
@@ -300,7 +344,7 @@ impl Node {
                         }
                         None => {
                             error!("The `NetworkEvent` channel is closed");
-                            self.events_channel.broadcast(NodeEvent::ChannelClosed);
+                            self.events_channel().broadcast(NodeEvent::ChannelClosed);
                             break;
                         }
                     }
@@ -309,7 +353,7 @@ impl Node {
                 _ = replication_interval.tick() => {
                     let start = Instant::now();
                     trace!("Periodic replication triggered");
-                    let network = self.network.clone();
+                    let network = self.network().clone();
                     self.record_metrics(Marker::IntervalReplicationTriggered);

                     let _handle = spawn(async move {
@@ -321,7 +365,7 @@ impl Node {
                 _ = bad_nodes_check_interval.tick() => {
                     let start = Instant::now();
                     trace!("Periodic bad_nodes check triggered");
-                    let network = self.network.clone();
+                    let network = self.network().clone();
                     self.record_metrics(Marker::IntervalBadNodesCheckTriggered);

                     let _handle = spawn(async move {
@@ -338,14 +382,14 @@ impl Node {
                 // runs every balance_forward_interval time
                 _ = balance_forward_interval.tick() => {
                     if cfg!(feature = "reward-forward") {
-                        if let Some(ref owner) = self.owner {
+                        if let Some(owner) = self.owner() {
                             let start = Instant::now();
                             trace!("Periodic balance forward triggered");
-                            let network = self.network.clone();
+                            let network = self.network().clone();
                             let forwarding_reason = owner.clone();

                             #[cfg(feature = "open-metrics")]
-                            let total_forwarded_rewards = self.node_metrics.as_ref().map(|metrics|metrics.total_forwarded_rewards.clone());
+                            let total_forwarded_rewards = self.node_metrics().map(|metrics|metrics.total_forwarded_rewards.clone());

                             let _handle = spawn(async move {
@@ -361,7 +405,7 @@ impl Node {
                 }
                 _ = uptime_metrics_update_interval.tick() => {
                     #[cfg(feature = "open-metrics")]
-                    if let Some(node_metrics) = &self.node_metrics {
+                    if let Some(node_metrics) = self.node_metrics() {
                         let _ = node_metrics.uptime.set(node_metrics.started_instant.elapsed().as_secs() as i64);
                     }
                 }
@@ -383,7 +427,7 @@ impl Node {
     pub(crate) fn record_metrics(&self, marker: Marker) {
         marker.log();
         #[cfg(feature = "open-metrics")]
-        if let Some(node_metrics) = &self.node_metrics {
+        if let Some(node_metrics) = self.node_metrics() {
             node_metrics.record(marker)
         }
     }
@@ -404,17 +448,18 @@ impl Node {
                 // increment peers_connected and send ConnectedToNetwork event if have connected to K_VALUE peers
                 let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
                 if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
-                    self.events_channel.broadcast(NodeEvent::ConnectedToNetwork);
+                    self.events_channel()
+                        .broadcast(NodeEvent::ConnectedToNetwork);
                 }
                 self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                 self.record_metrics(Marker::PeerAddedToRoutingTable(peer_id));

                 // try replication here
-                let net_clone = self.network.clone();
+                let network = self.network().clone();
                 self.record_metrics(Marker::IntervalReplicationTriggered);
                 let _handle = spawn(async move {
-                    Self::try_interval_replication(net_clone);
+                    Self::try_interval_replication(network);
                 });
             }
             NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
@@ -422,10 +467,10 @@ impl Node {
                 self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
                 self.record_metrics(Marker::PeerRemovedFromRoutingTable(peer_id));

-                let net = self.network.clone();
+                let network = self.network().clone();
                 self.record_metrics(Marker::IntervalReplicationTriggered);
                 let _handle = spawn(async move {
-                    Self::try_interval_replication(net);
+                    Self::try_interval_replication(network);
                 });
             }
             NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
@@ -443,7 +488,7 @@ impl Node {
                     bad_behaviour,
                 });

-                let network = self.network.clone();
+                let network = self.network().clone();
                 let _handle = spawn(async move {
                     network.send_req_ignore_reply(request, bad_peer);
                 });
@@ -451,10 +496,10 @@ impl Node {
             NetworkEvent::NewListenAddr(_) => {
                 event_header = "NewListenAddr";
                 if !cfg!(feature = "local-discovery") {
-                    let network = self.network.clone();
-                    let peers = Arc::clone(&self.initial_peers);
+                    let network = self.network().clone();
+                    let peers = self.initial_peers().clone();
                     let _handle = spawn(async move {
-                        for addr in &*peers {
+                        for addr in peers {
                             if let Err(err) = network.dial(addr.clone()).await {
                                 tracing::error!("Failed to dial {addr}: {err:?}");
                             };
@@ -480,8 +525,8 @@ impl Node {
             }
             NetworkEvent::QueryRequestReceived { query, channel } => {
                 event_header = "QueryRequestReceived";
-                let network = self.network.clone();
-                let payment_address = *self.reward_address;
+                let network = self.network().clone();
+                let payment_address = *self.reward_address();
                 let _handle = spawn(async move {
                     let res = Self::handle_query(&network, query, payment_address).await;
@@ -508,12 +553,12 @@ impl Node {
             NetworkEvent::TerminateNode { reason } => {
                 event_header = "TerminateNode";
                 error!("Received termination from swarm_driver due to {reason:?}");
-                self.events_channel
+                self.events_channel()
                     .broadcast(NodeEvent::TerminateNode(format!("{reason:?}")));
             }
             NetworkEvent::FailedToFetchHolders(bad_nodes) => {
                 event_header = "FailedToFetchHolders";
-                let network = self.network.clone();
+                let network = self.network().clone();
                 // Note: this log will be checked in CI, and expecting `not appear`.
                 //       any change to the keyword `failed to fetch` shall incur
                 //       correspondent CI script change as well.
@@ -526,7 +571,7 @@ impl Node {
             }
             NetworkEvent::BadNodeVerification { peer_id } => {
                 event_header = "BadNodeVerification";
-                let network = self.network.clone();
+                let network = self.network().clone();

                 trace!("Need to verify whether peer {peer_id:?} is a bad node");
                 let _handle = spawn(async move {
@@ -537,7 +582,7 @@ impl Node {
             }
             NetworkEvent::QuoteVerification { quotes } => {
                 event_header = "QuoteVerification";
-                let network = self.network.clone();
+                let network = self.network().clone();
                 let _handle = spawn(async move {
                     quotes_verification(&network, quotes).await;
@@ -548,7 +593,7 @@ impl Node {
                 keys_to_verify,
             } => {
                 event_header = "ChunkProofVerification";
-                let network = self.network.clone();
+                let network = self.network().clone();
                 trace!("Going to verify chunk {keys_to_verify:?} against peer {peer_id:?}");
diff --git a/sn_node/src/put_validation.rs b/sn_node/src/put_validation.rs
index 8ccfb75885..0fa016848b 100644
--- a/sn_node/src/put_validation.rs
+++ b/sn_node/src/put_validation.rs
@@ -34,7 +34,7 @@ impl Node {
         // Notify replication_fetcher to mark the attempt as completed.
         // Send the notification earlier to avoid it got skipped due to:
         // the record becomes stored during the fetch because of other interleaved process.
-        self.network.notify_fetch_completed(record.key.clone());
+        self.network().notify_fetch_completed(record.key.clone());

         match record_header.kind {
             RecordKind::ChunkWithPayment => {
@@ -236,7 +236,7 @@ impl Node {
         }

         let present_locally = self
-            .network
+            .network()
             .is_record_key_present_locally(&data_key)
             .await?;

@@ -269,11 +269,11 @@ impl Node {
         // finally store the Record directly into the local storage
         trace!("Storing chunk {chunk_name:?} as Record locally");
-        self.network.put_local_record(record);
+        self.network().put_local_record(record);

         self.record_metrics(Marker::ValidChunkRecordPutFromNetwork(&pretty_key));

-        self.events_channel
+        self.events_channel()
             .broadcast(crate::NodeEvent::ChunkStored(chunk_addr));

         Ok(CmdOk::StoredSuccessfully)
@@ -290,7 +290,7 @@ impl Node {
         // check if the Register is present locally
         let key = NetworkAddress::from_register_address(*reg_addr).to_record_key();
-        let present_locally = self.network.is_record_key_present_locally(&key).await?;
+        let present_locally = self.network().is_record_key_present_locally(&key).await?;
         let pretty_key = PrettyPrintRecordKey::from(&key);

         // check register and merge if needed
@@ -298,7 +298,7 @@ impl Node {
             Some(reg) => reg,
             None => {
                 // Notify replication_fetcher to mark the attempt as completed.
-                self.network.notify_fetch_completed(key.clone());
+                self.network().notify_fetch_completed(key.clone());
                 return Ok(CmdOk::DataAlreadyPresent);
             }
         };
@@ -313,7 +313,7 @@ impl Node {
         let content_hash = XorName::from_content(&record.value);

         debug!("Storing register {reg_addr:?} as Record locally");
-        self.network.put_local_record(record);
+        self.network().put_local_record(record);

         self.record_metrics(Marker::ValidRegisterRecordPutFromNetwork(&pretty_key));

@@ -391,7 +391,7 @@ impl Node {
             publisher: None,
             expires: None,
         };
-        self.network.put_local_record(record);
+        self.network().put_local_record(record);
         debug!(
             "Successfully stored validated spends with key: {unique_pubkey:?} at {pretty_key:?}"
         );
@@ -425,7 +425,7 @@ impl Node {
         for transfer in transfers {
             match transfer {
                 Transfer::Encrypted(_) => match self
-                    .network
+                    .network()
                     .verify_and_unpack_transfer(&transfer, wallet)
                     .await
                 {
@@ -438,7 +438,7 @@ impl Node {
                 },
                 Transfer::NetworkRoyalties(cashnote_redemptions) => {
                     match self
-                        .network
+                        .network()
                         .verify_cash_notes_redemptions(royalties_pk, &cashnote_redemptions)
                         .await
                     {
@@ -490,7 +490,7 @@ impl Node {
         trace!("Validating record payment for {pretty_key}");

         // load wallet
-        let mut wallet = HotWallet::load_from(self.network.root_dir_path())?;
+        let mut wallet = HotWallet::load_from(self.network().root_dir_path())?;
         let old_balance = wallet.balance().as_nano();

         // unpack transfer
@@ -502,7 +502,7 @@ impl Node {
         trace!("Received payment of {received_fee:?} for {pretty_key}");

         // Notify `record_store` that the node received a payment.
-        self.network.notify_payment_received();
+        self.network().notify_payment_received();

         // deposit the CashNotes in our wallet
         wallet.deposit_and_store_to_disk(&cash_notes)?;
@@ -513,7 +513,7 @@ impl Node {
         );

         #[cfg(feature = "open-metrics")]
-        if let Some(node_metrics) = &self.node_metrics {
+        if let Some(node_metrics) = self.node_metrics() {
             let _ = node_metrics
                 .current_reward_wallet_balance
                 .set(new_balance as i64);
@@ -526,7 +526,7 @@ impl Node {
         // check if the quote is valid
         let storecost = payment.quote.cost;
-        verify_quote_for_storecost(&self.network, payment.quote, address)?;
+        verify_quote_for_storecost(self.network(), payment.quote, address)?;
         trace!("Payment quote valid for record {pretty_key}");

         // Let's check payment is sufficient both for our store cost and for network royalties
@@ -570,7 +570,7 @@ impl Node {
         let key = NetworkAddress::from_register_address(*reg_addr).to_record_key();

         // get local register
-        let maybe_record = self.network.get_local_record(&key).await?;
+        let maybe_record = self.network().get_local_record(&key).await?;
         let record = match maybe_record {
             Some(r) => r,
             None => {
@@ -600,7 +600,7 @@ impl Node {
         // get the local spends
         let record_key = NetworkAddress::from_spend_address(addr).to_record_key();
         debug!("Checking for local spends with key: {record_key:?}");
-        let local_record = match self.network.get_local_record(&record_key).await? {
+        let local_record = match self.network().get_local_record(&record_key).await? {
             Some(r) => r,
             None => {
                 debug!("Spend is not present locally: {record_key:?}");
@@ -641,7 +641,7 @@ impl Node {
         let mut all_verified_spends = BTreeSet::from_iter(local_spends.into_iter());

         // get spends from the network at the address for that unique pubkey
-        let network_spends = match self.network.get_raw_spends(spend_addr).await {
+        let network_spends = match self.network().get_raw_spends(spend_addr).await {
             Ok(spends) => spends,
             Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)) => vec![],
             Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => {
@@ -666,7 +666,7 @@ impl Node {
         for s in signed_spends.into_iter().chain(network_spends.into_iter()) {
             let self_clone = self.clone();
             let _ = tasks.spawn(async move {
-                let res = self_clone.network.verify_spend(&s).await;
+                let res = self_clone.network().verify_spend(&s).await;
                 (s, res)
             });
         }
diff --git a/sn_node/src/replication.rs b/sn_node/src/replication.rs
index 7556cde9fb..d8caf554e2 100644
--- a/sn_node/src/replication.rs
+++ b/sn_node/src/replication.rs
@@ -32,7 +32,7 @@ impl Node {
     ) -> Result<()> {
         for (holder, key) in keys_to_fetch {
             let node = self.clone();
-            let requester = NetworkAddress::from_peer(self.network.peer_id());
+            let requester = NetworkAddress::from_peer(self.network().peer_id());
             let _handle: JoinHandle<Result<()>> = spawn(async move {
                 let pretty_key = PrettyPrintRecordKey::from(&key).into_owned();
                 trace!("Fetching record {pretty_key:?} from node {holder:?}");
@@ -40,7 +40,7 @@ impl Node {
                     requester,
                     key: NetworkAddress::from_record_key(&key),
                 });
-                let record_opt = if let Ok(resp) = node.network.send_request(req, holder).await {
+                let record_opt = if let Ok(resp) = node.network().send_request(req, holder).await {
                     match resp {
                         Response::Query(QueryResponse::GetReplicatedRecord(result)) => match result {
@@ -71,7 +71,9 @@ impl Node {
                         target_record: None,
                         expected_holders: Default::default(),
                     };
-                    node.network.get_record_from_network(key, &get_cfg).await?
+                    node.network()
+                        .get_record_from_network(key, &get_cfg)
+                        .await?
                 };

                 trace!(
@@ -95,7 +97,7 @@ impl Node {
         paid_key: RecordKey,
         record_type: RecordType,
     ) {
-        let network = self.network.clone();
+        let network = self.network().clone();
         let _handle = spawn(async move {
             let start = std::time::Instant::now();
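
For context, the refactor in this diff is an instance of the common `Arc<Inner>` handle pattern: `Node` stays `Clone` and cheap to hand into spawned tasks, while the shared state lives behind a single `Arc` that never leaks through the public interface. Below is a minimal, self-contained sketch of the pattern; the types and the `owner` field are illustrative stand-ins, not the crate's actual definitions.

    use std::sync::Arc;

    /// Cheap-to-clone handle; cloning only bumps the Arc refcount.
    #[derive(Clone)]
    struct Node {
        inner: Arc<NodeInner>,
    }

    /// Shared state, never exposed directly, so the Arc stays an
    /// implementation detail of the handle.
    struct NodeInner {
        owner: Option<String>,
    }

    impl Node {
        /// Accessor in the style of the diff: borrow out of the Arc
        /// instead of exposing the Arc (or a cloned field) itself.
        fn owner(&self) -> Option<&String> {
            self.inner.owner.as_ref()
        }
    }

    fn main() {
        let node = Node {
            inner: Arc::new(NodeInner {
                owner: Some("discord_username".to_string()),
            }),
        };
        // Clones share the same inner state, which is what makes the
        // repeated `self.clone()` into spawned tasks cheap in the patch.
        let handle = node.clone();
        assert_eq!(handle.owner(), node.owner());
    }

This also explains why the diff drops the per-field `Arc`s (`Arc<Vec<Multiaddr>>`, `Arc<MainPubkey>`): one `Arc` around the whole inner struct replaces them all.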