Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Max/client pool #5188

Merged
merged 68 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
27d9699
tcp conn tracker
mfahampshire Nov 6, 2024
8440763
small tweak to tcp tracker
mfahampshire Nov 6, 2024
5023295
name change + specific inc and dec logging
mfahampshire Nov 6, 2024
01d7f0b
changed logging
mfahampshire Nov 6, 2024
dc80116
tweak
mfahampshire Nov 6, 2024
28005d0
tweak
mfahampshire Nov 6, 2024
2fc714b
tweak existing conn tracker logic
mfahampshire Nov 21, 2024
9c6f887
make default decay const
mfahampshire Nov 21, 2024
b33f8b3
first pass connpool
mfahampshire Nov 25, 2024
ed42510
fix flipped lowerthan waiting for pool
mfahampshire Nov 25, 2024
b0e0c28
first pass disconnect
mfahampshire Nov 26, 2024
862d617
trying to get disconnect to remove from pool
mfahampshire Nov 26, 2024
ee96121
remove double accounting for moment
mfahampshire Nov 26, 2024
6eb2dd7
remove comments and reduce logging verbosity
mfahampshire Nov 27, 2024
1a8cdd9
comments
mfahampshire Nov 27, 2024
19fd579
removed a bunch of commenting from example
mfahampshire Nov 27, 2024
dbe1d42
removed a bunch of commenting from example
mfahampshire Nov 27, 2024
05010ee
err handling conpool start
mfahampshire Nov 27, 2024
bb83170
added notes for next features
mfahampshire Nov 27, 2024
644f266
first version working
mfahampshire Nov 27, 2024
dd95462
first pass spin out client_pool
mfahampshire Nov 27, 2024
203c47c
cancel token
mfahampshire Nov 27, 2024
67366d1
logging change
mfahampshire Nov 27, 2024
6682785
minor tweaks
mfahampshire Nov 27, 2024
313e2ae
bump default decay time
mfahampshire Dec 2, 2024
c80316a
updated code commnet
mfahampshire Dec 2, 2024
5d2787b
bump default decay time
mfahampshire Dec 2, 2024
7828091
bump default decay time (again)
mfahampshire Dec 2, 2024
d45283c
temp commit
mfahampshire Dec 3, 2024
efed27f
bugfix: make sure to apply gateway score filtering when choosing init…
jstuczyn Dec 12, 2024
e72cc48
add duplicate packets received to troubleshooting
mfahampshire Dec 13, 2024
2eb7fd5
slightly dropped default decay time
mfahampshire Dec 13, 2024
99b678e
comments to examples
mfahampshire Dec 13, 2024
4a3c6d1
some inline comments
mfahampshire Dec 13, 2024
624b75c
minor comments
mfahampshire Dec 13, 2024
2bfffe0
client_pool.rs mod
mfahampshire Dec 13, 2024
708b78e
client pool example
mfahampshire Dec 13, 2024
00685d2
clippy
mfahampshire Dec 13, 2024
01b6b66
comments
mfahampshire Dec 13, 2024
236cfa9
client pool example done
mfahampshire Dec 13, 2024
10731f3
added disconnect to client pool
mfahampshire Dec 13, 2024
925b0b5
update mod file
mfahampshire Dec 13, 2024
06c6b50
add cancel token disconnect fn
mfahampshire Dec 16, 2024
41f3d5e
comments
mfahampshire Dec 16, 2024
d06891d
comments
mfahampshire Dec 16, 2024
7346c2f
add clone
mfahampshire Dec 16, 2024
16d6395
added disconnect thread
mfahampshire Dec 16, 2024
5106fef
update example files tcpproxy
mfahampshire Dec 16, 2024
5f00887
client pool docs
mfahampshire Dec 16, 2024
f882d86
remove comments for future ffi push + lower default pool size from 4 …
mfahampshire Dec 16, 2024
b20d21f
comment on ffi
mfahampshire Dec 16, 2024
044ac06
update command help
mfahampshire Dec 16, 2024
f09b07a
clone impl
mfahampshire Dec 16, 2024
43bb284
remove clone
mfahampshire Dec 16, 2024
5569cf4
fix clippy
mfahampshire Dec 16, 2024
bb3ab81
fix clippy again
mfahampshire Dec 17, 2024
80a72c3
fix clippy again
mfahampshire Dec 17, 2024
74f249d
version bump
mfahampshire Dec 17, 2024
0324832
version bump
mfahampshire Dec 17, 2024
56949ba
fix test
mfahampshire Dec 17, 2024
15d0bed
tweaked text grammar
mfahampshire Dec 17, 2024
cffc946
updated comment in example
mfahampshire Dec 17, 2024
18c8016
future is now
mfahampshire Dec 17, 2024
a3a567c
cherry
jstuczyn Dec 12, 2024
a83b7c6
cherry
jstuczyn Dec 12, 2024
34891e8
fix borked rebase
mfahampshire Jan 13, 2025
685d41d
fix fmt
mfahampshire Jan 14, 2025
a7679d0
wasm fix
mfahampshire Jan 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions documentation/docs/pages/developers/rust/_meta.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"development-status": "Development Status",
"mixnet": "Mixnet Module",
"tcpproxy": "TcpProxy Module",
"client-pool": "Client Pool",
"ffi": "FFI",
"tutorials": "Tutorials (Coming Soon)"
}
7 changes: 7 additions & 0 deletions documentation/docs/pages/developers/rust/client-pool.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Client Pool

We have a configurable-size Client Pool for processes that require multiple clients in quick succession (this is used by default by the [`TcpProxyClient`](./tcpproxy) for instance)

This will be useful for developers looking to build connection logic, or just are using raw SDK clients in a sitatuation where there are multiple connections with a lot of churn.

> You can find this code [here](https://github.com/nymtech/nym/blob/develop/sdk/rust/nym-sdk/examples/client_pool.rs)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"architecture": "Architecture",
"example": "Example"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Client Pool Architecture

## Motivations
mfahampshire marked this conversation as resolved.
Show resolved Hide resolved
In situations where multiple connections are expected, and the number of connections can vary greatly, the Client Pool reduces time spent waiting for the creation of a Mixnet Client blocking your code sending traffic through the Mixnet. Instead, a configurable number of Clients can be generated and run in the background which can be very quickly grabbed, used, and disconnected.

The Pool can be simply run as a background process for the runtime of your program.

## Clients & Lifetimes
The Client Pool creates **ephemeral Mixnet Clients** which are used and then disconnected. Using the [`TcpProxy`](../tcpproxy) as an example, Clients are used for the lifetime of a single incoming TCP connection; after the TCP connection is closed, the Mixnet client is disconnected.

Clients are popped from the pool when in use, and another Client is created to take its place. If connections are coming in faster than Clients are replenished, you can instead generate an ephemeral Client on the fly, or wait; this is up to the developer to decide. You can see an example of this logic in the example on the next page.

## Runtime Loop
Aside from a few helper / getter functions and a graceful `disconnect_pool()`, the Client Pool is mostly made up of a very simple loop around some conditional logic making up `start()`:
- if the number of Clients in the pool is `< client_pool_reserve_number` (set on `new()`) then create more,
- if the number of Clients in the pool `== client_pool_reserve_number` (set on `new()`) then `sleep`,
- if `client_pool_reserve_number == 0` just `sleep`.

`disconnect_pool()` will cause this loop to `break` via cancellation token.
100 changes: 100 additions & 0 deletions documentation/docs/pages/developers/rust/client-pool/example.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Client Pool Example

> You can find this code [here](https://github.com/nymtech/nym/blob/develop/sdk/rust/nym-sdk/examples/client_pool.rs)

```rust
use anyhow::Result;
use nym_network_defaults::setup_env;
use nym_sdk::client_pool::ClientPool;
use nym_sdk::mixnet::{MixnetClientBuilder, NymNetworkDetails};
use tokio::signal::ctrl_c;

// This client pool is used internally by the TcpProxyClient but can also be used by the Mixnet module, in case you're quickly swapping clients in and out but won't want to use the TcpProxy module.
//
// Run with: cargo run --example client_pool -- ../../../envs/<NETWORK>.env
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_logging();
setup_env(std::env::args().nth(1));

let conn_pool = ClientPool::new(2); // Start the Client Pool with 2 Clients always being kept in reserve
let client_maker = conn_pool.clone();
tokio::spawn(async move {
client_maker.start().await?;
Ok::<(), anyhow::Error>(())
});

println!("\n\nWaiting a few seconds to fill pool\n\n");
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

let pool_clone_one = conn_pool.clone();
let pool_clone_two = conn_pool.clone();

tokio::spawn(async move {
let client_one = match pool_clone_one.get_mixnet_client().await {
Some(client) => {
println!("Grabbed client {} from pool", client.nym_address());
client
}
None => {
println!("Not enough clients in pool, creating ephemeral client");
let net = NymNetworkDetails::new_from_env();
let client = MixnetClientBuilder::new_ephemeral()
.network_details(net)
.build()?
.connect_to_mixnet()
.await?;
println!(
"Using {} for the moment, created outside of the connection pool",
client.nym_address()
);
client
}
};
let our_address = client_one.nym_address();
println!("\n\nClient 1: {our_address}\n\n");
client_one.disconnect().await;
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something
return Ok::<(), anyhow::Error>(());
});

tokio::spawn(async move {
let client_two = match pool_clone_two.get_mixnet_client().await {
Some(client) => {
println!("Grabbed client {} from pool", client.nym_address());
client
}
None => {
println!("Not enough clients in pool, creating ephemeral client");
let net = NymNetworkDetails::new_from_env();
let client = MixnetClientBuilder::new_ephemeral()
.network_details(net)
.build()?
.connect_to_mixnet()
.await?;
println!(
"Using {} for the moment, created outside of the connection pool",
client.nym_address()
);
client
}
};
let our_address = *client_two.nym_address();
println!("\n\nClient 2: {our_address}\n\n");
client_two.disconnect().await;
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something
return Ok::<(), anyhow::Error>(());
});

wait_for_ctrl_c(conn_pool).await?;
Ok(())
}

async fn wait_for_ctrl_c(pool: ClientPool) -> Result<()> {
println!("\n\nPress CTRL_C to disconnect pool\n\n");
ctrl_c().await?;
println!("CTRL_C received. Killing client pool");
pool.disconnect_pool().await;
Ok(())
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ In the future the SDK will be made up of several modules, each of which will all
|-----------|---------------------------------------------------------------------------------------|----------|
| Mixnet | Create / load clients & keypairs, subscribe to Mixnet events, send & receive messages | ✔️ |
| TcpProxy | Utilise the TcpProxyClient and TcpProxyServer abstractions for streaming | ✔️ |
| ClientPool| Create a pool of quickly useable Mixnet clients | ✔️ |
| Ecash | Create & verify Ecash credentials | ❌ |
| Validator | Sign & broadcast Nyx blockchain transactions, query the blockchain | ❌ |

The `Mixnet` module currently exposes the logic of two clients: the [websocket client](../clients/websocket), and the [socks client](../clients/socks5).

The `TcpProxy` module exposes functionality to set up client/server instances that expose a localhost TcpSocket to read/write to.

The `ClientPool` is a configurable pool of ephemeral clients which can be created as a background process and quickly grabbed.
9 changes: 6 additions & 3 deletions documentation/docs/pages/developers/rust/ffi.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The main functionality of exposed functions will be imported from `sdk/ffi/share

Furthermore, the `shared/` code makes sure that client access is thread-safe, and that client actions happen in blocking threads on the Rust side of the FFI boundary.

### Mixnet Module
## Mixnet Module
This is the basic mixnet component of the SDK, exposing client functionality with which people can build custom interfaces with the Mixnet. These functions are exposed to both Go and C/C++ via the `sdk/ffi/shared/` crate.

| `shared/lib.rs` function | Rust Function |
Expand All @@ -36,13 +36,13 @@ This is the basic mixnet component of the SDK, exposing client functionality wit

> We have also implemented `listen_for_incoming_internal()` which is a wrapper around the Mixnet client's `wait_for_messages()`. This is a helper method for listening out for and handling incoming messages.

#### Currently Unsupported Functionality
### Currently Unsupported Functionality
At the time of writing the following functionality is not exposed to the shared FFI library:
- `split_sender()`: the ability to [split a client into sender and receiver](./mixnet/examples/split-send) for concurrent send/receive.
- The use of [custom network topologies](./mixnet/examples/custom-topology).
- `Socks5::new()`: creation and use of the [socks5/4a/4 proxy client](./mixnet/examples/socks).

### TcpProxy Module
## TcpProxy Module
A connection abstraction which exposes a local TCP socket which developers are able to interact with basically as expected, being able to read/write to/from a bytestream, without really having to take into account the workings of the Mixnet/Sphinx/the [message-based](../concepts/messages) format of the underlying client.

<Callout type="info" emoji="ℹ️">
Expand All @@ -58,3 +58,6 @@ A connection abstraction which exposes a local TCP socket which developers are a
| `proxy_server_new_internal(upstream_address: &str, config_dir: &str, env: Option<String>)` | `NymProxyServer::new(upstream_address, config_dir, env)` |
| `proxy_server_run_internal()` | `NymProxyServer.run_with_shutdown()` |
| `proxy_server_address_internal()` | `NymProxyServer.nym_address()` |

## Client Pool
There are currently no FFI bindings for the Client Pool. This will be coming in the future. The bindings for the TcpProxy have been updated to be able to use the Client Pool under the hood, but the standalone Pool is not yet exposed to FFI.
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,6 @@ Whether the `data` of a SURB request being empty is a feature or a bug is to be
You can find a few helper functions [here](./message-helpers.md) to help deal with this issue in the meantime.

> If you can think of a more succinct or different way of handling this do reach out - we're happy to hear other opinions

## Lots of `duplicate fragment received` messages
You might see a lot of `WARN` level logs about duplicate fragments in your logs, depending on the log level you're using. This occurs when a packet is retransmitted somewhere in the Mixnet, but then the original makes it to the destination client as well. This is not something to do with your client logic, but instead the state of the Mixnet.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"architecture": "Architecture",
"examples": "Examples"
"examples": "Examples",
"troubleshooting": "Troubleshooting"
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The motivation behind the creation of the `TcpProxy` module is to allow develope

## Clients
Each of the sub-modules exposed by the `TcpProxy` deal with Nym clients in a different way.
- the `NymProxyClient` creates an ephemeral client per new TCP connection, which is closed according to the configurable timeout: we map one ephemeral client per TCP connection. This is to deal with multiple simultaneous streams. In the future, this will be superceded by a connection pool in order to speed up new connections.
- the `NymProxyClient` relies on the [`Client Pool`](../client-pool) to create clients and keep a certain number of them in reserve. If the amount of incoming TCP connections rises quicker than the Client Pool can create clients, or you have the pool size set to `0`, the `TcpProxyClient` creates an ephemeral client per new TCP connection, which is closed according to the configurable timeout: we map one ephemeral client per TCP connection. This is to deal with multiple simultaneous streams.
- the `NymProxyServer` has a single Nym client with a persistent identity.

## Framing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use tokio::net::TcpStream;
use tokio::signal;
use tokio_stream::StreamExt;
use tokio_util::codec;
use tokio_util::sync::CancellationToken;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[derive(Serialize, Deserialize, Debug)]
struct ExampleMessage {
Expand All @@ -26,6 +28,8 @@ struct ExampleMessage {
tcp_conn: i8,
}

// This example just starts off a bunch of Tcp connections on a loop to a remote endpoint: in this case the TcpListener behind the NymProxyServer instance on the echo server found in `nym/tools/echo-server/`. It pipes a few messages to it, logs the replies, and keeps track of the number of replies received per connection.
//
// To run:
// - run the echo server with `cargo run`
// - run this example with `cargo run --example tcp_proxy_multistream -- <ECHO_SERVER_NYM_ADDRESS> <ENV_FILE_PATH> <CLIENT_PORT>` e.g.
Expand All @@ -40,32 +44,56 @@ async fn main() -> anyhow::Result<()> {
// Nym client logging is very informative but quite verbose.
// The Message Decay related logging gives you an ideas of the internals of the proxy message ordering: you need to switch
// to DEBUG to see the contents of the msg buffer, sphinx packet chunking, etc.
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
tracing_subscriber::registry()
.with(fmt::layer())
.with(
EnvFilter::new("info")
.add_directive("nym_sdk::client_pool=info".parse().unwrap())
.add_directive("nym_sdk::tcp_proxy_client=debug".parse().unwrap()),
)
.init();

let env_path = env::args().nth(2).expect("Env file not specified");
let env = env_path.to_string();

let listen_port = env::args().nth(3).expect("Port not specified");

// Within the TcpProxyClient, individual client shutdown is triggered by the timeout.
// Within the TcpProxyClient, individual client shutdown is triggered by the timeout. The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy.
let proxy_client =
tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env)).await?;
tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env), 2).await?;

// For our disconnect() logic below
let proxy_clone = proxy_client.clone();

tokio::spawn(async move {
proxy_client.run().await?;
Ok::<(), anyhow::Error>(())
});

let example_cancel_token = CancellationToken::new();
let client_cancel_token = example_cancel_token.clone();
let watcher_cancel_token = example_cancel_token.clone();

// Cancel listener thread
tokio::spawn(async move {
signal::ctrl_c().await?;
println!(":: CTRL_C received, shutting down + cleanup up proxy server config files");
watcher_cancel_token.cancel();
proxy_clone.disconnect().await;
Ok::<(), anyhow::Error>(())
});

println!("waiting for everything to be set up..");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("done. sending bytes");

// In the info traces you will see the different session IDs being set up, one for each TcpStream.
for i in 0..4 {
for i in 0..8 {
let client_cancel_inner_token = client_cancel_token.clone();
if client_cancel_token.is_cancelled() {
break;
}
let conn_id = i;
println!("Starting TCP connection {}", conn_id);
let local_tcp_addr = format!("127.0.0.1:{}", listen_port.clone());
tokio::spawn(async move {
// Now the client and server proxies are running we can create and pipe traffic to/from
Expand All @@ -81,7 +109,10 @@ async fn main() -> anyhow::Result<()> {

// Lets just send a bunch of messages to the server with variable delays between them, with a message and tcp connection ids to keep track of ordering on the server side (for illustrative purposes **only**; keeping track of anonymous replies is handled by the proxy under the hood with Single Use Reply Blocks (SURBs); for this illustration we want some kind of app-level message id, but irl most of the time you'll probably be parsing on e.g. the incoming response type instead)
tokio::spawn(async move {
for i in 0..4 {
for i in 0..8 {
if client_cancel_inner_token.is_cancelled() {
break;
}
let mut rng = SmallRng::from_entropy();
let delay: f64 = rng.gen_range(2.5..5.0);
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
Expand All @@ -96,12 +127,7 @@ async fn main() -> anyhow::Result<()> {
.write_all(&serialised)
.await
.expect("couldn't write to stream");
println!(
">> client sent {}: {} bytes on conn {}",
&i,
msg.message_bytes.len(),
&conn_id
);
println!(">> client sent msg {} on conn {}", &i, &conn_id);
}
Ok::<(), anyhow::Error>(())
});
Expand All @@ -113,17 +139,8 @@ async fn main() -> anyhow::Result<()> {
while let Some(Ok(bytes)) = framed_read.next().await {
match bincode::deserialize::<ExampleMessage>(&bytes) {
Ok(msg) => {
println!(
"<< client received {}: {} bytes on conn {}",
msg.message_id,
msg.message_bytes.len(),
msg.tcp_conn
);
reply_counter += 1;
println!(
"tcp connection {} replies received {}/4",
msg.tcp_conn, reply_counter
);
println!("<< conn {} received {}/8", msg.tcp_conn, reply_counter);
}
Err(e) => {
println!("<< client received something that wasn't an example message of {} bytes. error: {}", bytes.len(), e);
Expand All @@ -138,15 +155,12 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
}

// Once timeout is passed, you can either wait for graceful shutdown or just hard stop it.
signal::ctrl_c().await?;
println!("CTRL+C received, shutting down");
Ok(())
}

// emulate a series of small messages followed by a closing larger one
fn gen_bytes_fixed(i: usize) -> Vec<u8> {
let amounts = [10, 15, 50, 1000];
let amounts = [10, 15, 50, 1000, 10, 15, 500, 2000];
let len = amounts[i];
let mut rng = rand::thread_rng();
(0..len).map(|_| rng.gen::<u8>()).collect()
Expand Down
Loading
Loading