From b57c224cd0f8a2e6f3b607a96d4697b95deef215 Mon Sep 17 00:00:00 2001 From: jmwample Date: Tue, 5 Nov 2024 17:34:02 -0700 Subject: [PATCH] remove tokio spawn to potentially accomodate wasm32 --- Cargo.lock | 1 + .../client-core/src/client/base_client/mod.rs | 2 +- common/statistics/Cargo.toml | 3 ++- common/statistics/src/clients/mod.rs | 22 +++++++++++++++++-- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e2c3ec583c..8ffb1ff3cc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6597,6 +6597,7 @@ dependencies = [ "nym-credentials-interface", "nym-metrics", "nym-sphinx", + "nym-task", "os_info", "serde", "serde_json", diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index afc35774747..054e79ff779 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -608,7 +608,7 @@ where stats_control.start_with_shutdown(shutdown.fork("statistics_control")); stats_reporter } - None => ClientStatsSender::sink(), + None => ClientStatsSender::sink(shutdown.fork("statistics_sink")), } } diff --git a/common/statistics/Cargo.toml b/common/statistics/Cargo.toml index aca14e863d2..0ff147c809c 100644 --- a/common/statistics/Cargo.toml +++ b/common/statistics/Cargo.toml @@ -22,4 +22,5 @@ si-scale = { workspace = true } nym-sphinx = { path = "../nymsphinx" } nym-credentials-interface = { path = "../credentials-interface" } -nym-metrics = { path = "../nym-metrics" } \ No newline at end of file +nym-metrics = { path = "../nym-metrics" } +nym-task = { path = "../task" } diff --git a/common/statistics/src/clients/mod.rs b/common/statistics/src/clients/mod.rs index ae95e041b62..adde5c280a4 100644 --- a/common/statistics/src/clients/mod.rs +++ b/common/statistics/src/clients/mod.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: GPL-3.0-only use crate::report::StatisticsReporter; +use nym_task::spawn; use tokio::sync::mpsc::UnboundedSender; @@ -37,9 +38,26 @@ impl ClientStatsSender { } /// Used when stats reporting is disabled -- reads all incoming messages and discards them - pub fn sink() -> Self { + pub fn sink(mut shutdown: nym_task::TaskClient) -> Self { let (stats_tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - tokio::spawn(async move { while (rx.recv().await).is_some() {} }); + + spawn(async move { + loop { + tokio::select! { + m = rx.recv() => { + if m.is_none() { + log::trace!("StatisticsSink: channel closed shutting down"); + break; + } + }, + _ = shutdown.recv_with_delay() => { + log::trace!("StatisticsSink: Received shutdown"); + break; + }, + } + } + log::debug!("StatsSink: Exited"); + }); Self { stats_tx } } }