Skip to content

Commit

Permalink
make the faucet have a background thread that splits tx outs (#2063)
Browse files Browse the repository at this point in the history
* make the faucet have a background thread that splits tx outs

this enables it to support multiple faucet requests concurrently

* fix clippies, clean up some things, add tracking for queue depth

* bug fix and logging

* fix lints

* fix responses

* fix documentation and examples

* use async routes for compat with rocket 0.5

* Make more parameters configurable

* re-order worker thread checks

* skip serializing empty things

* add readme to cargo toml
  • Loading branch information
cbeck88 authored May 31, 2022
1 parent 24118d5 commit 3a1c83b
Show file tree
Hide file tree
Showing 7 changed files with 737 additions and 94 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion mobilecoind-dev-faucet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ name = "mc-mobilecoind-dev-faucet"
version = "1.3.0-pre0"
authors = ["MobileCoin"]
edition = "2021"
readme = "README.md"

[[bin]]
name = "mobilecoind-dev-faucet"
path = "src/bin/main.rs"

[dependencies]
mc-account-keys = { path = "../account-keys" }
mc-api = { path = "../api" }
mc-common = { path = "../common", features = ["loggers"] }
mc-mobilecoind-api = { path = "../mobilecoind/api" }
mc-transaction-types = { path = "../transaction/types" }
mc-transaction-core = { path = "../transaction/core" }
mc-util-grpc = { path = "../util/grpc" }
mc-util-keyfile = { path = "../util/keyfile" }

Expand All @@ -23,4 +25,6 @@ protobuf = "2.27.1"
rocket = { version = "0.5.0-rc.2", features = ["json"] }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_with = "1.13"
tokio = "1"
24 changes: 17 additions & 7 deletions mobilecoind-dev-faucet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ You may POST to `/`, attaching a json object as the HTTP body:

and the faucet will attempt to send a nominal amount of this token to this address,
or return errors if it cannot. The nominal amount is by default twenty times the minimum
fee for that token.
fee for that token. The response will contain a JSON object, `success` will be `true` if
it managed to submit a payment, and there will be mobilecoind "Receiver Tx receipt" for the
submitted transaction. If `success` is `false` then `err_str` will describe the problem.

You may GET to `/status`, and the faucet will respond with a json object:

Expand All @@ -29,6 +31,9 @@ You may GET to `/status`, and the faucet will respond with a json object:
balances: { <token_id (string)>:<u64 balance (string)> }
// The amounts the faucet pays per token id
faucet_amounts: { <token_id (string)>:<u64 balance (string)> }
// The current number of "queued" UTXOs. Each can be used to fill a concurrent request.
// If a queue is empty then it may take a few seconds for the faucet to refill the queue.
queue_depths: { <token_id (string)>:<u64 length (string)> }
// This address can be paid to replenish the faucet
b58_address: <string>,
}
Expand All @@ -38,13 +43,17 @@ You may GET to `/status`, and the faucet will respond with a json object:

The faucet should be started using a keyfile (which is json containing a mnemonic string or a root entropy).

Options are:
Required options are:

- `--keyfile` - path to the keyfile. this account holds the faucet funds
- `--keyfile` - path to the keyfile. This account holds the faucet funds.

Other options are:
- `--amount-factor` - An integer `X`. The amount we send when people hit the faucet is `minimum_fee * X`. Default is `X = 20`.
- `--listen-host` - hostname for webserver, default `127.0.0.1`
- `--listen-port` - port for webserver, default `9090`
- `--mobilecoind-uri` - URI for connecting to mobilecoind gRPC, default `insecure-mobilecoind://127.0.0.1:4444/`
- `--target-queue-depth` - The number of pre-split transactions the faucet attempts to maintain in its queue. Default is 20.
- `--worker-poll-period-ms` - A lower bound on how often the worker thread wakes up to check in with `mobilecoind`. Default is `100` milliseconds.

### Usage with cURL

Expand All @@ -54,15 +63,16 @@ Requesting payment:

```
$ curl -s localhost:9090/ -d '{"b58_address": "c7f04fcd40d093ca6578b13d790df0790c96e94a77815e5052993af1b9d12923"}' -X POST -H 'Content-type: application/json'
{"success":true}
```
{"success":true,"receiver_tx_receipt_list":[{"recipient":{"view_public_key":"86280244d51afed4217ee3dc6288650c27cacc6e4bfb558159f0f8caa38ae542","spend_public_key":"803958b71de5fa7a58d257a0411506e59f77eaff33ee7b7905ac4f9ef68e3c2a","fog_report_url":"","fog_authority_sig":"","fog_report_id":""},"tx_public_key":"880d56bc36411507131098dd404878fb083b6dd5b805c37f736dcfa94d31027d","tx_out_hash":"0fbe90326c255e08b3ee6cbdf626d244ac29bbdab8810163d09513fa1919664f","tombstone":56,"confirmation_number":"027c506b81ad5bd8142382c75f6148f6e5627ad45d2a09110ee9e4ff5a789398"}]}```

```
$ curl -s localhost:9090/ -d '{"b58_address": "c7f04fcd40d093ca6578b13d790df0790c96e94a77815e5052993af1b9d12923", "token_id": "1"}' -X POST -H 'Content-type: application/json'
{"success":true}
{"success":false,"err_str":"faucet is depleted"}
```

Getting status:

```
$ curl -s localhost:9090/status
{...}
{"b58_address":"5KBMnd8cs5zPsytGgZrjmQ8z9VJYThuh1B39pKzDERTfzm3sVGQxnZPC8JEWP69togpSPRz3e6pBsLzwnMjrXTbDqoRTQ8VF98sQu7LqjL5","faucet_amounts":{"2":"20480","1":"20480","0":"8000000000"},"balances":{"2":"0","1":"0","0":"12499999997600000000"},"queue_depths":{"1":"0","0":"26","2":"0"}}
```
170 changes: 88 additions & 82 deletions mobilecoind-dev-faucet/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,24 @@

use clap::Parser;
use grpcio::ChannelBuilder;
use mc_account_keys::AccountKey;
use mc_api::printable::PrintableWrapper;
use mc_common::logger::{create_app_logger, log, o, Logger};
use mc_mobilecoind_api::{
mobilecoind_api_grpc::MobilecoindApiClient, MobilecoindUri, SubmitTxResponse, TxStatus,
};
use mc_mobilecoind_dev_faucet::data_types::*;
use mc_transaction_types::TokenId;
use mc_mobilecoind_api::{mobilecoind_api_grpc::MobilecoindApiClient, MobilecoindUri};
use mc_mobilecoind_dev_faucet::{data_types::*, worker::Worker};
use mc_transaction_core::{ring_signature::KeyImage, TokenId};
use mc_util_grpc::ConnectionUriGrpcioChannel;
use mc_util_keyfile::read_keyfile;
use protobuf::RepeatedField;
use rocket::{get, post, routes, serde::json::Json};
use std::{
collections::HashMap,
path::PathBuf,
sync::{Arc, Mutex, MutexGuard},
time::Duration,
};
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};

/// Command line config, set with defaults that will work with
/// a standard mobilecoind instance
#[derive(Clone, Debug, Parser)]
#[clap(
name = "mobilecoind-dev-faucet",
about = "An HTTP faucet server, backed by mobilecoind"
about = "A stateless HTTP faucet server, backed by mobilecoind"
)]
pub struct Config {
/// Path to json-formatted key file, containing mnemonic or root entropy.
Expand All @@ -57,12 +51,24 @@ pub struct Config {
env = "MC_MOBILECOIND_URI"
)]
pub mobilecoind_uri: MobilecoindUri,

/// Target Queue Depth. When the queue for a token id is less than this in
/// depth, the worker attempts to make a split Tx to produce more TxOuts
/// for the queue.
#[clap(long, default_value = "20", env = "MC_TARGET_QUEUE_DEPTH")]
pub target_queue_depth: usize,

/// Worker poll period in milliseconds.
#[clap(long, default_value = "100", env = "MC_WORKER_POLL_PERIOD_MS")]
pub worker_poll_period_ms: u64,
}

/// Connection to the mobilecoind client
struct State {
/// The connection to mobilecoind
pub mobilecoind_api_client: MobilecoindApiClient,
/// The account key holding our funds
pub account_key: AccountKey,
/// The bytes of our monitor id, which holds the faucet's funds
pub monitor_id: Vec<u8>,
/// The public address of the faucet, which someone can use to replenish the
Expand All @@ -76,7 +82,9 @@ struct State {
pub grpc_env: Arc<grpcio::Environment>,
/// The submit tx response for our previous Tx if any. This lets us check
/// if we have an in-flight tx still.
pub inflight_tx_state: Mutex<Option<SubmitTxResponse>>,
pub worker: Worker,
/// Logger
pub logger: Logger,
}

impl State {
Expand Down Expand Up @@ -120,6 +128,11 @@ impl State {
resp.b58_code
};

let monitor_printable_wrapper = PrintableWrapper::b58_decode(monitor_b58_address.clone())
.expect("Could not decode b58 address");
assert!(monitor_printable_wrapper.has_public_address());
let monitor_public_address = monitor_printable_wrapper.get_public_address();

// Get the network minimum fees and compute faucet amounts
let faucet_amounts = {
let mut result = HashMap::<TokenId, u64>::default();
Expand All @@ -135,109 +148,89 @@ impl State {
result
};

let inflight_tx_state = Mutex::new(None);
// Start background worker, which splits txouts in advance
let worker = Worker::new(
mobilecoind_api_client.clone(),
monitor_id.clone(),
monitor_public_address.clone(),
faucet_amounts.clone(),
config.target_queue_depth,
Duration::from_millis(config.worker_poll_period_ms),
logger,
);

let logger = logger.new(o!("thread" => "http"));

Ok(State {
mobilecoind_api_client,
account_key,
monitor_id,
monitor_b58_address,
faucet_amounts,
grpc_env,
inflight_tx_state,
worker,
logger,
})
}

fn lock_and_check_inflight_tx_state(
&self,
) -> Result<MutexGuard<Option<SubmitTxResponse>>, String> {
let mut guard = self.inflight_tx_state.lock().expect("mutex poisoned");
if let Some(prev_tx) = guard.as_mut() {
let mut tries = 10;
loop {
let resp = self
.mobilecoind_api_client
.get_tx_status_as_sender(prev_tx)
.map_err(|err| format!("Failed getting network status: {}", err))?;
if resp.status == TxStatus::Unknown {
std::thread::sleep(Duration::from_millis(10));
tries -= 1;
if tries == 0 {
return Err("faucet is busy".to_string());
}
} else {
break;
}
}
}

*guard = None;

Ok(guard)
}
}

/// Request payment from the faucet
#[post("/", format = "json", data = "<req>")]
fn post(
async fn post(
state: &rocket::State<State>,
req: Json<JsonFaucetRequest>,
) -> Result<Json<JsonSendPaymentResponse>, String> {
) -> Result<Json<JsonSubmitTxResponse>, JsonSubmitTxResponse> {
let printable_wrapper = PrintableWrapper::b58_decode(req.b58_address.clone())
.map_err(|err| format!("Could not decode b58 address: {}", err))?;

let public_address = if printable_wrapper.has_public_address() {
printable_wrapper.get_public_address()
} else {
return Err(format!(
"b58 address '{}' is not a public address",
req.b58_address
));
return Err(format!("b58 address '{}' is not a public address", req.b58_address).into());
};

let token_id = TokenId::from(req.token_id.unwrap_or_default().as_ref());

let value = *state.faucet_amounts.get(&token_id).ok_or(format!(
"token_id: '{}' is not supported by the network",
token_id
))?;
let utxo_record = state.worker.get_utxo(token_id)?;
log::trace!(
state.logger,
"Got a UTXO: key_image = {:?}, value = {}",
KeyImage::try_from(utxo_record.utxo.get_key_image()).unwrap(),
utxo_record.utxo.value
);

let mut lock = state.lock_and_check_inflight_tx_state()?;
// Generate a Tx sending this specific TxOut, less fees
let mut req = mc_mobilecoind_api::GenerateTxFromTxOutListRequest::new();
req.set_account_key((&state.account_key).into());
req.set_input_list(RepeatedField::from_vec(vec![utxo_record.utxo]));
req.set_receiver(public_address.clone());
req.set_token_id(*token_id);

// Generate an outlay
let mut outlay = mc_mobilecoind_api::Outlay::new();
outlay.set_receiver(public_address.clone());
outlay.set_value(value);
let resp = state
.mobilecoind_api_client
.generate_tx_from_tx_out_list(&req)
.map_err(|err| format!("Failed to build Tx: {}", err))?;

// Send the payment request
let mut req = mc_mobilecoind_api::SendPaymentRequest::new();
req.set_sender_monitor_id(state.monitor_id.clone());
req.set_sender_subaddress(0);
req.set_token_id(*token_id);
req.set_outlay_list(RepeatedField::from_vec(vec![outlay]));
// Submit the tx proposal
let mut req = mc_mobilecoind_api::SubmitTxRequest::new();
req.set_tx_proposal(resp.get_tx_proposal().clone());

let resp = state
.mobilecoind_api_client
.send_payment(&req)
.map_err(|err| format!("Failed to send payment: {}", err))?;

// Convert from SendPaymentResponse to SubmitTxResponse,
// this is needed to check the status of an in-flight payment
let mut submit_tx_response = SubmitTxResponse::new();
submit_tx_response.set_sender_tx_receipt(resp.get_sender_tx_receipt().clone());
submit_tx_response
.set_receiver_tx_receipt_list(RepeatedField::from(resp.get_receiver_tx_receipt_list()));

// This lets us keep tabs on when this payment has resolved, so that we can
// avoid sending another payment until it does
*lock = Some(submit_tx_response);

// The receipt from the payment request can be used by the status check below
Ok(Json(JsonSendPaymentResponse::from(&resp)))
.submit_tx_async(&req)
.map_err(|err| format!("Failed to submit Tx: {}", err))?
.await
.map_err(|err| format!("Submit Tx ended in error: {}", err))?;

// Tell the worker that this utxo was submitted, so that it can track and
// recycle the utxo if this payment fails
let _ = utxo_record.sender.send(resp.clone());
Ok(Json(JsonSubmitTxResponse::from(&resp)))
}

/// Request status of the faucet
#[get("/status")]
fn status(state: &rocket::State<State>) -> Result<Json<JsonFaucetStatus>, String> {
async fn status(state: &rocket::State<State>) -> Result<Json<JsonFaucetStatus>, String> {
// Get up-to-date balances for all the tokens we are tracking
let mut balances: HashMap<TokenId, u64> = Default::default();
for (token_id, _) in state.faucet_amounts.iter() {
Expand All @@ -247,16 +240,25 @@ fn status(state: &rocket::State<State>) -> Result<Json<JsonFaucetStatus>, String

let resp = state
.mobilecoind_api_client
.get_balance(&req)
.get_balance_async(&req)
.map_err(|err| {
format!(
"Failed to check balance for token id '{}': {}",
token_id, err
)
})?
.await
.map_err(|err| {
format!(
"Balance check request for token id '{}' ended in error: {}",
token_id, err
)
})?;
balances.insert(*token_id, resp.balance);
}

let queue_depths = state.worker.get_queue_depths();

Ok(Json(JsonFaucetStatus {
b58_address: state.monitor_b58_address.clone(),
faucet_amounts: state
Expand All @@ -265,6 +267,10 @@ fn status(state: &rocket::State<State>) -> Result<Json<JsonFaucetStatus>, String
.map(convert_balance_pair)
.collect(),
balances: balances.iter().map(convert_balance_pair).collect(),
queue_depths: queue_depths
.into_iter()
.map(|(token_id, depth)| (JsonU64(*token_id), JsonU64(depth as u64)))
.collect(),
}))
}

Expand Down
Loading

0 comments on commit 3a1c83b

Please sign in to comment.