Skip to content

Commit

Permalink
src: reduce scope of the clones
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Sep 10, 2024
1 parent 05b8976 commit 4dde7df
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 78 deletions.
98 changes: 53 additions & 45 deletions src/drivers/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,34 +207,35 @@ impl Driver for FakeSource {
buf.clear();
mavlink::write_v2_msg(&mut buf, header, &data).expect("Failed to write message");

let hub_sender_cloned = hub_sender.clone();
read_all_messages("FakeSource", &mut buf, move |message| {
let message = Arc::new(message);
let hub_sender = hub_sender_cloned.clone();

async move {
trace!("Fake message created: {message:?}");

self.stats
.write()
.await
.update_output(Arc::clone(&message))
.await;

for future in self.on_message_output.call_all(Arc::clone(&message)) {
if let Err(error) = future.await {
debug!(
"Dropping message: on_message_input callback returned error: {error:?}"
);
continue;
read_all_messages("FakeSource", &mut buf, {
let hub_sender = hub_sender.clone();
move |message| {
let message = Arc::new(message);
let hub_sender = hub_sender.clone();

async move {
trace!("Fake message created: {message:?}");

self.stats
.write()
.await
.update_output(Arc::clone(&message))
.await;

for future in self.on_message_output.call_all(Arc::clone(&message)) {
if let Err(error) = future.await {
debug!(
"Dropping message: on_message_input callback returned error: {error:?}"
);
continue;
}
}
}

if let Err(error) = hub_sender.send(message) {
error!("Failed to send message to hub: {error:?}");
if let Err(error) = hub_sender.send(message) {
error!("Failed to send message to hub: {error:?}");
}
}
}
})
}})
.await;

tokio::time::sleep(self.period).await;
Expand Down Expand Up @@ -319,14 +320,17 @@ mod test {
let sink_messages = Arc::new(RwLock::new(Vec::<Arc<Protocol>>::with_capacity(1000)));

// FakeSink and task
let sink_messages_clone = sink_messages.clone();
let sink = FakeSink::builder()
.on_message_input(move |message: Arc<Protocol>| {
let sink_messages = sink_messages_clone.clone();
.on_message_input({
let sink_messages = sink_messages.clone();

move |message: Arc<Protocol>| {
let sink_messages = sink_messages.clone();

async move {
sink_messages.write().await.push(message);
Ok(())
async move {
sink_messages.write().await.push(message);
Ok(())
}
}
})
.build();
Expand All @@ -337,14 +341,16 @@ mod test {
});

// FakeSource and task
let source_messages_clone = source_messages.clone();
let source = FakeSource::builder(message_period)
.on_message_output(move |message: Arc<Protocol>| {
let source_messages = source_messages_clone.clone();

async move {
source_messages.write().await.push(message);
Ok(())
.on_message_output({
let source_messages = source_messages.clone();
move |message: Arc<Protocol>| {
let source_messages = source_messages.clone();

async move {
source_messages.write().await.push(message);
Ok(())
}
}
})
.build();
Expand All @@ -355,14 +361,16 @@ mod test {
});

// Monitoring task to wait the
let sink_messages_clone = sink_messages.clone();
let sink_monitor_task = tokio::spawn(async move {
loop {
if sink_messages_clone.read().await.len() >= number_of_messages {
break;
}
let sink_monitor_task = tokio::spawn({
let sink_messages = sink_messages.clone();
async move {
loop {
if sink_messages.read().await.len() >= number_of_messages {
break;
}

tokio::time::sleep(std::time::Duration::from_millis(1)).await;
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
}
}
});
let _ = tokio::time::timeout(timeout_time, sink_monitor_task)
Expand Down
14 changes: 8 additions & 6 deletions src/drivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,15 +347,17 @@ mod tests {
let (sender, _receiver) = tokio::sync::broadcast::channel(1);

let called = Arc::new(RwLock::new(false));
let called_cloned = called.clone();
let driver = ExampleDriver::new()
.on_message_input(move |_msg| {
let called = called_cloned.clone();
.on_message_input({
let called = called.clone();
move |_msg| {
let called = called.clone();

async move {
*called.write().await = true;
async move {
*called.write().await = true;

Err(anyhow!("Finished from callback"))
Err(anyhow!("Finished from callback"))
}
}
})
.build();
Expand Down
3 changes: 1 addition & 2 deletions src/drivers/tcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ impl Driver for TcpClient {
debug!("TcpClient successfully connected to {server_addr:?}");

let hub_receiver = hub_sender.subscribe();
let hub_sender_cloned = Arc::clone(&hub_sender);

tokio::select! {
result = tcp_receive_task(read, server_addr, hub_sender_cloned, &self.on_message_input, &self.stats) => {
result = tcp_receive_task(read, server_addr, Arc::clone(&hub_sender), &self.on_message_input, &self.stats) => {
if let Err(e) = result {
error!("Error in TCP receive task: {e:?}");
}
Expand Down
28 changes: 15 additions & 13 deletions src/drivers/tlog/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,25 +233,27 @@ mod tests {

let messages_received_per_id =
Arc::new(RwLock::new(BTreeMap::<u32, Vec<Arc<Protocol>>>::new()));
let messages_received_cloned = messages_received_per_id.clone();

let tlog_file = PathBuf::from_str("tests/files/00025-2024-04-22_18-49-07.tlog").unwrap();

let driver = TlogReader::builder(tlog_file.clone())
.on_message_input(move |message: Arc<Protocol>| {
let messages_received = messages_received_cloned.clone();

async move {
let message_id = message.message_id();
.on_message_input({
let messages_received_per_id = messages_received_per_id.clone();
move |message: Arc<Protocol>| {
let messages_received = messages_received_per_id.clone();

async move {
let message_id = message.message_id();

let mut messages_received = messages_received.write().await;
if let Some(samples) = messages_received.get_mut(&message_id) {
samples.push(message);
} else {
messages_received.insert(message_id, Vec::from([message]));
}

let mut messages_received = messages_received.write().await;
if let Some(samples) = messages_received.get_mut(&message_id) {
samples.push(message);
} else {
messages_received.insert(message_id, Vec::from([message]));
Ok(())
}

Ok(())
}
})
.build();
Expand Down
21 changes: 9 additions & 12 deletions src/hub/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,15 @@ impl HubActor {
) -> Self {
let (bcst_sender, _) = broadcast::channel(buffer_size);

let bcst_sender_cloned = bcst_sender.clone();
let component_id_cloned = component_id.clone();
let system_id_cloned = system_id.clone();
let frequency_cloned = frequency.clone();
let heartbeat_task = tokio::spawn(async move {
Self::heartbeat_task(
bcst_sender_cloned,
component_id_cloned,
system_id_cloned,
frequency_cloned,
)
.await
let heartbeat_task = tokio::spawn({
let bcst_sender = bcst_sender.clone();
let component_id = component_id.clone();
let system_id = system_id.clone();
let frequency = frequency.clone();

Self::heartbeat_task(bcst_sender, component_id, system_id, frequency)
});

});

Self {
Expand Down

0 comments on commit 4dde7df

Please sign in to comment.