diff --git a/Cargo.lock b/Cargo.lock index dd73d8b787..2f7da7cdca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -984,41 +984,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "darling" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" -dependencies = [ - "darling_core", - "darling_macro", -] - -[[package]] -name = "darling_core" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim 0.10.0", - "syn", -] - -[[package]] -name = "darling_macro" -version = "0.13.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" -dependencies = [ - "darling_core", - "quote", - "syn", -] - [[package]] name = "debugid" version = "0.8.0" @@ -1856,12 +1821,6 @@ dependencies = [ "tokio-rustls", ] -[[package]] -name = "ident_case" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" - [[package]] name = "idna" version = "0.2.0" @@ -4639,6 +4598,28 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "mc-mobilecoind-dev-faucet" +version = "1.3.0-pre0" +dependencies = [ + "clap 3.1.18", + "grpcio", + "hex", + "mc-account-keys", + "mc-api", + "mc-common", + "mc-mobilecoind-api", + "mc-transaction-core", + "mc-util-grpc", + "mc-util-keyfile", + "mc-util-serial", + "rocket", + "serde", + "serde_derive", + "serde_json", + "tokio", +] + [[package]] name = "mc-mobilecoind-json" version = "1.3.0-pre0" @@ -4655,12 +4636,12 @@ dependencies = [ "mc-transaction-core-test-utils", "mc-util-from-random", "mc-util-grpc", + "mc-util-serial", "protobuf", "rand 0.8.5", "rocket", "serde", "serde_derive", - "serde_with", ] [[package]] @@ -5335,6 +5316,7 @@ dependencies = [ "prost", "serde", "serde_cbor", + "serde_with", ] [[package]] @@ -7196,25 +7178,11 @@ dependencies = [ [[package]] name = "serde_with" -version = "1.13.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b827f2113224f3f19a665136f006709194bdfdcb1fdc1e4b2b5cbac8e0cced54" +checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" dependencies = [ - "rustversion", "serde", - "serde_with_macros", -] - -[[package]] -name = "serde_with_macros" -version = "1.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" -dependencies = [ - "darling", - "proc-macro2", - "quote", - "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1d356a5e63..6654ba3ce6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,6 +104,7 @@ members = [ "mint-auditor", "mint-auditor/api", "mobilecoind", + "mobilecoind-dev-faucet", "mobilecoind-json", "mobilecoind/api", "peers", diff --git a/mobilecoind-dev-faucet/Cargo.toml b/mobilecoind-dev-faucet/Cargo.toml new file mode 100644 index 0000000000..605e59bb15 --- /dev/null +++ b/mobilecoind-dev-faucet/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "mc-mobilecoind-dev-faucet" +version = "1.3.0-pre0" +authors = ["MobileCoin"] +edition = "2021" +readme = "README.md" + +[[bin]] +name = "mobilecoind-dev-faucet" +path = "src/bin/main.rs" + +[dependencies] +mc-account-keys = { path = "../account-keys" } +mc-api = { path = "../api" } +mc-common = { path = "../common", features = ["loggers"] } +mc-mobilecoind-api = { path = "../mobilecoind/api" } +mc-transaction-core = { path = "../transaction/core" } +mc-util-grpc = { path = "../util/grpc" } +mc-util-keyfile = { path = "../util/keyfile" } +mc-util-serial = { path = "../util/serial", features = ["std"] } + +clap = { version = "3.1", features = ["derive", "env"] } +grpcio = "0.10.2" +hex = "0.4" +rocket = { version = "0.5.0-rc.2", features = ["json"] } +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +tokio = "1" diff --git a/mobilecoind-dev-faucet/README.md b/mobilecoind-dev-faucet/README.md new file mode 100644 index 0000000000..97b033e626 --- /dev/null +++ b/mobilecoind-dev-faucet/README.md @@ -0,0 +1,81 @@ +## mobilecoind-dev-faucet + +This is a standalone http server which provides faucet functionality. +* Backed by [mobilecoind](../mobilecoind) -- similar to [mobilecoind-json](../mobilecoind-json), it relays requests to a separate mobilecoind instance, and is itself stateless +* No captcha or rate limiting. This is appropriate for developers running automated tests in a dev cluster. +* Any token id can be requested for testing +* TODO: HTTP Authorization headers may be added in the future + +### Routes + +You may POST to `/`, attaching a json object as the HTTP body: + +``` +{ + b58_address: , + token_id: +} +``` + +Any tokenid can be requested and the faucet will attempt to send a nominal amount of +that token to the address specified, or return errors if it cannot. The nominal amount is +by default twenty times the minimum fee for that token. The response will contain a +JSON object, `success` will be `true` if it managed to submit a payment, and there will +be mobilecoind "Receiver Tx receipt" for the submitted transaction. If `success` is `false` +then `err_str` will describe the problem. + +GET requests to `/status`, will respond with a json object with the +following information: + +``` +{ + // The balances of the faucet + balances: { : } + // The amounts the faucet pays per token id + faucet_amounts: { : } + // The current number of "queued" UTXOs. Each can be used to fill a concurrent request. + // If a queue is empty then it may take a few seconds for the faucet to refill the queue. + queue_depths: { : } + // This address can be paid to replenish the faucet + b58_address: , +} +``` + +### Launching + +The faucet should be started using a keyfile, which is a json formatted file containing a +mnemonic string or a root entropy for a MobileCoin account. + +Required options are: + +- `--keyfile` - path to the keyfile with the account mnemonic or entropy. This account holds the faucet funds. + +Other options are: +- `--amount-factor` - An integer `X`. The amount we send when people hit the faucet is `minimum_fee * X`. Default is `X = 20`. +- `--listen-host` - hostname for webserver, default `127.0.0.1` +- `--listen-port` - port for webserver, default `9090` +- `--mobilecoind-uri` - URI for connecting to mobilecoind gRPC, default `insecure-mobilecoind://127.0.0.1:4444/` +- `--target-queue-depth` - The number of pre-split transactions the faucet attempts to maintain in its queue. Default is 20. +- `--worker-poll-period-ms` - A lower bound on how often the worker thread wakes up to check in with `mobilecoind`. Default is `100` milliseconds. + +### Usage with cURL + +Here is some example usage: + +Requesting payment: + +``` +$ curl -s localhost:9090/ -d '{"b58_address": "c7f04fcd40d093ca6578b13d790df0790c96e94a77815e5052993af1b9d12923"}' -X POST -H 'Content-type: application/json' +{"success":true,"receiver_tx_receipt_list":[{"recipient":{"view_public_key":"86280244d51afed4217ee3dc6288650c27cacc6e4bfb558159f0f8caa38ae542","spend_public_key":"803958b71de5fa7a58d257a0411506e59f77eaff33ee7b7905ac4f9ef68e3c2a","fog_report_url":"","fog_authority_sig":"","fog_report_id":""},"tx_public_key":"880d56bc36411507131098dd404878fb083b6dd5b805c37f736dcfa94d31027d","tx_out_hash":"0fbe90326c255e08b3ee6cbdf626d244ac29bbdab8810163d09513fa1919664f","tombstone":56,"confirmation_number":"027c506b81ad5bd8142382c75f6148f6e5627ad45d2a09110ee9e4ff5a789398"}]}``` + +``` +$ curl -s localhost:9090/ -d '{"b58_address": "c7f04fcd40d093ca6578b13d790df0790c96e94a77815e5052993af1b9d12923", "token_id": "1"}' -X POST -H 'Content-type: application/json' +{"success":false,"err_str":"faucet is depleted"} +``` + +Getting status: + +``` +$ curl -s localhost:9090/status +{"b58_address":"5KBMnd8cs5zPsytGgZrjmQ8z9VJYThuh1B39pKzDERTfzm3sVGQxnZPC8JEWP69togpSPRz3e6pBsLzwnMjrXTbDqoRTQ8VF98sQu7LqjL5","faucet_payout_amounts":{"2":"20480","1":"20480","0":"8000000000"},"balances":{"2":"0","1":"0","0":"12499999997600000000"},"queue_depths":{"1":"0","0":"26","2":"0"}} +``` diff --git a/mobilecoind-dev-faucet/src/bin/main.rs b/mobilecoind-dev-faucet/src/bin/main.rs new file mode 100644 index 0000000000..a2e4370a70 --- /dev/null +++ b/mobilecoind-dev-faucet/src/bin/main.rs @@ -0,0 +1,58 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +//! HTTP faucet service backed by mobilecoind + +#![deny(missing_docs)] +#![feature(proc_macro_hygiene, decl_macro)] + +use clap::Parser; +use mc_common::logger::{create_app_logger, log, o}; +use mc_mobilecoind_dev_faucet::{data_types::*, Config, State}; +use rocket::{get, post, routes, serde::json::Json}; + +/// Request payment from the faucet, and map the rust result onto json for +/// rocket appropriately +#[post("/", format = "json", data = "")] +async fn post( + state: &rocket::State, + req: Json, +) -> Json { + Json(state.handle_post(&req).await.into()) +} + +/// Request status of the faucet, and map the rust result onto json for rocket +/// apporpriately +#[get("/status")] +async fn status(state: &rocket::State) -> Json { + Json(state.handle_status().await.into()) +} + +#[rocket::main] +async fn main() -> Result<(), rocket::Error> { + mc_common::setup_panic_handler(); + let _sentry_guard = mc_common::sentry::init(); + + let config = Config::parse(); + + let (logger, _global_logger_guard) = create_app_logger(o!()); + log::info!( + logger, + "Starting mobilecoind-dev-faucet HTTP on {}:{}, connecting to {}", + config.listen_host, + config.listen_port, + config.mobilecoind_uri, + ); + + let figment = rocket::Config::figment() + .merge(("port", config.listen_port)) + .merge(("address", config.listen_host.clone())); + + let state = State::new(&config, &logger); + + let _rocket = rocket::custom(figment) + .mount("/", routes![post, status]) + .manage(state) + .launch() + .await?; + Ok(()) +} diff --git a/mobilecoind-dev-faucet/src/data_types.rs b/mobilecoind-dev-faucet/src/data_types.rs new file mode 100644 index 0000000000..62377781f1 --- /dev/null +++ b/mobilecoind-dev-faucet/src/data_types.rs @@ -0,0 +1,189 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +//! Serializeable data types that wrap the mobilecoind API. + +use mc_api::external::PublicAddress; +use mc_transaction_core::TokenId; +use mc_util_serial::JsonU64; +use serde_derive::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// A request to the faucet to fund an address +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct JsonFaucetRequest { + /// The address to fund + pub b58_address: String, + /// The token id to fund. Assumed 0 if omitted. + #[serde(default)] + pub token_id: JsonU64, +} + +/// A response describing the status of the faucet server +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct JsonFaucetStatus { + /// Whether the status request was successful + pub success: bool, + /// The error message in case of failure + #[serde(skip_serializing_if = "String::is_empty")] + pub err_str: String, + /// The b58 address of the faucet. This address can be paid to replenish the + /// faucet. + #[serde(skip_serializing_if = "String::is_empty")] + pub b58_address: String, + /// The map of token id -> payout amount for that token id. (The recipient + /// gets a little less because of fees.) + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub faucet_payout_amounts: HashMap, + /// The current balances of the faucet. + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub balances: HashMap, + /// The current depths of the queue of utxos for each token id. If these + /// queues run out then the faucet needs some more time to rebuild them. + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub queue_depths: HashMap, +} + +/// The data obtained when the faucet gets its status successfully +pub struct FaucetStatus { + /// The b58 address of the faucet + pub b58_address: String, + /// The faucet payout amounts + pub faucet_payout_amounts: HashMap, + /// The balance in each token id + pub balances: HashMap, + /// The queue depth for each token id + pub queue_depths: HashMap, +} + +impl From> for JsonFaucetStatus { + fn from(src: Result) -> Self { + match src { + Ok(FaucetStatus { + b58_address, + faucet_payout_amounts, + balances, + queue_depths, + }) => JsonFaucetStatus { + success: true, + err_str: String::default(), + b58_address, + faucet_payout_amounts: faucet_payout_amounts + .into_iter() + .map(convert_balance_pair) + .collect(), + balances: balances.into_iter().map(convert_balance_pair).collect(), + queue_depths: queue_depths.into_iter().map(convert_balance_pair).collect(), + }, + Err(err_str) => JsonFaucetStatus { + success: false, + err_str, + ..Default::default() + }, + } + } +} + +fn convert_balance_pair(pair: (TokenId, u64)) -> (JsonU64, JsonU64) { + (JsonU64(*pair.0), JsonU64(pair.1)) +} + +/// A Tx receipt that the reciepient of a payment can use (with mobilecoind) +/// to track the payment. This is returned with faucet payment responses. +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct JsonReceiverTxReceipt { + /// The recipient of the payment + pub recipient: JsonPublicAddress, + /// The hex-encoded bytes of the tx out public key + pub tx_public_key: String, + /// The hex-encoded bytes of the tx out hash + pub tx_out_hash: String, + /// The tombstone block of the submitted transaction + pub tombstone: u64, + /// The hex-encoded bytes of the confirmation number + pub confirmation_number: String, +} + +impl From<&mc_mobilecoind_api::ReceiverTxReceipt> for JsonReceiverTxReceipt { + fn from(src: &mc_mobilecoind_api::ReceiverTxReceipt) -> Self { + Self { + recipient: JsonPublicAddress::from(src.get_recipient()), + tx_public_key: hex::encode(&src.get_tx_public_key().get_data()), + tx_out_hash: hex::encode(&src.get_tx_out_hash()), + tombstone: src.get_tombstone(), + confirmation_number: hex::encode(&src.get_confirmation_number()), + } + } +} + +/// A Json encoded public address structure +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct JsonPublicAddress { + /// Hex encoded compressed ristretto bytes + pub view_public_key: String, + + /// Hex encoded compressed ristretto bytes + pub spend_public_key: String, + + /// Fog Report Server Url + pub fog_report_url: String, + + /// Hex encoded signature bytes + pub fog_authority_sig: String, + + /// String label for fog reports + pub fog_report_id: String, +} + +impl From<&PublicAddress> for JsonPublicAddress { + fn from(src: &PublicAddress) -> Self { + Self { + view_public_key: hex::encode(&src.get_view_public_key().get_data()), + spend_public_key: hex::encode(&src.get_spend_public_key().get_data()), + fog_report_url: src.get_fog_report_url().into(), + fog_report_id: src.get_fog_report_id().into(), + fog_authority_sig: hex::encode(&src.get_fog_authority_sig()), + } + } +} + +/// Related to (but not the same as) mobilecoind_api::SubmitTxResponse +/// +/// This json includes a "success" field and an "err_str" field, so it is +/// effectively like an enum over `SubmitTxResponse` and `String`. +/// +/// The `From` conversions set `success` to true or false appropriately. +/// In the success case, we only include the receiver tx receipt list, because +/// the faucet user cannot make use of the sender tx receipt. +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct JsonSubmitTxResponse { + /// Whether the payment was submitted successfully + pub success: bool, + /// An error message if the payment could not be submitted successfully + #[serde(skip_serializing_if = "String::is_empty")] + pub err_str: String, + /// A receipt for each TxOut that was sent (just one, if submitted + /// successfully) + #[serde(skip_serializing_if = "Vec::is_empty")] + pub receiver_tx_receipt_list: Vec, +} + +impl From> for JsonSubmitTxResponse { + fn from(src: Result) -> Self { + match src { + Ok(mut resp) => Self { + success: true, + err_str: String::default(), + receiver_tx_receipt_list: resp + .take_receiver_tx_receipt_list() + .iter() + .map(JsonReceiverTxReceipt::from) + .collect(), + }, + Err(err_str) => Self { + success: false, + err_str, + ..Default::default() + }, + } + } +} diff --git a/mobilecoind-dev-faucet/src/lib.rs b/mobilecoind-dev-faucet/src/lib.rs new file mode 100644 index 0000000000..932aea6d58 --- /dev/null +++ b/mobilecoind-dev-faucet/src/lib.rs @@ -0,0 +1,322 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +#![deny(missing_docs)] + +//! HTTP faucet service backed by mobilecoind + +pub mod data_types; +use data_types::*; + +mod worker; +use worker::Worker; + +use clap::Parser; +use grpcio::ChannelBuilder; +use mc_account_keys::AccountKey; +use mc_api::printable::PrintableWrapper; +use mc_common::logger::{log, Logger}; +use mc_mobilecoind_api::{mobilecoind_api_grpc::MobilecoindApiClient, MobilecoindUri}; +use mc_transaction_core::{ring_signature::KeyImage, TokenId}; +use mc_util_grpc::ConnectionUriGrpcioChannel; +use mc_util_keyfile::read_keyfile; +use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; + +/// Command line config, set with defaults that will work with +/// a standard mobilecoind instance +#[derive(Clone, Debug, Parser)] +#[clap( + name = "mobilecoind-dev-faucet", + about = "A stateless HTTP faucet server, backed by mobilecoind" +)] +pub struct Config { + /// Path to json-formatted key file, containing mnemonic or root entropy. + #[clap(long, env = "MC_KEYFILE")] + pub keyfile: PathBuf, + + /// The amount factor, which determines the size of the payment we make. The + /// minimum fee is multiplied by this. + #[clap(long, default_value = "20", env = "MC_AMOUNT_FACTOR")] + pub amount_factor: u64, + + /// Host to listen on. + #[clap(long, default_value = "127.0.0.1", env = "MC_LISTEN_HOST")] + pub listen_host: String, + + /// Port to start webserver on. + #[clap(long, default_value = "9090", env = "MC_LISTEN_PORT")] + pub listen_port: u16, + + /// MobileCoinD URI. + #[clap( + long, + default_value = "insecure-mobilecoind://127.0.0.1/", + env = "MC_MOBILECOIND_URI" + )] + pub mobilecoind_uri: MobilecoindUri, + + /// Target Queue Depth. When the queue for a token id is less than this in + /// depth, the worker attempts to make a split Tx to produce more TxOuts + /// for the queue. + #[clap(long, default_value = "20", env = "MC_TARGET_QUEUE_DEPTH")] + pub target_queue_depth: usize, + + /// Worker poll period in milliseconds. + #[clap(long, default_value = "100", env = "MC_WORKER_POLL_PERIOD_MS")] + pub worker_poll_period_ms: u64, +} + +/// Connection to the mobilecoind client, and other state tracked by the running +/// server (Note that this can all be recovered by restarting the server.) +/// +/// This is intended to be used with as the State of the http server in the +/// rocket framework. +pub struct State { + /// The connection to mobilecoind + pub mobilecoind_api_client: MobilecoindApiClient, + /// The account key holding our funds + pub account_key: AccountKey, + /// The bytes of our monitor id, which holds the faucet's funds + pub monitor_id: Vec, + /// The public address of the faucet, which someone can use to replenish the + /// faucet + pub monitor_b58_address: String, + /// The amounts the faucet attempts to pay for each token id + /// This is initialized to network fee * amount factor at startup + pub faucet_payout_amounts: HashMap, + /// Handle to worker thread, which pre-splits TxOut's in the background + pub worker: Worker, + /// Logger + pub logger: Logger, +} + +impl State { + /// Create a new state from config and a logger + /// This retries infinitely until it succeeds, logging errors + pub fn new(config: &Config, logger: &Logger) -> State { + // Search for keyfile and load it + let account_key = read_keyfile(config.keyfile.clone()).expect("Could not load keyfile"); + + // Set up the gRPC connection to the mobilecoind client + // Note: choice of 2 completion queues here is not very deliberate + let grpc_env = Arc::new(grpcio::EnvBuilder::new().cq_count(2).build()); + let ch = ChannelBuilder::new(grpc_env) + .max_receive_message_len(std::i32::MAX) + .max_send_message_len(std::i32::MAX) + .connect_to_uri(&config.mobilecoind_uri, logger); + + let mobilecoind_api_client = MobilecoindApiClient::new(ch); + + let (monitor_id, monitor_public_address, monitor_b58_address, minimum_fees) = loop { + match Self::try_new(&mobilecoind_api_client, &account_key) { + Ok(result) => break result, + Err(err) => log::error!(logger, "Initialization failed, will retry: {}", err), + } + std::thread::sleep(Duration::from_millis(1000)); + }; + + // The payout amount for each token id is minimum_fee * config.amount_factor + let faucet_payout_amounts: HashMap = minimum_fees + .iter() + .map(|(token_id, fee)| (*token_id, config.amount_factor * fee)) + .collect(); + + // Start background worker, which splits txouts in advance + let worker = Worker::new( + mobilecoind_api_client.clone(), + monitor_id.clone(), + monitor_public_address, + minimum_fees, + faucet_payout_amounts.clone(), + config.target_queue_depth, + Duration::from_millis(config.worker_poll_period_ms), + logger, + ); + + State { + mobilecoind_api_client, + account_key, + monitor_id, + monitor_b58_address, + faucet_payout_amounts, + worker, + logger: logger.clone(), + } + } + + // Try to issue commands to mobilecoind to set up a new faucet, returning an + // error if any of them fail + // + // Returns monitor id, monitor public address, monitor b58 address, and the + // current network minimum fees + fn try_new( + mobilecoind_api_client: &MobilecoindApiClient, + account_key: &AccountKey, + ) -> Result< + ( + Vec, + mc_api::external::PublicAddress, + String, + HashMap, + ), + String, + > { + // Create a monitor using our account key + let monitor_id = { + let mut req = mc_mobilecoind_api::AddMonitorRequest::new(); + req.set_account_key(account_key.into()); + req.set_num_subaddresses(2); + req.set_name("faucet".to_string()); + + let resp = mobilecoind_api_client + .add_monitor(&req) + .map_err(|err| format!("Failed adding a monitor: {}", err))?; + + resp.monitor_id + }; + + // Get the b58 public address for monitor + let monitor_b58_address = { + let mut req = mc_mobilecoind_api::GetPublicAddressRequest::new(); + req.set_monitor_id(monitor_id.clone()); + + let resp = mobilecoind_api_client + .get_public_address(&req) + .map_err(|err| format!("Failed getting public address: {}", err))?; + + resp.b58_code + }; + + let monitor_printable_wrapper = PrintableWrapper::b58_decode(monitor_b58_address.clone()) + .expect("Could not decode b58 address"); + assert!(monitor_printable_wrapper.has_public_address()); + let monitor_public_address = monitor_printable_wrapper.get_public_address(); + + // Get the network minimum fees and compute faucet amounts + let minimum_fees = { + let mut result = HashMap::::default(); + + let resp = mobilecoind_api_client + .get_network_status(&Default::default()) + .map_err(|err| format!("Failed getting network status: {}", err))?; + + for (k, v) in resp.get_last_block_info().minimum_fees.iter() { + result.insert(k.into(), *v); + } + + result + }; + + Ok(( + monitor_id, + monitor_public_address.clone(), + monitor_b58_address, + minimum_fees, + )) + } + + /// Handle a "post" to the faucet, which requests a payment from the faucet. + /// Returns either the mobilecoind success response or an error string. + pub async fn handle_post( + &self, + req: &JsonFaucetRequest, + ) -> Result { + let printable_wrapper = PrintableWrapper::b58_decode(req.b58_address.clone()) + .map_err(|err| format!("Could not decode b58 address: {}", err))?; + + let public_address = if printable_wrapper.has_public_address() { + printable_wrapper.get_public_address() + } else { + return Err(format!( + "b58 address '{}' is not a public address", + req.b58_address + )); + }; + + let token_id = TokenId::from(req.token_id.as_ref()); + + let utxo_record = self.worker.get_utxo(token_id)?; + log::trace!( + self.logger, + "Got a UTXO: key_image = {:?}, value = {}", + KeyImage::try_from(utxo_record.utxo.get_key_image()).unwrap(), + utxo_record.utxo.value + ); + + // Generate a Tx sending this specific TxOut, less fees + let mut req = mc_mobilecoind_api::GenerateTxFromTxOutListRequest::new(); + req.set_account_key((&self.account_key).into()); + req.set_input_list(vec![utxo_record.utxo].into()); + req.set_receiver(public_address.clone()); + req.set_token_id(*token_id); + + let resp = self + .mobilecoind_api_client + .generate_tx_from_tx_out_list_async(&req) + .map_err(|err| format!("Failed to build Tx: {}", err))? + .await + .map_err(|err| format!("Build Tx ended in error: {}", err))?; + + // Submit the tx proposal + let mut req = mc_mobilecoind_api::SubmitTxRequest::new(); + req.set_tx_proposal(resp.get_tx_proposal().clone()); + + let resp = self + .mobilecoind_api_client + .submit_tx_async(&req) + .map_err(|err| format!("Failed to submit Tx: {}", err))? + .await + .map_err(|err| format!("Submit Tx ended in error: {}", err))?; + + // Tell the worker that this utxo was submitted, so that it can track and + // recycle the utxo if this payment fails + if utxo_record.sender.send(resp.clone()).is_err() { + log::error!( + self.logger, + "Could not send SubmitTxResponse to worker thread" + ); + } + Ok(resp) + } + + /// Handle a "get status" request to the faucet. + /// Returns either the json status report or an error string. + pub async fn handle_status(&self) -> Result { + // Get up-to-date balances for all the tokens we are tracking + let mut balances: HashMap = Default::default(); + for (token_id, _) in self.faucet_payout_amounts.iter() { + let mut req = mc_mobilecoind_api::GetBalanceRequest::new(); + req.set_monitor_id(self.monitor_id.clone()); + req.set_token_id(**token_id); + + let resp = self + .mobilecoind_api_client + .get_balance_async(&req) + .map_err(|err| { + format!( + "Failed to check balance for token id '{}': {}", + token_id, err + ) + })? + .await + .map_err(|err| { + format!( + "Balance check request for token id '{}' ended in error: {}", + token_id, err + ) + })?; + balances.insert(*token_id, resp.balance); + } + + let queue_depths = self.worker.get_queue_depths(); + + Ok(FaucetStatus { + b58_address: self.monitor_b58_address.clone(), + faucet_payout_amounts: self.faucet_payout_amounts.clone(), + balances, + queue_depths: queue_depths + .into_iter() + .map(|(token_id, depth)| (token_id, depth as u64)) + .collect(), + }) + } +} diff --git a/mobilecoind-dev-faucet/src/worker.rs b/mobilecoind-dev-faucet/src/worker.rs new file mode 100644 index 0000000000..50f7ba6604 --- /dev/null +++ b/mobilecoind-dev-faucet/src/worker.rs @@ -0,0 +1,634 @@ +// Copyright (c) 2018-2022 The MobileCoin Foundation + +//! This module creates a faucet "worker". It consists of a worker thread, +//! and a handle to the worker thread, which can be used to asynchronously get +//! the output of the worker. +//! +//! The worker ensures that the faucet is ready for operation by making self- +//! payments that split TxOut's if the number of pre-split TxOut's falls below a +//! threshold. TxOut's that are ready to be used get added to a tokio queue. +//! +//! Other threads may dequeue UTXOs from such a queue and then use them. +//! The queue also carries a one-shot channel which can give the worker a +//! SubmitTxResponse, or signal that an error occurred. The worker then follows +//! up on what happens with the UTXO, and eventually requeues it if the +//! transaction doesn't resolve successfully. +//! +//! The worker uses its own thread, but uses async-friendly tokio primitives. +//! The worker does not require to be launched from the context of a tokio +//! runtime. + +use mc_common::logger::{log, o, Logger}; +use mc_mobilecoind_api::{ + external::PublicAddress, mobilecoind_api_grpc::MobilecoindApiClient, SubmitTxResponse, + TxStatus, UnspentTxOut, +}; +use mc_transaction_core::{constants::MAX_OUTPUTS, ring_signature::KeyImage, TokenId}; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Duration, +}; +use tokio::sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + oneshot::{self, error::TryRecvError}, +}; + +/// A record the worker hands to faucet threads about a UTXO they can use. +/// It expects to be notified if the UTXO is successfully submitted. +/// If the one-shot sender is dropped, the worker assumes that there was an +/// error, and the faucet dropped the UTXO, so that this UTXO can be used again +/// potentially. +pub struct UtxoRecord { + pub utxo: UnspentTxOut, + pub sender: oneshot::Sender, +} + +/// A tracker the worker keeps for UTXO records it hands to faucet threads. +pub struct UtxoTracker { + pub utxo: UnspentTxOut, + receiver: oneshot::Receiver, + received: Option, +} + +impl UtxoTracker { + /// Make a new tracker and associated record for a given utxo + pub fn new(utxo: UnspentTxOut) -> (Self, UtxoRecord) { + let (sender, receiver) = oneshot::channel(); + + let record = UtxoRecord { + utxo: utxo.clone(), + sender, + }; + + let tracker = Self { + utxo, + receiver, + received: None, + }; + + (tracker, record) + } + + // If available, get either a SubmitTxResponse, or an error indicating the + // channel was closed, which means we had an error and couldn't submit the + // Tx that spent this. + pub fn poll(&mut self) -> Option> { + if let Some(resp) = self.received.as_ref() { + Some(Ok(resp.clone())) + } else { + match self.receiver.try_recv() { + Ok(resp) => { + self.received = Some(resp.clone()); + Some(Ok(resp)) + } + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Closed) => Some(Err(TryRecvError::Closed)), + } + } + } +} + +/// TokenStateReceiver holds the queue of Utxo records for a particular token +/// id, as well as other shared flags that indicate if we are out of funds etc. +pub struct TokenStateReceiver { + receiver: Mutex>, + funds_depleted_flag: Arc, + queue_depth: Arc, +} + +impl TokenStateReceiver { + /// Get a utxo from the queue, or a string explaining why we can't + pub fn get_utxo(&self) -> Result { + let mut receiver = self.receiver.lock().expect("mutex poisoned"); + loop { + match receiver.try_recv() { + Ok(utxo_record) => { + self.queue_depth.fetch_sub(1, Ordering::SeqCst); + // Check if the one-shot sender has already been closed. + // If it has, this means the worker dropped the tracker. + // This only happens if the worker decided this txo isn't unspent anymore + // when it polled mobilecoind. + // (We did not send the worker status yet.) So we should skip it and pull + // the next thing from the queue, since obviously a race has happened. + // (The worker will not spend utxos that it puts in the queue, + // but possibly another wallet with the same account key did.) + if !utxo_record.sender.is_closed() { + return Ok(utxo_record); + } + // Intentional fall-through to loop + } + Err(mpsc::error::TryRecvError::Empty) => { + return if self.funds_depleted_flag.load(Ordering::SeqCst) { + Err("faucet is depleted".to_string()) + } else { + Err("faucet is busy".to_string()) + }; + } + Err(mpsc::error::TryRecvError::Disconnected) => { + // This most likely means the worker thread has died + return Err("internal error".to_string()); + } + } + } + } + + /// Get the current depth of the queue + pub fn get_queue_depth(&self) -> usize { + self.queue_depth.load(Ordering::SeqCst) + } +} + +/// The worker is responsible for pre-splitting the faucet's balance so that it +/// can handle multiple faucet requests concurrently. +/// +/// It periodically calls `get_unspent_tx_out_list` for each token of interest. +/// If there are fewer than target_queue_depth TxOuts whose value is exactly +/// "target amount", then it attempts to make a self-payment which creates +/// MAX_OUTPUTS - 1 more pre-split TxOuts. +/// +/// To ensure concurrent faucet requests don't try to use the same unspent +/// TxOut's as eachother, the worker puts the unspent TxOut's in a queue as they +/// appear. +/// +/// Threads that have access to the worker handle can quickly try to pull an +/// unspent TxOut from the queue and send it, or find that the queue is empty +/// and give up. +pub struct Worker { + /// The reciever queues, and flags indicating if we are out of funds, for + /// each token id + receivers: HashMap, + + /// The worker thread handle + join_handle: Option>, + + /// A flag which can be used to request the worker thread to join + /// This is done by dropping the worker handle + stop_requested: Arc, +} + +impl Worker { + /// Make a new worker object given mobilecoind connection and config info, + /// and starts the worker thread. + /// + /// Arguments: + /// * client: connection to mobilecoind + /// * monitor_id: The monitor id for the account we are using + /// * public_address: The public address of our monitor id, used for + /// self-payments + /// * minimum_fees: The minimum fees for each token we are interested in + /// * target_amounts: The target value for UTXOs of each token we are + /// interested in + /// * target_queue_depth: The target depth of the queue for each token ID. + /// If a queue falls below this number the worker attempts a split Tx. + /// * worker_poll_period: A lower bound on how often the worker should poll + /// * logger + /// + /// Returns the Worker handle object, which contains the thread handle, and + /// receives the output of the worker thread. + pub fn new( + client: MobilecoindApiClient, + monitor_id: Vec, + public_address: PublicAddress, + minimum_fees: HashMap, + target_amounts: HashMap, + target_queue_depth: usize, + worker_poll_period: Duration, + logger: &Logger, + ) -> Worker { + let mut worker_token_states = Vec::::default(); + let mut receivers = HashMap::::default(); + + for (token_id, value) in target_amounts.iter() { + let minimum_fee_value = minimum_fees + .get(token_id) + .unwrap_or_else(|| panic!("Missing minimum fee for {}", token_id)); + let (state, receiver) = WorkerTokenState::new(*token_id, *minimum_fee_value, *value); + worker_token_states.push(state); + receivers.insert(*token_id, receiver); + } + + let stop_requested = Arc::new(AtomicBool::default()); + let thread_stop_requested = stop_requested.clone(); + + let logger = logger.new(o!("thread" => "worker")); + let join_handle = Some(std::thread::spawn(move || { + Self::worker_thread_entry_point( + worker_token_states, + thread_stop_requested, + client, + monitor_id, + public_address, + target_queue_depth, + worker_poll_period, + logger, + ) + })); + + Worker { + receivers, + join_handle, + stop_requested, + } + } + + /// The entrypoint for the worker thread. + /// First, wait for account to sync in mobilecoind. + /// Then enter a loop where we check stop_requested, poll each token id for + /// activity, and sleep for a bit. + fn worker_thread_entry_point( + mut worker_token_states: Vec, + stop_requested: Arc, + client: MobilecoindApiClient, + monitor_id: Vec, + public_address: PublicAddress, + target_queue_depth: usize, + worker_poll_period: Duration, + logger: Logger, + ) { + // First wait for account to sync + // Get the "initial" ledger block count + let block_count = loop { + match client.get_ledger_info(&Default::default()) { + Ok(resp) => break resp.block_count, + Err(err) => { + log::error!(logger, "Could not get ledger info: {:?}", err); + } + } + std::thread::sleep(Duration::from_millis(1000)); + }; + log::info!(logger, "Ledger is at block_count = {}", block_count); + + // Now wait for monitor state to at least pass this point + loop { + let mut req = mc_mobilecoind_api::GetMonitorStatusRequest::new(); + req.set_monitor_id(monitor_id.clone()); + match client.get_monitor_status(&req) { + Ok(resp) => { + let monitor_block_count = resp.get_status().next_block; + if monitor_block_count >= block_count { + log::info!( + logger, + "Monitor has synced to block count {}", + monitor_block_count + ); + break; + } + } + Err(err) => { + log::error!(logger, "Could not get monitor status: {:?}", err); + } + } + std::thread::sleep(Duration::from_millis(1000)); + } + + // Poll all token ids looking for activity, then sleep for a bit + loop { + if stop_requested.load(Ordering::SeqCst) { + log::info!(logger, "Worker: stop was requested"); + break; + } + for state in worker_token_states.iter_mut() { + if let Err(err_str) = state.poll( + &client, + &monitor_id, + &public_address, + target_queue_depth, + &logger, + ) { + log::error!(logger, "{}", err_str); + } + } + log::trace!(logger, "Worker sleeping"); + std::thread::sleep(worker_poll_period); + } + } + + /// Get a utxo with the target value, for a given token id. + /// This pulls a utxo from the queue, and the recipient has responsbility + /// to either successfully send the TxOut and use its oneshot::Sender to + /// report the result from consensus, or, to drop the oneshot::Sender, + /// reporting an error using the TxOut. + pub fn get_utxo(&self, token_id: TokenId) -> Result { + if let Some(receiver) = self.receivers.get(&token_id) { + receiver.get_utxo() + } else { + Err(format!("Unknown token id: {}", token_id)) + } + } + + /// Get the depths of all of the queues + pub fn get_queue_depths(&self) -> HashMap { + self.receivers + .iter() + .map(|(token_id, receiver)| (*token_id, receiver.get_queue_depth())) + .collect() + } +} + +impl Drop for Worker { + fn drop(&mut self) { + if let Some(handle) = self.join_handle.take() { + self.stop_requested.store(true, Ordering::SeqCst); + handle.join().expect("failed to join worker thread"); + } + } +} + +struct WorkerTokenState { + // The token id being tracked + token_id: TokenId, + // the minimum fee value for this token id + minimum_fee_value: u64, + // The target value of UTXOS for this token id + target_value: u64, + // The most recently known set of UTXOS for this token id + // When we get a new UTXO from mobilecoind, we track it using this cache. + // The tracker contains a one-shot channel that the other side can use to + // let us know what happens with this UTXO. + // UTXOs are added here at the same time they are queued. As long as a UTXO + // is in this cache, we won't requeue it, to avoid two threads spending + // the same UTXO concurrently. + known_utxos: HashMap, + // The queue of UTXOs with the target value + sender: UnboundedSender, + // If we submit a split transaction, the response we can use to track it + in_flight_split_tx_state: Option, + // A shared flag we use to signal if have insufficient funds for this token id + funds_depleted: Arc, + // A shared counter used to indicate roughly how many items are in the queue + queue_depth: Arc, +} + +impl WorkerTokenState { + // Create a new WorkerTokenState and matching TokenStateReceiver. + // + // Arguments: + // * token_id: The token id this state is tracking + // * minimum_fee_value: The minimum fee for this token id + // * target_value: The target value of faucet utxos for this token id + // + // Returns: + // * WorkerTokenState, which is held by the worker which calls poll periodically + // * TokenStateReceiver, which is held by the worker thread handle, which can + // get the output stream of the worker, for this token id. + fn new( + token_id: TokenId, + minimum_fee_value: u64, + target_value: u64, + ) -> (WorkerTokenState, TokenStateReceiver) { + let (sender, receiver) = mpsc::unbounded_channel::(); + + let funds_depleted_flag = Arc::new(AtomicBool::default()); + let funds_depleted = funds_depleted_flag.clone(); + + let queue_depth = Arc::new(AtomicUsize::default()); + let queue_depth_counter = queue_depth.clone(); + + ( + Self { + token_id, + minimum_fee_value, + target_value, + known_utxos: Default::default(), + sender, + in_flight_split_tx_state: None, + funds_depleted, + queue_depth, + }, + TokenStateReceiver { + receiver: Mutex::new(receiver), + funds_depleted_flag, + queue_depth: queue_depth_counter, + }, + ) + } + + // Poll a given token for activity. + // + // (1) Check up on old utxos, checking if they were eventually submitted + // or not, and if those submissions were successful. If their submissions + // resolve, it purges them from its cache so that they can be found again + // and resubmitted if necessary. + // (2) Get the UTXO list for this token, checks it for new UTXOs, and + // sends things to the channel if we do find new things. + // (3) Check if we have enough pre-split Txos, and if we don't, check + // if we already have an in-flight Tx to try to fix this. If not then it builds + // and submits a new splitting Tx. + // + // Returns a string which should be logged if e.g. we encounter an RPC error + fn poll( + &mut self, + client: &MobilecoindApiClient, + monitor_id: &[u8], + public_address: &PublicAddress, + target_queue_depth: usize, + logger: &Logger, + ) -> Result<(), String> { + // First, for each known utxo already queued, check if it was sent in a + // transaction and if so what the status is + self.known_utxos.retain(|_key_image, tracker| { + if let Some(status) = tracker.poll() { + // If poll returned Some, then we either got a SubmitTxResponse or an error + if let Ok(resp) = status { + // It was successfully submitted to the network, let's ask mobilecoind what's + // happened. If it's still in-flight we should retain it, if it has resolved, + // we should drop it from our records. + is_tx_still_in_flight(client, &resp, "Faucet", logger) + } else { + // The oneshot receiver resolved in an error, this means, the other side dropped + // this channel, without reporting a SubmitTxResponse. This + // means there was an error building or submitting the Tx, + // and the other side has now dropped this UnspentTxOut. We should requeue it + // so that it can be eventually be spent, and for now we should just purge it. + false + } + } else { + // Still in the queue as far as we know + true + } + }); + + // Now, get a fresh unspent tx out list associated to this token + let mut resp = { + let mut req = mc_mobilecoind_api::GetUnspentTxOutListRequest::new(); + req.token_id = *self.token_id; + req.monitor_id = monitor_id.to_vec(); + + client.get_unspent_tx_out_list(&req).map_err(|err| { + format!( + "Could not get unspent txout list for token id = {}: {}", + self.token_id, err + ) + })? + }; + + // Now, check all the reported utxos. + // If it is new and has the target value, then queue it + let mut output_list_key_images = HashSet::::default(); + + for utxo in resp.output_list.iter() { + // Sanity check the token id + if utxo.token_id != self.token_id { + continue; + } + + // Only utxos of exactly the target value are elligible to go in the queue. + // The others are "non-target-value utxos" which are candidates to be used + // (later) in split transactions that produce more target-value utxos, + // if the queue is getting empty. + if utxo.value != self.target_value { + continue; + } + + let key_image: KeyImage = utxo + .get_key_image() + .try_into() + .map_err(|err| format!("invalid key image: {}", err))?; + if let Entry::Vacant(e) = self.known_utxos.entry(key_image) { + // We found a utxo not in the cache, let's queue and add to cache + log::trace!( + logger, + "Queueing a utxo: key_image = {:?}, value = {}", + key_image, + utxo.value + ); + let (tracker, record) = UtxoTracker::new(utxo.clone()); + // Add to queue depth before push, because we subtract after pop + self.queue_depth.fetch_add(1, Ordering::SeqCst); + if self.sender.send(record).is_err() { + panic!("Queue was closed before worker thread was joined, this is an unexpected program state"); + } + e.insert(tracker); + } + + // Add the key image of this utxo to a set, this helps us purge the cache + output_list_key_images.insert(key_image); + } + + // Remove any known utxos that no longer exist in the response list + // That is, remove any utxo whose key image wasn't added to + // output_list_key_images before. (This also drops the one-shot receiver, + // and so can tell the other side not to bother sending this utxo if they get + // it from the queue.) + self.known_utxos + .retain(|key_image, _tracker| output_list_key_images.contains(key_image)); + + // Check the queue depth, and decide if we should make a split tx + if self.queue_depth.load(Ordering::SeqCst) < target_queue_depth { + // Check if we already tried to fix this in the last iteration + if let Some(prev_tx) = self.in_flight_split_tx_state.as_ref() { + if is_tx_still_in_flight(client, prev_tx, "Split", logger) { + // There is already a fix in-flight, let's do nothing until it lands. + return Ok(()); + } + } + log::trace!(logger, "Attempting to split on token id {}", self.token_id); + // At this point, the previous in-flight tx resolved somehow and if it was an + // error we logged it + self.in_flight_split_tx_state = None; + + // We will now attempt to build and submit a split Tx that prooduces TxOuts of + // target value from those that aren't + let non_target_value_utxos: Vec<_> = resp + .take_output_list() + .into_iter() + .filter(|utxo| utxo.token_id == self.token_id && utxo.value != self.target_value) + .collect(); + + // First make sure we have enough funds for what we want to do, so we don't spam + // errors when we are depleted, and so that faucet users can know + // that retries won't help. + if non_target_value_utxos + .iter() + .map(|utxo| utxo.value) + .sum::() + < self.target_value * (MAX_OUTPUTS - 1) + self.minimum_fee_value + { + self.funds_depleted.store(true, Ordering::SeqCst); + log::trace!(logger, "Funds depleted on {}", self.token_id); + return Ok(()); + } else { + let prev_value = self.funds_depleted.swap(false, Ordering::SeqCst); + if prev_value { + log::info!(logger, "Funds no longer depleted on {}", self.token_id); + } + } + + // Generate an outlay + // We will repeat this outlay MAX_OUTPUTS - 1 times + // (-1 is for a change output) + let mut outlay = mc_mobilecoind_api::Outlay::new(); + outlay.set_receiver(public_address.clone()); + outlay.set_value(self.target_value); + + // Generate a Tx + let mut req = mc_mobilecoind_api::GenerateTxRequest::new(); + req.set_sender_monitor_id(monitor_id.to_vec()); + req.set_token_id(*self.token_id); + req.set_input_list(non_target_value_utxos.into()); + req.set_outlay_list(vec![outlay; MAX_OUTPUTS as usize - 1].into()); + + let mut resp = client + .generate_tx(&req) + .map_err(|err| format!("Failed to generate split tx: {}", err))?; + + // Submit the Tx + let mut req = mc_mobilecoind_api::SubmitTxRequest::new(); + req.set_tx_proposal(resp.take_tx_proposal()); + let submit_tx_response = client + .submit_tx(&req) + .map_err(|err| format!("Failed to submit split tx: {}", err))?; + + // This lets us keep tabs on when this split payment has resolved, so that we + // can avoid sending another payment until it does + self.in_flight_split_tx_state = Some(submit_tx_response); + } + + Ok(()) + } +} + +/// Check if a given tx is still in-flight. +/// Logs an error if something strange happened +/// +/// Arguments: +/// * client: connection to mobilecoind +/// * tx: the submit tx response +/// * context: The context of this tx, used for logging +/// * logger +/// +/// Returns true if the tx is still (potentially) in-flight, false if it has +/// resolved now (either successfully or in an error) +fn is_tx_still_in_flight( + client: &MobilecoindApiClient, + tx: &SubmitTxResponse, + context: &str, + logger: &Logger, +) -> bool { + match client.get_tx_status_as_sender(tx) { + Ok(resp) => { + if resp.status == TxStatus::Unknown { + return true; + } + if resp.status != TxStatus::Verified { + log::error!( + logger, + "{} Tx ended with status: {:?}", + context, + resp.status + ); + } + // Whether successful or an error, the Tx has resolved now + false + } + Err(err) => { + log::error!(logger, "Failed getting {} Tx status: {}", context, err); + // We still don't know the status, so it may still be in-flight + true + } + } +} diff --git a/mobilecoind-json/Cargo.toml b/mobilecoind-json/Cargo.toml index 6d75fd1ea1..e28505573d 100644 --- a/mobilecoind-json/Cargo.toml +++ b/mobilecoind-json/Cargo.toml @@ -13,6 +13,7 @@ mc-api = { path = "../api" } mc-common = { path = "../common", features = ["loggers"] } mc-mobilecoind-api = { path = "../mobilecoind/api" } mc-util-grpc = { path = "../util/grpc" } +mc-util-serial = { path = "../util/serial", features = ["std"] } clap = { version = "3.1", features = ["derive", "env"] } grpcio = "0.10.2" @@ -21,7 +22,6 @@ protobuf = "2.27.1" rocket = { version = "0.5.0-rc.2", features = ["json"] } serde = "1.0" serde_derive = "1.0" -serde_with = "1.13" [dev-dependencies] mc-crypto-keys = { path = "../crypto/keys" } diff --git a/mobilecoind-json/src/data_types.rs b/mobilecoind-json/src/data_types.rs index 1c0422c81f..62cc09ab2c 100644 --- a/mobilecoind-json/src/data_types.rs +++ b/mobilecoind-json/src/data_types.rs @@ -7,39 +7,11 @@ use mc_api::external::{ PublicAddress, RingMLSAG, SignatureRctBulletproofs, Tx, TxIn, TxOutMembershipElement, TxOutMembershipHash, TxOutMembershipProof, TxPrefix, }; +use mc_util_serial::JsonU64; use protobuf::RepeatedField; use serde_derive::{Deserialize, Serialize}; use std::convert::TryFrom; -// Represents u64 using string, when serializing to Json -// Javascript integers are not 64 bit, and so it is not really proper json. -// Using string avoids issues with some json parsers not handling large numbers -// well. -// -// This does not rely on the serde-json arbitrary precision feature, which -// (we fear) might break other things (e.g. https://github.com/serde-rs/json/issues/505) -#[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)] -#[serde(transparent)] -pub struct JsonU64(#[serde(with = "serde_with::rust::display_fromstr")] pub u64); - -impl From<&u64> for JsonU64 { - fn from(src: &u64) -> Self { - Self(*src) - } -} - -impl From<&JsonU64> for u64 { - fn from(src: &JsonU64) -> u64 { - src.0 - } -} - -impl From for u64 { - fn from(src: JsonU64) -> u64 { - src.0 - } -} - #[derive(Deserialize, Default, Debug)] pub struct JsonPasswordRequest { pub password: String, diff --git a/util/serial/Cargo.toml b/util/serial/Cargo.toml index 1a522ecd54..8b39c828c2 100644 --- a/util/serial/Cargo.toml +++ b/util/serial/Cargo.toml @@ -9,9 +9,10 @@ edition = "2021" # because rmp-ser../../alloc doesn't build against ser../../std, # so if anything else in your build plan will activate ser../../std, then mcseri../../std is # required. -std = ["serde/std", "serde_cbor/std"] +std = ["serde/std", "serde_cbor/std", "serde_with"] [dependencies] prost = { version = "0.10", default-features = false, features = ["prost-derive"] } serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] } serde_cbor = { version = "0.11.1", default-features = false, features = ["alloc"] } +serde_with = { version = "1.14", default-features = false, optional = true } diff --git a/util/serial/src/lib.rs b/util/serial/src/lib.rs index 7aed75d410..82e1fc5987 100644 --- a/util/serial/src/lib.rs +++ b/util/serial/src/lib.rs @@ -8,6 +8,7 @@ use alloc::vec::Vec; pub extern crate prost; pub use prost::{DecodeError, EncodeError, Message}; +use serde::{Deserialize, Serialize}; // We put a new-type around serde_cbor::Error in `mod decode` and `mod encode`, // because this keeps us compatible with how rmp-serde was exporting its errors, @@ -53,15 +54,17 @@ pub mod encode { /// fail. pub fn serialize(value: &T) -> Result, encode::Error> where - T: serde::ser::Serialize + Sized, + T: Serialize + Sized, { Ok(serde_cbor::to_vec(value)?) } -// Forward mc_util_serial::deserialize to bincode::deserialize +/// Deserialize the given bytes to a data structure. +/// +/// Forward mc_util_serial::deserialize to serde_cbor::from_slice pub fn deserialize<'a, T>(bytes: &'a [u8]) -> Result where - T: serde::de::Deserialize<'a>, + T: Deserialize<'a>, { Ok(serde_cbor::from_slice(bytes)?) } @@ -82,6 +85,52 @@ where Ok(value) } +#[cfg(feature = "serde_with")] +mod json_u64 { + + use super::*; + + /// Represents u64 using string, when serializing to Json + /// Javascript integers are not 64 bit, and so it is not really proper json. + /// Using string avoids issues with some json parsers not handling large + /// numbers well. + /// + /// This does not rely on the serde-json arbitrary precision feature, which + /// (we fear) might break other things (e.g. https://github.com/serde-rs/json/issues/505) + #[derive(Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq, Hash, Serialize)] + #[serde(transparent)] + pub struct JsonU64(#[serde(with = "serde_with::rust::display_fromstr")] pub u64); + + impl From<&u64> for JsonU64 { + fn from(src: &u64) -> Self { + Self(*src) + } + } + + impl From<&JsonU64> for u64 { + fn from(src: &JsonU64) -> u64 { + src.0 + } + } + + impl From for u64 { + fn from(src: JsonU64) -> u64 { + src.0 + } + } + + impl AsRef for JsonU64 { + fn as_ref(&self) -> &u64 { + &self.0 + } + } +} + +/// JsonU64 is exported if it is available -- the serde_with crate which it +/// depends on relies on std, so it must be optional. +#[cfg(feature = "serde_with")] +pub use json_u64::JsonU64; + #[cfg(test)] mod test { use super::*; @@ -115,3 +164,26 @@ mod test { assert_eq!(deserialized, the_struct); } } + +#[cfg(all(test, feature = "serde_with"))] +mod json_u64_tests { + use super::*; + use serde::{Deserialize, Serialize}; + + #[derive(PartialEq, Serialize, Deserialize, Debug)] + struct TestStruct { + nums: Vec, + block: JsonU64, + } + + #[test] + fn test_serialize_jsonu64_struct() { + let the_struct = TestStruct { + nums: (&[0, 1, 2, u64::MAX]).iter().map(Into::into).collect(), + block: JsonU64(u64::MAX - 1), + }; + let serialized = serialize(&the_struct).unwrap(); + let deserialized: TestStruct = deserialize(&serialized).unwrap(); + assert_eq!(deserialized, the_struct); + } +}