From 3a1c83b392b22ec1366ee759ed9eb9f2c5daf9a6 Mon Sep 17 00:00:00 2001 From: Chris Beck Date: Mon, 30 May 2022 18:34:17 -0600 Subject: [PATCH] make the faucet have a background thread that splits tx outs (#2063) * make the faucet have a background thread that splits tx outs this enables it to support multiple faucet requests concurrently * fix clippies, clean up some things, add tracking for queue depth * bug fix and logging * fix lints * fix responses * fix documentation and examples * use async routes for compat with rocket 0.5 * Make more parameters configurable * re-order worker thread checks * skip serializing empty things * add readme to cargo toml --- Cargo.lock | 5 +- mobilecoind-dev-faucet/Cargo.toml | 6 +- mobilecoind-dev-faucet/README.md | 24 +- mobilecoind-dev-faucet/src/bin/main.rs | 170 +++---- mobilecoind-dev-faucet/src/data_types.rs | 40 +- mobilecoind-dev-faucet/src/lib.rs | 1 + mobilecoind-dev-faucet/src/worker.rs | 585 +++++++++++++++++++++++ 7 files changed, 737 insertions(+), 94 deletions(-) create mode 100644 mobilecoind-dev-faucet/src/worker.rs diff --git a/Cargo.lock b/Cargo.lock index 3f7ddc4a47..4583123527 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4632,17 +4632,20 @@ dependencies = [ "clap 3.1.18", "grpcio", "hex", + "mc-account-keys", "mc-api", "mc-common", "mc-mobilecoind-api", - "mc-transaction-types", + "mc-transaction-core", "mc-util-grpc", "mc-util-keyfile", "protobuf", "rocket", "serde", "serde_derive", + "serde_json", "serde_with", + "tokio", ] [[package]] diff --git a/mobilecoind-dev-faucet/Cargo.toml b/mobilecoind-dev-faucet/Cargo.toml index bbd494e502..3de0d0e4d6 100644 --- a/mobilecoind-dev-faucet/Cargo.toml +++ b/mobilecoind-dev-faucet/Cargo.toml @@ -3,16 +3,18 @@ name = "mc-mobilecoind-dev-faucet" version = "1.3.0-pre0" authors = ["MobileCoin"] edition = "2021" +readme = "README.md" [[bin]] name = "mobilecoind-dev-faucet" path = "src/bin/main.rs" [dependencies] +mc-account-keys = { path = "../account-keys" } mc-api = { path = "../api" } mc-common = { path = "../common", features = ["loggers"] } mc-mobilecoind-api = { path = "../mobilecoind/api" } -mc-transaction-types = { path = "../transaction/types" } +mc-transaction-core = { path = "../transaction/core" } mc-util-grpc = { path = "../util/grpc" } mc-util-keyfile = { path = "../util/keyfile" } @@ -23,4 +25,6 @@ protobuf = "2.27.1" rocket = { version = "0.5.0-rc.2", features = ["json"] } serde = "1.0" serde_derive = "1.0" +serde_json = "1.0" serde_with = "1.13" +tokio = "1" diff --git a/mobilecoind-dev-faucet/README.md b/mobilecoind-dev-faucet/README.md index 86ed69907a..b2e4c49607 100644 --- a/mobilecoind-dev-faucet/README.md +++ b/mobilecoind-dev-faucet/README.md @@ -19,7 +19,9 @@ You may POST to `/`, attaching a json object as the HTTP body: and the faucet will attempt to send a nominal amount of this token to this address, or return errors if it cannot. The nominal amount is by default twenty times the minimum -fee for that token. +fee for that token. The response will contain a JSON object, `success` will be `true` if +it managed to submit a payment, and there will be mobilecoind "Receiver Tx receipt" for the +submitted transaction. If `success` is `false` then `err_str` will describe the problem. You may GET to `/status`, and the faucet will respond with a json object: @@ -29,6 +31,9 @@ You may GET to `/status`, and the faucet will respond with a json object: balances: { : } // The amounts the faucet pays per token id faucet_amounts: { : } + // The current number of "queued" UTXOs. Each can be used to fill a concurrent request. + // If a queue is empty then it may take a few seconds for the faucet to refill the queue. + queue_depths: { : } // This address can be paid to replenish the faucet b58_address: , } @@ -38,13 +43,17 @@ You may GET to `/status`, and the faucet will respond with a json object: The faucet should be started using a keyfile (which is json containing a mnemonic string or a root entropy). -Options are: +Required options are: -- `--keyfile` - path to the keyfile. this account holds the faucet funds +- `--keyfile` - path to the keyfile. This account holds the faucet funds. + +Other options are: - `--amount-factor` - An integer `X`. The amount we send when people hit the faucet is `minimum_fee * X`. Default is `X = 20`. - `--listen-host` - hostname for webserver, default `127.0.0.1` - `--listen-port` - port for webserver, default `9090` - `--mobilecoind-uri` - URI for connecting to mobilecoind gRPC, default `insecure-mobilecoind://127.0.0.1:4444/` +- `--target-queue-depth` - The number of pre-split transactions the faucet attempts to maintain in its queue. Default is 20. +- `--worker-poll-period-ms` - A lower bound on how often the worker thread wakes up to check in with `mobilecoind`. Default is `100` milliseconds. ### Usage with cURL @@ -54,15 +63,16 @@ Requesting payment: ``` $ curl -s localhost:9090/ -d '{"b58_address": "c7f04fcd40d093ca6578b13d790df0790c96e94a77815e5052993af1b9d12923"}' -X POST -H 'Content-type: application/json' -{"success":true} -``` +{"success":true,"receiver_tx_receipt_list":[{"recipient":{"view_public_key":"86280244d51afed4217ee3dc6288650c27cacc6e4bfb558159f0f8caa38ae542","spend_public_key":"803958b71de5fa7a58d257a0411506e59f77eaff33ee7b7905ac4f9ef68e3c2a","fog_report_url":"","fog_authority_sig":"","fog_report_id":""},"tx_public_key":"880d56bc36411507131098dd404878fb083b6dd5b805c37f736dcfa94d31027d","tx_out_hash":"0fbe90326c255e08b3ee6cbdf626d244ac29bbdab8810163d09513fa1919664f","tombstone":56,"confirmation_number":"027c506b81ad5bd8142382c75f6148f6e5627ad45d2a09110ee9e4ff5a789398"}]}``` ``` $ curl -s localhost:9090/ -d '{"b58_address": "c7f04fcd40d093ca6578b13d790df0790c96e94a77815e5052993af1b9d12923", "token_id": "1"}' -X POST -H 'Content-type: application/json' -{"success":true} +{"success":false,"err_str":"faucet is depleted"} ``` +Getting status: + ``` $ curl -s localhost:9090/status -{...} +{"b58_address":"5KBMnd8cs5zPsytGgZrjmQ8z9VJYThuh1B39pKzDERTfzm3sVGQxnZPC8JEWP69togpSPRz3e6pBsLzwnMjrXTbDqoRTQ8VF98sQu7LqjL5","faucet_amounts":{"2":"20480","1":"20480","0":"8000000000"},"balances":{"2":"0","1":"0","0":"12499999997600000000"},"queue_depths":{"1":"0","0":"26","2":"0"}} ``` diff --git a/mobilecoind-dev-faucet/src/bin/main.rs b/mobilecoind-dev-faucet/src/bin/main.rs index 2edacadac7..6348c05c42 100644 --- a/mobilecoind-dev-faucet/src/bin/main.rs +++ b/mobilecoind-dev-faucet/src/bin/main.rs @@ -7,30 +7,24 @@ use clap::Parser; use grpcio::ChannelBuilder; +use mc_account_keys::AccountKey; use mc_api::printable::PrintableWrapper; use mc_common::logger::{create_app_logger, log, o, Logger}; -use mc_mobilecoind_api::{ - mobilecoind_api_grpc::MobilecoindApiClient, MobilecoindUri, SubmitTxResponse, TxStatus, -}; -use mc_mobilecoind_dev_faucet::data_types::*; -use mc_transaction_types::TokenId; +use mc_mobilecoind_api::{mobilecoind_api_grpc::MobilecoindApiClient, MobilecoindUri}; +use mc_mobilecoind_dev_faucet::{data_types::*, worker::Worker}; +use mc_transaction_core::{ring_signature::KeyImage, TokenId}; use mc_util_grpc::ConnectionUriGrpcioChannel; use mc_util_keyfile::read_keyfile; use protobuf::RepeatedField; use rocket::{get, post, routes, serde::json::Json}; -use std::{ - collections::HashMap, - path::PathBuf, - sync::{Arc, Mutex, MutexGuard}, - time::Duration, -}; +use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; /// Command line config, set with defaults that will work with /// a standard mobilecoind instance #[derive(Clone, Debug, Parser)] #[clap( name = "mobilecoind-dev-faucet", - about = "An HTTP faucet server, backed by mobilecoind" + about = "A stateless HTTP faucet server, backed by mobilecoind" )] pub struct Config { /// Path to json-formatted key file, containing mnemonic or root entropy. @@ -57,12 +51,24 @@ pub struct Config { env = "MC_MOBILECOIND_URI" )] pub mobilecoind_uri: MobilecoindUri, + + /// Target Queue Depth. When the queue for a token id is less than this in + /// depth, the worker attempts to make a split Tx to produce more TxOuts + /// for the queue. + #[clap(long, default_value = "20", env = "MC_TARGET_QUEUE_DEPTH")] + pub target_queue_depth: usize, + + /// Worker poll period in milliseconds. + #[clap(long, default_value = "100", env = "MC_WORKER_POLL_PERIOD_MS")] + pub worker_poll_period_ms: u64, } /// Connection to the mobilecoind client struct State { /// The connection to mobilecoind pub mobilecoind_api_client: MobilecoindApiClient, + /// The account key holding our funds + pub account_key: AccountKey, /// The bytes of our monitor id, which holds the faucet's funds pub monitor_id: Vec, /// The public address of the faucet, which someone can use to replenish the @@ -76,7 +82,9 @@ struct State { pub grpc_env: Arc, /// The submit tx response for our previous Tx if any. This lets us check /// if we have an in-flight tx still. - pub inflight_tx_state: Mutex>, + pub worker: Worker, + /// Logger + pub logger: Logger, } impl State { @@ -120,6 +128,11 @@ impl State { resp.b58_code }; + let monitor_printable_wrapper = PrintableWrapper::b58_decode(monitor_b58_address.clone()) + .expect("Could not decode b58 address"); + assert!(monitor_printable_wrapper.has_public_address()); + let monitor_public_address = monitor_printable_wrapper.get_public_address(); + // Get the network minimum fees and compute faucet amounts let faucet_amounts = { let mut result = HashMap::::default(); @@ -135,109 +148,89 @@ impl State { result }; - let inflight_tx_state = Mutex::new(None); + // Start background worker, which splits txouts in advance + let worker = Worker::new( + mobilecoind_api_client.clone(), + monitor_id.clone(), + monitor_public_address.clone(), + faucet_amounts.clone(), + config.target_queue_depth, + Duration::from_millis(config.worker_poll_period_ms), + logger, + ); + + let logger = logger.new(o!("thread" => "http")); Ok(State { mobilecoind_api_client, + account_key, monitor_id, monitor_b58_address, faucet_amounts, grpc_env, - inflight_tx_state, + worker, + logger, }) } - - fn lock_and_check_inflight_tx_state( - &self, - ) -> Result>, String> { - let mut guard = self.inflight_tx_state.lock().expect("mutex poisoned"); - if let Some(prev_tx) = guard.as_mut() { - let mut tries = 10; - loop { - let resp = self - .mobilecoind_api_client - .get_tx_status_as_sender(prev_tx) - .map_err(|err| format!("Failed getting network status: {}", err))?; - if resp.status == TxStatus::Unknown { - std::thread::sleep(Duration::from_millis(10)); - tries -= 1; - if tries == 0 { - return Err("faucet is busy".to_string()); - } - } else { - break; - } - } - } - - *guard = None; - - Ok(guard) - } } /// Request payment from the faucet #[post("/", format = "json", data = "")] -fn post( +async fn post( state: &rocket::State, req: Json, -) -> Result, String> { +) -> Result, JsonSubmitTxResponse> { let printable_wrapper = PrintableWrapper::b58_decode(req.b58_address.clone()) .map_err(|err| format!("Could not decode b58 address: {}", err))?; let public_address = if printable_wrapper.has_public_address() { printable_wrapper.get_public_address() } else { - return Err(format!( - "b58 address '{}' is not a public address", - req.b58_address - )); + return Err(format!("b58 address '{}' is not a public address", req.b58_address).into()); }; let token_id = TokenId::from(req.token_id.unwrap_or_default().as_ref()); - let value = *state.faucet_amounts.get(&token_id).ok_or(format!( - "token_id: '{}' is not supported by the network", - token_id - ))?; + let utxo_record = state.worker.get_utxo(token_id)?; + log::trace!( + state.logger, + "Got a UTXO: key_image = {:?}, value = {}", + KeyImage::try_from(utxo_record.utxo.get_key_image()).unwrap(), + utxo_record.utxo.value + ); - let mut lock = state.lock_and_check_inflight_tx_state()?; + // Generate a Tx sending this specific TxOut, less fees + let mut req = mc_mobilecoind_api::GenerateTxFromTxOutListRequest::new(); + req.set_account_key((&state.account_key).into()); + req.set_input_list(RepeatedField::from_vec(vec![utxo_record.utxo])); + req.set_receiver(public_address.clone()); + req.set_token_id(*token_id); - // Generate an outlay - let mut outlay = mc_mobilecoind_api::Outlay::new(); - outlay.set_receiver(public_address.clone()); - outlay.set_value(value); + let resp = state + .mobilecoind_api_client + .generate_tx_from_tx_out_list(&req) + .map_err(|err| format!("Failed to build Tx: {}", err))?; - // Send the payment request - let mut req = mc_mobilecoind_api::SendPaymentRequest::new(); - req.set_sender_monitor_id(state.monitor_id.clone()); - req.set_sender_subaddress(0); - req.set_token_id(*token_id); - req.set_outlay_list(RepeatedField::from_vec(vec![outlay])); + // Submit the tx proposal + let mut req = mc_mobilecoind_api::SubmitTxRequest::new(); + req.set_tx_proposal(resp.get_tx_proposal().clone()); let resp = state .mobilecoind_api_client - .send_payment(&req) - .map_err(|err| format!("Failed to send payment: {}", err))?; - - // Convert from SendPaymentResponse to SubmitTxResponse, - // this is needed to check the status of an in-flight payment - let mut submit_tx_response = SubmitTxResponse::new(); - submit_tx_response.set_sender_tx_receipt(resp.get_sender_tx_receipt().clone()); - submit_tx_response - .set_receiver_tx_receipt_list(RepeatedField::from(resp.get_receiver_tx_receipt_list())); - - // This lets us keep tabs on when this payment has resolved, so that we can - // avoid sending another payment until it does - *lock = Some(submit_tx_response); - - // The receipt from the payment request can be used by the status check below - Ok(Json(JsonSendPaymentResponse::from(&resp))) + .submit_tx_async(&req) + .map_err(|err| format!("Failed to submit Tx: {}", err))? + .await + .map_err(|err| format!("Submit Tx ended in error: {}", err))?; + + // Tell the worker that this utxo was submitted, so that it can track and + // recycle the utxo if this payment fails + let _ = utxo_record.sender.send(resp.clone()); + Ok(Json(JsonSubmitTxResponse::from(&resp))) } /// Request status of the faucet #[get("/status")] -fn status(state: &rocket::State) -> Result, String> { +async fn status(state: &rocket::State) -> Result, String> { // Get up-to-date balances for all the tokens we are tracking let mut balances: HashMap = Default::default(); for (token_id, _) in state.faucet_amounts.iter() { @@ -247,16 +240,25 @@ fn status(state: &rocket::State) -> Result, String let resp = state .mobilecoind_api_client - .get_balance(&req) + .get_balance_async(&req) .map_err(|err| { format!( "Failed to check balance for token id '{}': {}", token_id, err ) + })? + .await + .map_err(|err| { + format!( + "Balance check request for token id '{}' ended in error: {}", + token_id, err + ) })?; balances.insert(*token_id, resp.balance); } + let queue_depths = state.worker.get_queue_depths(); + Ok(Json(JsonFaucetStatus { b58_address: state.monitor_b58_address.clone(), faucet_amounts: state @@ -265,6 +267,10 @@ fn status(state: &rocket::State) -> Result, String .map(convert_balance_pair) .collect(), balances: balances.iter().map(convert_balance_pair).collect(), + queue_depths: queue_depths + .into_iter() + .map(|(token_id, depth)| (JsonU64(*token_id), JsonU64(depth as u64))) + .collect(), })) } diff --git a/mobilecoind-dev-faucet/src/data_types.rs b/mobilecoind-dev-faucet/src/data_types.rs index b69fe1587a..08afea5104 100644 --- a/mobilecoind-dev-faucet/src/data_types.rs +++ b/mobilecoind-dev-faucet/src/data_types.rs @@ -3,6 +3,11 @@ //! Serializeable data types that wrap the mobilecoind API. use mc_api::external::PublicAddress; +use rocket::{ + http::Status, + response::{self, content, Responder}, + Request, +}; use serde_derive::{Deserialize, Serialize}; use std::collections::HashMap; @@ -52,6 +57,7 @@ pub struct JsonFaucetStatus { pub b58_address: String, pub faucet_amounts: HashMap, pub balances: HashMap, + pub queue_depths: HashMap, } #[derive(Deserialize, Serialize, Default, Debug)] @@ -106,13 +112,19 @@ impl From<&PublicAddress> for JsonPublicAddress { } #[derive(Deserialize, Serialize, Default, Debug)] -pub struct JsonSendPaymentResponse { +pub struct JsonSubmitTxResponse { + pub success: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub err_str: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] pub receiver_tx_receipt_list: Vec, } -impl From<&mc_mobilecoind_api::SendPaymentResponse> for JsonSendPaymentResponse { - fn from(src: &mc_mobilecoind_api::SendPaymentResponse) -> Self { +impl From<&mc_mobilecoind_api::SubmitTxResponse> for JsonSubmitTxResponse { + fn from(src: &mc_mobilecoind_api::SubmitTxResponse) -> Self { Self { + success: true, + err_str: None, receiver_tx_receipt_list: src .get_receiver_tx_receipt_list() .iter() @@ -121,3 +133,25 @@ impl From<&mc_mobilecoind_api::SendPaymentResponse> for JsonSendPaymentResponse } } } + +impl From for JsonSubmitTxResponse { + fn from(src: String) -> Self { + Self { + success: false, + err_str: Some(src), + receiver_tx_receipt_list: Default::default(), + } + } +} + +// Implement rocket::Responder for JsonSubmitTxResponse +// If we don't do this then it is very difficult to respond to errors with +// a Json object, because we cannot implement conversions on the +// rocket::Json<...> object. +impl<'r> Responder<'r, 'static> for JsonSubmitTxResponse { + fn respond_to(self, req: &'r Request) -> response::Result<'static> { + let string = serde_json::to_string(&self).map_err(|_e| Status::InternalServerError)?; + + content::RawJson(string).respond_to(req) + } +} diff --git a/mobilecoind-dev-faucet/src/lib.rs b/mobilecoind-dev-faucet/src/lib.rs index 530f8bd356..042a6fa78f 100644 --- a/mobilecoind-dev-faucet/src/lib.rs +++ b/mobilecoind-dev-faucet/src/lib.rs @@ -3,3 +3,4 @@ //! JSON wrapper for the mobilecoind API. pub mod data_types; +pub mod worker; diff --git a/mobilecoind-dev-faucet/src/worker.rs b/mobilecoind-dev-faucet/src/worker.rs new file mode 100644 index 0000000000..3cc03a37da --- /dev/null +++ b/mobilecoind-dev-faucet/src/worker.rs @@ -0,0 +1,585 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +//! This module creates a faucet "worker". It consists of a worker thread, +//! and a handle to the worker thread, which can be used to asynchronously get +//! the output of the worker. +//! +//! The worker ensures that the faucet is ready for operation by making self- +//! payments that split TxOut's if the number of pre-split TxOut's falls below a +//! threshold. TxOut's that are ready to be used get added to a tokio queue. +//! +//! Other threads may dequeue UTXOs from such a queue and then use them. +//! The queue also carries a one-shot channel which can give the worker a +//! SubmitTxResponse, or signal that an error occurred. The worker then follows +//! up on what happens with the UTXO, and eventually requeues it if the +//! transaction doesn't resolve successfully. +//! +//! The worker uses its own thread, but uses async-friendly tokio primitives. +//! The worker does not require to be launched from the context of a tokio +//! runtime. + +use mc_common::logger::{log, o, Logger}; +use mc_mobilecoind_api::{ + external::PublicAddress, mobilecoind_api_grpc::MobilecoindApiClient, SubmitTxResponse, + TxStatus, UnspentTxOut, +}; +use mc_transaction_core::{constants::MAX_OUTPUTS, ring_signature::KeyImage, TokenId}; +use protobuf::RepeatedField; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Duration, +}; +use tokio::sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + oneshot::{self, error::TryRecvError}, +}; + +/// A record the worker hands to faucet threads about a UTXO they can use. +/// It expects to be notified if the UTXO is successfully submitted. +/// If the one-shot sender is dropped, the worker assumes that there was an +/// error, and the faucet dropped the UTXO, so that this UTXO can be used again +/// potentially. +pub struct UtxoRecord { + pub utxo: UnspentTxOut, + pub sender: oneshot::Sender, +} + +/// A tracker the worker keeps for UTXO records it hands to faucet threads. +pub struct UtxoTracker { + pub utxo: UnspentTxOut, + receiver: oneshot::Receiver, + received: Option, +} + +impl UtxoTracker { + /// Make a new tracker and associated record for a given utxo + pub fn new(utxo: UnspentTxOut) -> (Self, UtxoRecord) { + let (sender, receiver) = oneshot::channel(); + + let record = UtxoRecord { + utxo: utxo.clone(), + sender, + }; + + let tracker = Self { + utxo, + receiver, + received: None, + }; + + (tracker, record) + } + + // If available, get either a SubmitTxResponse, or an error indicating the + // channel was closed, which means we had an error and couldn't submit the + // Tx that spent this. + pub fn poll(&mut self) -> Option> { + if let Some(resp) = self.received.as_ref() { + Some(Ok(resp.clone())) + } else { + match self.receiver.try_recv() { + Ok(resp) => { + self.received = Some(resp.clone()); + Some(Ok(resp)) + } + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Closed) => Some(Err(TryRecvError::Closed)), + } + } + } +} + +/// TokenStateReceiver holds the queue of Utxo records for a particular token +/// id, as well as other shared flags that indicate if we are out of funds etc. +pub struct TokenStateReceiver { + receiver: Mutex>, + funds_depleted_flag: Arc, + queue_depth: Arc, +} + +impl TokenStateReceiver { + /// Get a utxo from the queue, or a string explaining why we can't + pub fn get_utxo(&self) -> Result { + let mut receiver = self.receiver.lock().expect("mutex poisoned"); + loop { + match receiver.try_recv() { + Ok(utxo_record) => { + self.queue_depth.fetch_sub(1, Ordering::SeqCst); + // Check if the one-shot sender has already been closed. + // If it has, this means the worker dropped the tracker. + // This only happens if the worker decided this txo isn't unspent anymore + // when it polled mobilecoind. + // (We did not send the worker status yet.) So we should skip it and pull + // the next thing from the queue, since obviously a race has happened. + // (The worker will not spend utxos that it puts in the queue, + // but possibly another wallet with the same account key did.) + if !utxo_record.sender.is_closed() { + return Ok(utxo_record); + } + // Intentional fall-through to loop + } + Err(mpsc::error::TryRecvError::Empty) => { + return if self.funds_depleted_flag.load(Ordering::SeqCst) { + Err("faucet is depleted".to_string()) + } else { + Err("faucet is busy".to_string()) + }; + } + Err(mpsc::error::TryRecvError::Disconnected) => { + return Err("internal error".to_string()); + } + } + } + } + + /// Get the current depth of the queue + pub fn get_queue_depth(&self) -> usize { + self.queue_depth.load(Ordering::SeqCst) + } +} + +/// The worker is responsible for pre-splitting the faucet's balance so that it +/// can handle multiple faucet requests concurrently. +/// +/// It periodically calls `get_unspent_tx_out_list` for each token of interest. +/// If there are fewer than THRESHOLD TxOuts whose value is exactly "faucet +/// amount", then it attempts to make a self-payment which creates THRESHOLD +/// more pre-split TxOuts. +/// +/// To ensure concurrent faucet requests don't try to use the same unspent +/// TxOut's as eachother, the worker puts the unspent TxOut's in a queue as they +/// appear. +/// +/// Threads that have access to the worker handle can quickly try to pull an +/// unspent TxOut from the queue and send it, or find that the queue is empty +/// and give up. +pub struct Worker { + /// The reciever queues, and flags indicating if we are out of funds, for + /// each token id + receivers: HashMap, + + /// The worker thread handle + join_handle: Option>, + + /// A flag which can be used to request the worker thread to join + /// This is done by dropping the worker handle + stop_requested: Arc, +} + +impl Worker { + /// Make a new worker object given mobilecoind connection and config info, + /// and starts the worker thread. + /// + /// Arguments: + /// * client: connection to mobilecoind + /// * monitor_id: The monitor id for the account we are using + /// * public_address: The public address of our monitor id, used for + /// self-payments + /// * faucet_amounts: The target value for UTXOs of each token we are + /// interested in + /// * target_queue_depth: The target depth of the queue for each token id If + /// a queue falls below this number the worker attempts a split Tx. + /// * worker_poll_period: A lower bound on how often the worker should poll + /// * logger + /// + /// Returns the worker handle. + pub fn new( + client: MobilecoindApiClient, + monitor_id: Vec, + public_address: PublicAddress, + faucet_amounts: HashMap, + target_queue_depth: usize, + worker_poll_period: Duration, + logger: &Logger, + ) -> Worker { + let mut worker_token_states = Vec::::default(); + let mut receivers = HashMap::::default(); + + for (token_id, value) in faucet_amounts.iter() { + let (state, receiver) = WorkerTokenState::new(*token_id, *value); + worker_token_states.push(state); + receivers.insert(*token_id, receiver); + } + + let stop_requested = Arc::new(AtomicBool::default()); + let thread_stop_requested = stop_requested.clone(); + + let logger = logger.new(o!("thread" => "worker")); + + let join_handle = Some(std::thread::spawn(move || { + // First wait for account to sync + // Get the "initial" ledger block count + let block_count = loop { + match client.get_ledger_info(&Default::default()) { + Ok(resp) => break resp.block_count, + Err(err) => { + log::error!(logger, "Could not get ledger info: {:?}", err); + } + } + std::thread::sleep(worker_poll_period); + }; + log::info!(logger, "Ledger is at block_count = {}", block_count); + + // Now wait for monitor state to at least pass this point + loop { + let mut req = mc_mobilecoind_api::GetMonitorStatusRequest::new(); + req.set_monitor_id(monitor_id.clone()); + match client.get_monitor_status(&req) { + Ok(resp) => { + let monitor_block_count = resp.get_status().next_block; + if monitor_block_count >= block_count { + log::info!( + logger, + "Monitor has synced to block count {}", + monitor_block_count + ); + break; + } + } + Err(err) => { + log::error!(logger, "Could not get monitor status: {:?}", err); + } + } + std::thread::sleep(worker_poll_period); + } + + // Poll all token ids looking for activity, then sleep for a bit + loop { + if thread_stop_requested.load(Ordering::SeqCst) { + log::info!(logger, "Worker: stop was requested"); + break; + } + for state in worker_token_states.iter_mut() { + if let Err(err_str) = state.poll( + &client, + &monitor_id, + &public_address, + target_queue_depth, + &logger, + ) { + log::error!(logger, "{}", err_str); + } + } + log::trace!(logger, "Worker sleeping"); + std::thread::sleep(worker_poll_period); + } + })); + + Worker { + receivers, + join_handle, + stop_requested, + } + } + + /// Get a utxo with the target value, for a given token id. + /// This pulls a utxo from the queue, and the recipient has responsbility + /// to either successfully send the TxOut and use its oneshot::Sender to + /// report the result from consensus, or, to drop the oneshot::Sender, + /// reporting an error using the TxOut. + pub fn get_utxo(&self, token_id: TokenId) -> Result { + if let Some(receiver) = self.receivers.get(&token_id) { + receiver.get_utxo() + } else { + Err(format!("Unknown token id: {}", token_id)) + } + } + + /// Get the depths of all of the queues + pub fn get_queue_depths(&self) -> HashMap { + self.receivers + .iter() + .map(|(token_id, receiver)| (*token_id, receiver.get_queue_depth())) + .collect() + } +} + +impl Drop for Worker { + fn drop(&mut self) { + if let Some(handle) = self.join_handle.take() { + self.stop_requested.store(true, Ordering::SeqCst); + handle.join().expect("failed to join worker thread"); + } + } +} + +struct WorkerTokenState { + // The token id being tracked + token_id: TokenId, + // The target value of UTXOS for this token id + target_value: u64, + // The most recently known set of UTXOS for this token id + // When we get a new UTXO from mobilecoind, we track it using this cache. + // The tracker contains a one-shot channel that the other side can use to + // let us know what happens with this UTXO. + // UTXOs are added here at the same time they are queued. As long as a UTXO + // is in this cache, we won't requeue it, to avoid two threads spending + // the same UTXO concurrently. + known_utxos: HashMap, + // The queue of UTXOs with the target value + sender: UnboundedSender, + // If we submit a split transaction, the response we can use to track it + in_flight_split_tx_state: Option, + // A shared flag we use to signal if have insufficient funds for this token id + funds_depleted: Arc, + // A shared counter used to indicate roughly how many items are in the queue + queue_depth: Arc, +} + +impl WorkerTokenState { + // Create a new worker token state, with a given token id and target value. + // Returns the channels to be passed to other thread, including the receiver + // for new UtxoRecords, and the "funds depleted" flag + fn new(token_id: TokenId, target_value: u64) -> (WorkerTokenState, TokenStateReceiver) { + let (sender, receiver) = mpsc::unbounded_channel::(); + + let funds_depleted_flag = Arc::new(AtomicBool::default()); + let funds_depleted = funds_depleted_flag.clone(); + + let queue_depth = Arc::new(AtomicUsize::default()); + let queue_depth_counter = queue_depth.clone(); + + ( + Self { + token_id, + target_value, + known_utxos: Default::default(), + sender, + in_flight_split_tx_state: None, + funds_depleted, + queue_depth, + }, + TokenStateReceiver { + receiver: Mutex::new(receiver), + funds_depleted_flag, + queue_depth: queue_depth_counter, + }, + ) + } + + // Poll a given token for activity. + // + // (1) Get the UTXO list for this token, checks it for new UTXOs, and + // sends things to the channel if we do find new things. + // (2) Check up on old things, checking if they were eventually submitted + // or not, and if those submissions were successful. If their submissions + // resolve, it purges them from its cache so that they can be found again + // and resubmitted if necessary. + // (3) Check if we have enough pre-split Txos, and if we don't, check + // if we already have an in-flight Tx to try to fix this. If not then it builds + // and submits a new splitting Tx. + // + // Returns a string which should be logged if e.g. we encounter an RPC error + fn poll( + &mut self, + client: &MobilecoindApiClient, + monitor_id: &[u8], + public_address: &PublicAddress, + target_queue_depth: usize, + logger: &Logger, + ) -> Result<(), String> { + // First, for each known utxo already queued, check if it was sent in a + // transaction and if so what the status is + self.known_utxos.retain(|_key_image, tracker| { + if let Some(status) = tracker.poll() { + // If poll returned Some, then we either got a SubmitTxResponse or an error + if let Ok(resp) = status { + // It was successfully submitted to the network, let's ask mobilecoind what's + // happened. If it's still in-flight we should retain it, if it has resolved, + // we should drop it from our records. + is_tx_still_in_flight(client, &resp, "Faucet", logger) + } else { + // The oneshot receiver resolved in an error, this means, the other side dropped + // this channel, without reporting a SubmitTxResponse. This + // means there was an error building or submitting the Tx, + // and the other side has now dropped this UnspentTxOut. We should requeue it + // so that it can be eventually be spent, and for now we should just purge it. + false + } + } else { + // Still in the queue as far as we know + true + } + }); + + // Now, get a fresh unspent tx out list associated to this token + let mut resp = { + let mut req = mc_mobilecoind_api::GetUnspentTxOutListRequest::new(); + req.token_id = *self.token_id; + req.monitor_id = monitor_id.to_vec(); + + client.get_unspent_tx_out_list(&req).map_err(|err| { + format!( + "Could not get unspent txout list for token id = {}: {}", + self.token_id, err + ) + })? + }; + + // Now, check all the reported utxos. + // If it is new and has the target value, then queue it + let mut output_list_key_images = HashSet::::default(); + + for utxo in resp.output_list.iter() { + // Sanity check the token id + if utxo.token_id != self.token_id { + continue; + } + + if utxo.value != self.target_value { + continue; + } + + let key_image: KeyImage = utxo + .get_key_image() + .try_into() + .map_err(|err| format!("invalid key image: {}", err))?; + if let Entry::Vacant(e) = self.known_utxos.entry(key_image) { + // We found a utxo not in the cache, let's queue and add to cache + log::trace!( + logger, + "Queueing a utxo: key_image = {:?}, value = {}", + key_image, + utxo.value + ); + let (tracker, record) = UtxoTracker::new(utxo.clone()); + // Add to queue depth before push, because we subtract after pop + self.queue_depth.fetch_add(1, Ordering::SeqCst); + let _ = self.sender.send(record); + e.insert(tracker); + } + + // Add the key image of this utxo to a set, this helps us purge the cache + output_list_key_images.insert(key_image); + } + + // Remove any known utxos that no longer exist in the response list + // That is, remove any utxo whose key image wasn't added to + // output_list_key_images before. (This also drops the one-shot receiver, + // and so can tell the other side not to bother sending this utxo if they get + // it from the queue.) + self.known_utxos + .retain(|key_image, _tracker| output_list_key_images.contains(key_image)); + + // Check the queue depth, and decide if we should make a split tx + if self.queue_depth.load(Ordering::SeqCst) < target_queue_depth { + // Check if we already tried to fix this in the last iteration + if let Some(prev_tx) = self.in_flight_split_tx_state.as_ref() { + if is_tx_still_in_flight(client, prev_tx, "Split", logger) { + // There is already a fix in-flight, let's do nothing until it lands. + return Ok(()); + } + } + log::trace!(logger, "Attempting to split on token id {}", self.token_id); + // At this point, the previous in-flight tx resolved somehow and if it was an + // error we logged it + self.in_flight_split_tx_state = None; + + // We will now attempt to build and submit a split Tx that prooduces TxOuts of + // target value from those that aren't + let non_target_value_utxos: Vec<_> = resp + .take_output_list() + .into_iter() + .filter(|utxo| utxo.token_id == self.token_id && utxo.value != self.target_value) + .collect(); + + // First make sure we have enough funds for what we want to do, so we don't spam + // errors when we are depleted, and so that faucet users can know + // that retries won't help. + if non_target_value_utxos + .iter() + .map(|utxo| utxo.value) + .sum::() + < self.target_value * (MAX_OUTPUTS - 1) + { + self.funds_depleted.store(true, Ordering::SeqCst); + log::trace!(logger, "Funds depleted on {}", self.token_id); + return Ok(()); + } else { + let prev_value = self.funds_depleted.swap(false, Ordering::SeqCst); + if prev_value { + log::info!(logger, "Funds no longer depleted on {}", self.token_id); + } + } + + // Generate an outlay + // We will repeat this outlay MAX_OUTPUTS - 1 times + // (-1 is for a change output) + let mut outlay = mc_mobilecoind_api::Outlay::new(); + outlay.set_receiver(public_address.clone()); + outlay.set_value(self.target_value); + + // Generate a Tx + let mut req = mc_mobilecoind_api::GenerateTxRequest::new(); + req.set_sender_monitor_id(monitor_id.to_vec()); + req.set_token_id(*self.token_id); + req.set_input_list(RepeatedField::from_vec(non_target_value_utxos)); + req.set_outlay_list(RepeatedField::from_vec(vec![ + outlay; + MAX_OUTPUTS as usize - 1 + ])); + + let mut resp = client + .generate_tx(&req) + .map_err(|err| format!("Failed to generate split tx: {}", err))?; + + // Submit the Tx + let mut req = mc_mobilecoind_api::SubmitTxRequest::new(); + req.set_tx_proposal(resp.take_tx_proposal()); + let submit_tx_response = client + .submit_tx(&req) + .map_err(|err| format!("Failed to submit split tx: {}", err))?; + + // This lets us keep tabs on when this split payment has resolved, so that we + // can avoid sending another payment until it does + self.in_flight_split_tx_state = Some(submit_tx_response); + } + + Ok(()) + } +} + +/// Check if a given tx is still in-flight. +/// Logs an error if something strange happened +/// +/// Arguments: +/// * client: connection to mobilecoind +/// * tx: the submit tx response +/// * context: The context of this tx, used for logging +/// * logger +/// +/// Returns true if the tx is still (potentially) in-flight, false if it has +/// resolved now (either successfully or in an error) +fn is_tx_still_in_flight( + client: &MobilecoindApiClient, + tx: &SubmitTxResponse, + context: &str, + logger: &Logger, +) -> bool { + match client.get_tx_status_as_sender(tx) { + Ok(resp) => { + if resp.status == TxStatus::Unknown { + return true; + } + if resp.status != TxStatus::Verified { + log::error!( + logger, + "{} Tx ended with status: {:?}", + context, + resp.status + ); + } + // Whether successful or an error, the Tx has resolved now + false + } + Err(err) => { + log::error!(logger, "Failed getting {} Tx status: {}", context, err); + // We still don't know the status, so it may still be in-flight + true + } + } +}