-
Notifications
You must be signed in to change notification settings - Fork 237
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
2e194b8
commit ca8c57e
Showing
7 changed files
with
84 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
// Copyright 2023-2024 - Nym Technologies SA <[email protected]> | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use anyhow::{anyhow, bail}; | ||
use anyhow::{anyhow, bail, Error, Result}; | ||
use lazy_static::lazy_static; | ||
use nym_sdk::mixnet::{ | ||
MixnetClient, MixnetClientBuilder, MixnetMessageSender, Recipient, ReconstructedMessage, | ||
|
@@ -13,19 +13,25 @@ use std::path::PathBuf; | |
use std::sync::{Arc, Mutex}; | ||
use tokio::runtime::Runtime; | ||
|
||
// NYM_CLIENT/PROXIES: Static reference (only init-ed once) to: | ||
// - Arc: share ownership | ||
// - Mutex: thread-safe way to share data between threads | ||
// - Option: init-ed or not | ||
// RUNTIME: Tokio runtime: no need to pass back to C and deal with raw pointers as it was previously | ||
// NYM_CLIENT/PROXIES: Static thread-safe reference (init once) to Option<Client>s. | ||
// RUNTIME: Tokio runtime: no need to pass across FFI boundary and deal with raw pointers. | ||
lazy_static! { | ||
static ref NYM_PROXY_CLIENT: Arc<Mutex<Option<NymProxyClient>>> = Arc::new(Mutex::new(None)); | ||
static ref NYM_PROXY_SERVER: Arc<Mutex<Option<NymProxyServer>>> = Arc::new(Mutex::new(None)); | ||
static ref NYM_CLIENT: Arc<Mutex<Option<MixnetClient>>> = Arc::new(Mutex::new(None)); | ||
static ref RUNTIME: Runtime = Runtime::new().unwrap(); | ||
} | ||
|
||
pub fn init_ephemeral_internal() -> anyhow::Result<(), anyhow::Error> { | ||
// TODO create get_client() to use in fns and remove code repetition | ||
fn get_client_as_ref() -> bool { | ||
NYM_CLIENT.lock().unwrap().as_ref().is_some() | ||
} | ||
|
||
fn get_client_guard() -> bool { | ||
todo!() | ||
} | ||
|
||
pub fn init_ephemeral_internal() -> Result<(), Error> { | ||
if NYM_CLIENT.lock().unwrap().as_ref().is_some() { | ||
bail!("client already exists"); | ||
} else { | ||
|
@@ -37,13 +43,27 @@ pub fn init_ephemeral_internal() -> anyhow::Result<(), anyhow::Error> { | |
} else { | ||
return Err(anyhow!("couldnt lock ephemeral NYM_CLIENT")); | ||
} | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
} | ||
// if get_client_as_ref() { | ||
// RUNTIME.block_on(async move { | ||
// let init_client = MixnetClient::connect_new().await?; | ||
// let mut client = NYM_CLIENT.try_lock(); | ||
// if let Ok(ref mut client) = client { | ||
// **client = Some(init_client); | ||
// } else { | ||
// return Err(anyhow!("couldnt lock ephemeral NYM_CLIENT")); | ||
// } | ||
// Ok::<(), Error>(()) | ||
// })?; | ||
// } else { | ||
// bail!("client already exists: no need to reinitialise"); | ||
// } | ||
Ok(()) | ||
} | ||
|
||
pub fn init_default_storage_internal(config_dir: PathBuf) -> anyhow::Result<(), anyhow::Error> { | ||
pub fn init_default_storage_internal(config_dir: PathBuf) -> Result<(), Error> { | ||
if NYM_CLIENT.lock().unwrap().as_ref().is_some() { | ||
bail!("client already exists"); | ||
} else { | ||
|
@@ -60,13 +80,13 @@ pub fn init_default_storage_internal(config_dir: PathBuf) -> anyhow::Result<(), | |
} else { | ||
return Err(anyhow!("couldnt lock NYM_CLIENT")); | ||
} | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
} | ||
Ok(()) | ||
} | ||
|
||
pub fn get_self_address_internal() -> anyhow::Result<String, anyhow::Error> { | ||
pub fn get_self_address_internal() -> Result<String, Error> { | ||
let client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); | ||
if client.is_none() { | ||
bail!("Client is not yet initialised"); | ||
|
@@ -83,7 +103,8 @@ pub fn get_self_address_internal() -> anyhow::Result<String, anyhow::Error> { | |
pub fn send_message_internal( | ||
recipient: Recipient, | ||
message: &str, | ||
) -> anyhow::Result<(), anyhow::Error> { | ||
// TODO add Option<surb_amount>, if Some(surb_amount) call send_message() instead with specified #, else send_plain_message as this uses the default | ||
) -> Result<(), Error> { | ||
let client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); | ||
if client.is_none() { | ||
bail!("Client is not yet initialised"); | ||
|
@@ -94,17 +115,14 @@ pub fn send_message_internal( | |
|
||
RUNTIME.block_on(async move { | ||
nym_client.send_plain_message(recipient, message).await?; | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
Ok(()) | ||
} | ||
|
||
// TODO send_raw_message_internal | ||
|
||
pub fn reply_internal( | ||
recipient: AnonymousSenderTag, | ||
message: &str, | ||
) -> anyhow::Result<(), anyhow::Error> { | ||
pub fn reply_internal(recipient: AnonymousSenderTag, message: &str) -> Result<(), Error> { | ||
let client = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); | ||
if client.is_none() { | ||
bail!("Client is not yet initialised"); | ||
|
@@ -115,12 +133,12 @@ pub fn reply_internal( | |
|
||
RUNTIME.block_on(async move { | ||
nym_client.send_reply(recipient, message).await?; | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
Ok(()) | ||
} | ||
|
||
pub fn listen_for_incoming_internal() -> anyhow::Result<ReconstructedMessage, anyhow::Error> { | ||
pub fn listen_for_incoming_internal() -> Result<ReconstructedMessage, Error> { | ||
let mut binding = NYM_CLIENT.lock().expect("could not lock NYM_CLIENT"); | ||
if binding.is_none() { | ||
bail!("recipient is null"); | ||
|
@@ -131,7 +149,7 @@ pub fn listen_for_incoming_internal() -> anyhow::Result<ReconstructedMessage, an | |
|
||
let message = RUNTIME.block_on(async move { | ||
let received = wait_for_non_empty_message(client).await?; | ||
Ok::<ReconstructedMessage, anyhow::Error>(ReconstructedMessage { | ||
Ok::<ReconstructedMessage, Error>(ReconstructedMessage { | ||
message: received.message, | ||
sender_tag: received.sender_tag, | ||
}) | ||
|
@@ -140,9 +158,7 @@ pub fn listen_for_incoming_internal() -> anyhow::Result<ReconstructedMessage, an | |
Ok(message) | ||
} | ||
|
||
pub async fn wait_for_non_empty_message( | ||
client: &mut MixnetClient, | ||
) -> anyhow::Result<ReconstructedMessage> { | ||
pub async fn wait_for_non_empty_message(client: &mut MixnetClient) -> Result<ReconstructedMessage> { | ||
while let Some(mut new_message) = client.wait_for_messages().await { | ||
if !new_message.is_empty() { | ||
return new_message | ||
|
@@ -159,7 +175,8 @@ pub fn proxy_client_new_internal( | |
listen_port: &str, | ||
close_timeout: u64, | ||
env: Option<String>, | ||
) -> anyhow::Result<(), anyhow::Error> { | ||
pool_size: usize, | ||
) -> Result<(), Error> { | ||
if NYM_PROXY_CLIENT.lock().unwrap().as_ref().is_some() { | ||
bail!("proxy client already exists"); | ||
} else { | ||
|
@@ -170,6 +187,7 @@ pub fn proxy_client_new_internal( | |
listen_port, | ||
close_timeout, | ||
env, | ||
pool_size, | ||
) | ||
.await?; | ||
let mut client = NYM_PROXY_CLIENT.try_lock(); | ||
|
@@ -178,7 +196,7 @@ pub fn proxy_client_new_internal( | |
} else { | ||
return Err(anyhow!("couldnt lock NYM_PROXY_CLIENT")); | ||
} | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
} | ||
Ok(()) | ||
|
@@ -187,7 +205,7 @@ pub fn proxy_client_new_internal( | |
pub fn proxy_client_new_defaults_internal( | ||
server_address: Recipient, | ||
env: Option<String>, | ||
) -> anyhow::Result<(), anyhow::Error> { | ||
) -> Result<(), Error> { | ||
if NYM_PROXY_CLIENT.lock().unwrap().as_ref().is_some() { | ||
bail!("proxy client already exists"); | ||
} else { | ||
|
@@ -199,13 +217,13 @@ pub fn proxy_client_new_defaults_internal( | |
} else { | ||
return Err(anyhow!("couldn't lock PROXY_CLIENT")); | ||
} | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
} | ||
Ok(()) | ||
} | ||
|
||
pub fn proxy_client_run_internal() -> anyhow::Result<(), anyhow::Error> { | ||
pub fn proxy_client_run_internal() -> Result<(), Error> { | ||
let proxy_client = NYM_PROXY_CLIENT | ||
.lock() | ||
.expect("could not lock NYM_PROXY_CLIENT"); | ||
|
@@ -217,7 +235,7 @@ pub fn proxy_client_run_internal() -> anyhow::Result<(), anyhow::Error> { | |
.ok_or_else(|| anyhow!("could not get proxy_client as_ref()"))?; | ||
RUNTIME.block_on(async move { | ||
proxy.run().await?; | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
Ok(()) | ||
} | ||
|
@@ -226,7 +244,7 @@ pub fn proxy_server_new_internal( | |
upstream_address: &str, | ||
config_dir: &str, | ||
env: Option<String>, | ||
) -> anyhow::Result<(), anyhow::Error> { | ||
) -> Result<(), Error> { | ||
if NYM_PROXY_SERVER.lock().unwrap().as_ref().is_some() { | ||
bail!("proxy client already exists"); | ||
} else { | ||
|
@@ -238,13 +256,13 @@ pub fn proxy_server_new_internal( | |
} else { | ||
return Err(anyhow!("couldn't lock PROXY_SERVER")); | ||
} | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
} | ||
Ok(()) | ||
} | ||
|
||
pub fn proxy_server_run_internal() -> anyhow::Result<(), anyhow::Error> { | ||
pub fn proxy_server_run_internal() -> Result<(), Error> { | ||
let mut proxy_server = NYM_PROXY_SERVER | ||
.lock() | ||
.expect("could not lock NYM_PROXY_CLIENT"); | ||
|
@@ -256,12 +274,12 @@ pub fn proxy_server_run_internal() -> anyhow::Result<(), anyhow::Error> { | |
.ok_or_else(|| anyhow!("could not get proxy_client as_ref()"))?; | ||
RUNTIME.block_on(async move { | ||
proxy.run_with_shutdown().await?; | ||
Ok::<(), anyhow::Error>(()) | ||
Ok::<(), Error>(()) | ||
})?; | ||
Ok(()) | ||
} | ||
|
||
pub fn proxy_server_address_internal() -> anyhow::Result<Recipient, anyhow::Error> { | ||
pub fn proxy_server_address_internal() -> Result<Recipient, Error> { | ||
let mut proxy_server = NYM_PROXY_SERVER | ||
.lock() | ||
.expect("could not lock NYM_PROXY_CLIENT"); | ||
|
File renamed without changes.