Skip to content

Commit

Permalink
chore(node): store Arc<NodeInner> to reduce some refrence counting
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin committed Jun 19, 2024
1 parent 57d8a2e commit e60e532
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 62 deletions.
125 changes: 85 additions & 40 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,18 @@ impl NodeBuilder {
let node_events_channel = NodeEventsChannel::default();
let (node_cmds, _) = broadcast::channel(10);

let node = Node {
let node = NodeInner {
network: network.clone(),
events_channel: node_events_channel.clone(),
node_cmds: node_cmds.clone(),
initial_peers: Arc::new(self.initial_peers),
reward_address: Arc::new(reward_address),
initial_peers: self.initial_peers,
reward_address,
#[cfg(feature = "open-metrics")]
node_metrics,
owner: self.owner.clone(),
owner: self.owner,
};
let node = Node {
inner: Arc::new(node),
};
let running_node = RunningNode {
network,
Expand All @@ -205,38 +208,79 @@ pub enum NodeCmd {}
/// storage, and broadcasts node-related events.
#[derive(Clone)]
pub(crate) struct Node {
pub(crate) network: Network,
pub(crate) events_channel: NodeEventsChannel,
// We keep a copy of the Sender which is clonable and we can obtain a receiver from.
node_cmds: broadcast::Sender<NodeCmd>,
inner: Arc<NodeInner>,
}

/// The actual implementation of the Node. The other is just a wrapper around this, so that we don't expose
/// the Arc from the interface.
struct NodeInner {
events_channel: NodeEventsChannel,
// Peers that are dialed at startup of node.
initial_peers: Arc<Vec<Multiaddr>>,
reward_address: Arc<MainPubkey>,
initial_peers: Vec<Multiaddr>,
network: Network,
node_cmds: broadcast::Sender<NodeCmd>,
#[cfg(feature = "open-metrics")]
pub(crate) node_metrics: Option<NodeMetrics>,
/// node owner's discord username, in readable format
/// if not set, there will be no payment forward to be undertaken
node_metrics: Option<NodeMetrics>,
/// Node owner's discord username, in readable format
/// If not set, there will be no payment forward to be undertaken
owner: Option<String>,
reward_address: MainPubkey,
}

impl Node {
/// Returns the NodeEventsChannel
pub(crate) fn events_channel(&self) -> &NodeEventsChannel {
&self.inner.events_channel
}

/// Returns the initial peers that the node will dial at startup
pub(crate) fn initial_peers(&self) -> &Vec<Multiaddr> {
&self.inner.initial_peers
}

/// Returns the instance of Network
pub(crate) fn network(&self) -> &Network {
&self.inner.network
}

/// Returns the NodeCmds channel
pub(crate) fn node_cmds(&self) -> &broadcast::Sender<NodeCmd> {
&self.inner.node_cmds
}

#[cfg(feature = "open-metrics")]
/// Returns a reference to the NodeMetrics if the `open-metrics` feature flag is enabled
pub(crate) fn node_metrics(&self) -> Option<&NodeMetrics> {
self.inner.node_metrics.as_ref()
}

/// Returns the owner of the node
pub(crate) fn owner(&self) -> Option<&String> {
self.inner.owner.as_ref()
}

/// Returns the reward address of the node
pub(crate) fn reward_address(&self) -> &MainPubkey {
&self.inner.reward_address
}

/// Runs the provided `SwarmDriver` and spawns a task to process for `NetworkEvents`
fn run(self, swarm_driver: SwarmDriver, mut network_event_receiver: Receiver<NetworkEvent>) {
let mut rng = StdRng::from_entropy();

let peers_connected = Arc::new(AtomicUsize::new(0));
let mut cmds_receiver = self.node_cmds.subscribe();
let mut cmds_receiver = self.node_cmds().subscribe();

// read the forwarded balance from the file and set the metric.
// This is done initially because reward forwarding takes a while to kick in
#[cfg(all(feature = "reward-forward", feature = "open-metrics"))]
let node_copy = self.clone();
#[cfg(all(feature = "reward-forward", feature = "open-metrics"))]
let _handle = spawn(async move {
let root_dir = node_copy.network.root_dir_path().clone();
let root_dir = node_copy.network().root_dir_path().clone();
let balance = read_forwarded_balance_value(&root_dir);

if let Some(ref node_metrics) = node_copy.node_metrics {
if let Some(node_metrics) = node_copy.node_metrics() {
let _ = node_metrics.total_forwarded_rewards.set(balance as i64);
}
});
Expand Down Expand Up @@ -300,7 +344,7 @@ impl Node {
}
None => {
error!("The `NetworkEvent` channel is closed");
self.events_channel.broadcast(NodeEvent::ChannelClosed);
self.events_channel().broadcast(NodeEvent::ChannelClosed);
break;
}
}
Expand All @@ -309,7 +353,7 @@ impl Node {
_ = replication_interval.tick() => {
let start = Instant::now();
trace!("Periodic replication triggered");
let network = self.network.clone();
let network = self.network().clone();
self.record_metrics(Marker::IntervalReplicationTriggered);

let _handle = spawn(async move {
Expand All @@ -321,7 +365,7 @@ impl Node {
_ = bad_nodes_check_interval.tick() => {
let start = Instant::now();
trace!("Periodic bad_nodes check triggered");
let network = self.network.clone();
let network = self.network().clone();
self.record_metrics(Marker::IntervalBadNodesCheckTriggered);

let _handle = spawn(async move {
Expand All @@ -338,14 +382,14 @@ impl Node {
// runs every balance_forward_interval time
_ = balance_forward_interval.tick() => {
if cfg!(feature = "reward-forward") {
if let Some(ref owner) = self.owner {
if let Some(owner) = self.owner() {
let start = Instant::now();
trace!("Periodic balance forward triggered");
let network = self.network.clone();
let network = self.network().clone();
let forwarding_reason = owner.clone();

#[cfg(feature = "open-metrics")]
let total_forwarded_rewards = self.node_metrics.as_ref().map(|metrics|metrics.total_forwarded_rewards.clone());
let total_forwarded_rewards = self.node_metrics().map(|metrics|metrics.total_forwarded_rewards.clone());

let _handle = spawn(async move {

Expand All @@ -361,7 +405,7 @@ impl Node {
}
_ = uptime_metrics_update_interval.tick() => {
#[cfg(feature = "open-metrics")]
if let Some(node_metrics) = &self.node_metrics {
if let Some(node_metrics) = self.node_metrics() {
let _ = node_metrics.uptime.set(node_metrics.started_instant.elapsed().as_secs() as i64);
}
}
Expand All @@ -383,7 +427,7 @@ impl Node {
pub(crate) fn record_metrics(&self, marker: Marker) {
marker.log();
#[cfg(feature = "open-metrics")]
if let Some(node_metrics) = &self.node_metrics {
if let Some(node_metrics) = self.node_metrics() {
node_metrics.record(marker)
}
}
Expand All @@ -404,28 +448,29 @@ impl Node {
// increment peers_connected and send ConnectedToNetwork event if have connected to K_VALUE peers
let _ = peers_connected.fetch_add(1, Ordering::SeqCst);
if peers_connected.load(Ordering::SeqCst) == CLOSE_GROUP_SIZE {
self.events_channel.broadcast(NodeEvent::ConnectedToNetwork);
self.events_channel()
.broadcast(NodeEvent::ConnectedToNetwork);
}

self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
self.record_metrics(Marker::PeerAddedToRoutingTable(peer_id));

// try replication here
let net_clone = self.network.clone();
let network = self.network().clone();
self.record_metrics(Marker::IntervalReplicationTriggered);
let _handle = spawn(async move {
Self::try_interval_replication(net_clone);
Self::try_interval_replication(network);
});
}
NetworkEvent::PeerRemoved(peer_id, connected_peers) => {
event_header = "PeerRemoved";
self.record_metrics(Marker::PeersInRoutingTable(connected_peers));
self.record_metrics(Marker::PeerRemovedFromRoutingTable(peer_id));

let net = self.network.clone();
let network = self.network().clone();
self.record_metrics(Marker::IntervalReplicationTriggered);
let _handle = spawn(async move {
Self::try_interval_replication(net);
Self::try_interval_replication(network);
});
}
NetworkEvent::PeerWithUnsupportedProtocol { .. } => {
Expand All @@ -443,18 +488,18 @@ impl Node {
bad_behaviour,
});

let network = self.network.clone();
let network = self.network().clone();
let _handle = spawn(async move {
network.send_req_ignore_reply(request, bad_peer);
});
}
NetworkEvent::NewListenAddr(_) => {
event_header = "NewListenAddr";
if !cfg!(feature = "local-discovery") {
let network = self.network.clone();
let peers = Arc::clone(&self.initial_peers);
let network = self.network().clone();
let peers = self.initial_peers().clone();
let _handle = spawn(async move {
for addr in &*peers {
for addr in peers {
if let Err(err) = network.dial(addr.clone()).await {
tracing::error!("Failed to dial {addr}: {err:?}");
};
Expand All @@ -480,8 +525,8 @@ impl Node {
}
NetworkEvent::QueryRequestReceived { query, channel } => {
event_header = "QueryRequestReceived";
let network = self.network.clone();
let payment_address = *self.reward_address;
let network = self.network().clone();
let payment_address = *self.reward_address();

let _handle = spawn(async move {
let res = Self::handle_query(&network, query, payment_address).await;
Expand All @@ -508,12 +553,12 @@ impl Node {
NetworkEvent::TerminateNode { reason } => {
event_header = "TerminateNode";
error!("Received termination from swarm_driver due to {reason:?}");
self.events_channel
self.events_channel()
.broadcast(NodeEvent::TerminateNode(format!("{reason:?}")));
}
NetworkEvent::FailedToFetchHolders(bad_nodes) => {
event_header = "FailedToFetchHolders";
let network = self.network.clone();
let network = self.network().clone();
// Note: this log will be checked in CI, and expecting `not appear`.
// any change to the keyword `failed to fetch` shall incur
// correspondent CI script change as well.
Expand All @@ -526,7 +571,7 @@ impl Node {
}
NetworkEvent::BadNodeVerification { peer_id } => {
event_header = "BadNodeVerification";
let network = self.network.clone();
let network = self.network().clone();

trace!("Need to verify whether peer {peer_id:?} is a bad node");
let _handle = spawn(async move {
Expand All @@ -537,7 +582,7 @@ impl Node {
}
NetworkEvent::QuoteVerification { quotes } => {
event_header = "QuoteVerification";
let network = self.network.clone();
let network = self.network().clone();

let _handle = spawn(async move {
quotes_verification(&network, quotes).await;
Expand All @@ -548,7 +593,7 @@ impl Node {
keys_to_verify,
} => {
event_header = "ChunkProofVerification";
let network = self.network.clone();
let network = self.network().clone();

trace!("Going to verify chunk {keys_to_verify:?} against peer {peer_id:?}");

Expand Down
Loading

0 comments on commit e60e532

Please sign in to comment.