Failure limit on tx proposals #3296

Open · wants to merge 50 commits into base: main

Commits (50)
1e52e1b
Track failed tx proposals in consensus
NotGyro Mar 8, 2023
9e7465b
Push an Instant::now() for every time validation fails on a tx proposal
NotGyro Mar 9, 2023
94f99d9
Fix unnecessary .clone()
NotGyro Mar 14, 2023
8b2b437
Cargo fmt
NotGyro Mar 14, 2023
d818079
Return to tracking clients as soon as they submit a proposeTX, per Ja…
NotGyro Mar 27, 2023
cf0b095
Cargo fmt
NotGyro Mar 27, 2023
9016e8e
Failure limit on tx proposals
NotGyro Mar 28, 2023
c4c9fb2
Cargo fmt.
NotGyro Mar 30, 2023
91bf8cd
Actually close sessions with problem clients
NotGyro Mar 30, 2023
ed3ecfe
Cargo fmt
NotGyro Mar 30, 2023
df61256
Adding a comment
NotGyro Mar 30, 2023
ee2c475
Cargo fmt
NotGyro Mar 30, 2023
72cd0d5
Test ensuring client_close() is called when limit is hit
NotGyro Apr 1, 2023
232f149
Cargo fmt
NotGyro Apr 1, 2023
a32f637
Fixes and improvements to tx proposal ratelimiting system.
NotGyro Apr 4, 2023
3ee8518
cargo clippy --fix
NotGyro Apr 4, 2023
8dcdf8d
Track failed tx proposals in consensus
NotGyro Mar 8, 2023
f4e80eb
Push an Instant::now() for every time validation fails on a tx proposal
NotGyro Mar 9, 2023
0d2ab75
Fix unnecessary .clone()
NotGyro Mar 14, 2023
9e557dd
Cargo fmt
NotGyro Mar 14, 2023
c342f6c
Return to tracking clients as soon as they submit a proposeTX, per Ja…
NotGyro Mar 27, 2023
ecd5836
Cargo fmt
NotGyro Mar 27, 2023
93d6030
Apply suggestions from code review
NotGyro Apr 7, 2023
6b3861f
Respond to suggestions on the pull request, no longer using reference…
NotGyro Apr 7, 2023
ee9ec68
Merge in suggested changes
NotGyro Apr 7, 2023
bf6b2d3
chore(deps): bump syn from 1.0.109 to 2.0.11 (#3295)
dependabot[bot] Mar 30, 2023
429d3ea
Update log messages (#3301)
varsha888 Apr 3, 2023
b130c4a
move transaction summary computation to separate crate (#3132)
ryankurte Apr 4, 2023
31455af
include necessary optional dep when the serde feature is enabled (#3303)
eranrund Apr 5, 2023
3492fff
Merge in suggested changes
NotGyro Apr 7, 2023
be4fd13
cargo fmt
NotGyro Apr 7, 2023
c9ecddf
Merge branch 'master' into milliec/03-07-Track_failed_tx_proposals_in…
NotGyro Apr 7, 2023
7651276
Refactor out a shared compute authenticated sender and computer desti…
wjuan-mob Apr 6, 2023
c1014d0
chore(deps): bump serde from 1.0.154 to 1.0.159 (#3294)
dependabot[bot] Apr 6, 2023
f618719
Failure limit on tx proposals
NotGyro Mar 28, 2023
9749e59
Actually close sessions with problem clients
NotGyro Mar 30, 2023
c1c00c6
Fixes within the process of rebasing
NotGyro Apr 7, 2023
2ed4c51
Merge branch 'milliec/03-07-Track_failed_tx_proposals_in_consensus' i…
NotGyro Apr 7, 2023
6c5f429
Merge branch 'master' into milliec/03-28-Failure_limit_on_tx_proposals
NotGyro Apr 8, 2023
81c621d
cargo fmt
NotGyro Apr 10, 2023
cf078af
chore(deps): bump bitflags from 1.3.2 to 2.0.1 (#3250)
dependabot[bot] Apr 10, 2023
766fa33
refactor mobilecoind api to not depend on consensus-api (#3307)
cbeck88 Apr 10, 2023
94a7b15
unforked diesel and updated necessary code (#3304)
briancorbin Apr 10, 2023
9562735
Failure limit on tx proposals
NotGyro Mar 28, 2023
5bd1b98
Cargo fmt.
NotGyro Mar 30, 2023
2b0c4ef
Actually close sessions with problem clients
NotGyro Mar 30, 2023
67c2284
Push an Instant::now() for every time validation fails on a tx proposal
NotGyro Mar 9, 2023
65bd6ad
Cargo fmt
NotGyro Mar 14, 2023
7be7728
Respond to suggestions on the pull request, no longer using reference…
NotGyro Apr 7, 2023
23a168e
Merge branch 'master' into milliec/03-28-Failure_limit_on_tx_proposals
NotGyro Apr 11, 2023
20 changes: 20 additions & 0 deletions consensus/api/proto/consensus_config.proto
@@ -69,4 +69,24 @@ message ConsensusNodeConfig {

// SCP message signing key.
external.Ed25519Public scp_message_signing_key = 8;

// Maximum number of client session tracking structures to retain in
// a least-recently-used cache.
Collaborator:
🙃 Don't think "least recently used" is a hyphenated word

Suggest looking at my comment on "denial-of-service" before deciding

Suggested change:
- // a least-recently-used cache.
+ // a least recently used cache.

//
// This corresponds to Config::client_tracking_capacity
uint64 client_tracking_capacity = 9;
Collaborator:
❓ I don't think it matters as they're all dynamically sized, but why uint64 here, considering tx_failure_limit is uint32?


// How many seconds to retain instances of proposed-transaction
// failures, per-client. This is used to implement DOS-protection,
// protecting against clients who make too many failed
// transaction proposals within this span.
uint64 tx_failure_window_seconds = 10;
Collaborator:
👍 Like the call out of the units in the name since there is no type safety.


// How many tx proposal failures within the rolling window are required
// before it's treated as concerning, thereby tripping denial-of-service
Collaborator:
💭 Interestingly, "denial-of-service" is often hyphenated on the interwebs. The rules for when to hyphenate seem pretty loose; some say that since "denial-of-service" is a compound adjective it can be hyphenated.

// protection?
// In other words, how many failed transaction proposals are permitted
// within the last tx_failure_window_seconds before a user is
// disconnected or temporarily blocked?
uint32 tx_failure_limit = 11;
}
22 changes: 22 additions & 0 deletions consensus/service/config/src/lib.rs
@@ -136,6 +136,24 @@ pub struct Config {
/// config setting to match.
#[clap(long, default_value = "10000", env = "MC_CLIENT_TRACKING_CAPACITY")]
pub client_tracking_capacity: usize,

/// How many seconds to retain instances of proposed-transaction
/// failures, per-client. This is used to implement DOS-protection,
/// along the lines of kicking clients who make too many failed transaction
/// proposals within the span of tx_failure_window
// TODO: slam-testing to derive reasonable default
#[clap(long, default_value = "30", value_parser = parse_duration_in_seconds, env = "MC_CLIENT_TX_FAILURE_WINDOW")]
pub tx_failure_window: Duration,

/// How many tx proposal failures within the rolling window are required
/// before it's treated as concerning, thereby tripping denial-of-service
/// protection?
/// In other words, how many failed transaction proposals are permitted
/// within the last tx_failure_window seconds before a user is
/// disconnected or temporarily blocked?
// TODO: slam-testing to derive reasonable default
#[clap(long, default_value = "16384", env = "MC_CLIENT_TX_FAILURE_LIMIT")]
pub tx_failure_limit: u32,
}

impl Config {
@@ -224,6 +242,8 @@ mod tests {
tokens_path: None,
block_version: BlockVersion::ZERO,
client_tracking_capacity: 4096,
tx_failure_window: Duration::from_secs(30),
tx_failure_limit: 16384,
};

assert_eq!(
@@ -293,6 +313,8 @@ mod tests {
tokens_path: None,
block_version: BlockVersion::ZERO,
client_tracking_capacity: 4096,
tx_failure_window: Duration::from_secs(30),
tx_failure_limit: 16384,
};

assert_eq!(
210 changes: 195 additions & 15 deletions consensus/service/src/api/client_api_service.rs
@@ -10,10 +10,13 @@ use crate::{
tx_manager::{TxManager, TxManagerError},
SVC_COUNTERS,
};
use grpcio::{RpcContext, RpcStatus, UnarySink};
use grpcio::{RpcContext, RpcStatus, RpcStatusCode, UnarySink};
use mc_attest_api::attest::Message;
use mc_attest_enclave_api::ClientSession;
use mc_common::{logger::Logger, LruCache};
use mc_common::{
logger::{log, Logger},
LruCache,
};
use mc_consensus_api::{
consensus_client::{ProposeMintConfigTxResponse, ProposeMintTxResponse},
consensus_client_grpc::ConsensusClientApi,
@@ -55,6 +58,17 @@ impl ClientSessionTracking {
}
}

pub fn get_proposetx_failures(&self) -> usize {
Collaborator:
🔧 Is this name correct? Also, getter/setter methods should avoid the get_ prefix.

Suggested change:
- pub fn get_proposetx_failures(&self) -> usize {
+ pub fn num_tx_proposal_failures(&self) -> usize {

While the usize return type communicates the type, I used "num" to indicate it isn't the actual field itself.

self.tx_proposal_failures.len()
}

/// Remove any transaction proposal failure record that is older than our
/// tracking window.
fn clear_stale_records(&mut self, now: Instant, tracking_window: Duration) {
Collaborator:
👍 I like passing the current Instant in; it makes this easier to test.

self.tx_proposal_failures
.retain(|past_failure| now.saturating_duration_since(*past_failure) <= tracking_window);
}

/// Push a new failed tx proposal record, clear out samples older than
/// our tracking window, and return the number of tx failures remaining
/// on the list - as-in, tells you "there have been x number of failures
@@ -69,8 +83,7 @@ impl ClientSessionTracking {
/// have existed for longer than this value will be dropped when this
/// method is called.
pub fn fail_tx_proposal(&mut self, now: Instant, tracking_window: Duration) -> usize {
self.tx_proposal_failures
.retain(|past_failure| now.saturating_duration_since(*past_failure) <= tracking_window);
self.clear_stale_records(now, tracking_window);
self.tx_proposal_failures.push_back(now);
self.tx_proposal_failures.len()
}
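(Aside, not part of the diff: a minimal, self-contained sketch of the sliding-window behavior that fail_tx_proposal() implements, using only std types. The Window struct and main() below are illustrative stand-ins, not code from this PR.)

use std::collections::VecDeque;
use std::time::{Duration, Instant};

struct Window {
    failures: VecDeque<Instant>,
}

impl Window {
    // Drop failures that fell out of the window, record the new one, and
    // report how many failures remain inside the window.
    fn fail(&mut self, now: Instant, tracking_window: Duration) -> usize {
        self.failures
            .retain(|t| now.saturating_duration_since(*t) <= tracking_window);
        self.failures.push_back(now);
        self.failures.len()
    }
}

fn main() {
    let mut w = Window { failures: VecDeque::new() };
    let start = Instant::now();
    let window = Duration::from_secs(30);
    assert_eq!(w.fail(start, window), 1);
    // A second failure 31 seconds later falls outside the 30-second window,
    // so the first record is evicted and only the new one is counted.
    assert_eq!(w.fail(start + Duration::from_secs(31), window), 1);
}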
@@ -143,13 +156,14 @@ impl ClientApiService {
// in pull request #3296 "Failure limit on tx proposals"
let tracking_window = Duration::from_secs(60);
let mut tracker = self.tracked_sessions.lock().expect("Mutex poisoned");
if !tracker.contains(&session_id) {
let record = if let Some(record) = tracker.get_mut(&session_id) {
record
} else {
tracker.put(session_id.clone(), ClientSessionTracking::new());
}
let record = tracker
.get_mut(&session_id)
.expect("Session id {session_id} should be tracked.");

tracker
.get_mut(&session_id)
.expect("Adding session-tracking record should be atomic.")
};
let _recent_failure_count =
record.fail_tx_proposal(Instant::now(), tracking_window);
// Dropping the client after a limit has been reached will be
@@ -261,6 +275,10 @@ impl ClientApiService {
response.set_block_version(*self.config.block_version);
response.set_scp_message_signing_key((&self.config.msg_signer_key.public_key()).into());

response.set_client_tracking_capacity(self.config.client_tracking_capacity as u64);
response.set_tx_failure_window_seconds(self.config.tx_failure_window.as_secs());
response.set_tx_failure_limit(self.config.tx_failure_limit);

Ok(response)
}
}
@@ -274,13 +292,54 @@ impl ConsensusClientApi for ClientApiService {
) {
let _timer = SVC_COUNTERS.req(&ctx);

let session_id = ClientSession::from(msg.channel_id.clone());

{
let session = ClientSession::from(msg.channel_id.clone());
let mut tracker = self.tracked_sessions.lock().expect("Mutex poisoned");
// Calling get() on the LRU bumps the entry to show up as more
// recently-used.
if tracker.get(&session).is_none() {
tracker.put(session, ClientSessionTracking::new());
if tracker.get(&session_id).is_none() {
Collaborator:
🔧 It feels like this block could be its own method, similar to the check_request_chain_id() guard clause below.
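(Illustrative sketch only, not a suggestion block from the PR: one possible shape of that extraction, using the types already in this file. The name check_tx_failure_limit and its exact signature are hypothetical.)

// Hypothetical helper: returns the status to send when the client has
// exceeded the failure limit, or None if the request may proceed.
fn check_tx_failure_limit(&self, session_id: &ClientSession) -> Option<RpcStatus> {
    let mut tracker = self.tracked_sessions.lock().expect("Mutex poisoned");
    if tracker.get(session_id).is_none() {
        tracker.put(session_id.clone(), ClientSessionTracking::new());
    }
    let recent_failures = tracker
        .get(session_id)
        .expect("Session should be present after insert")
        .get_proposetx_failures() as u32;
    if recent_failures >= self.config.tx_failure_limit {
        // Close the enclave session and pop the tracking record here, as in
        // the inline version below, then report the rate limit to the caller.
        Some(RpcStatus::new(RpcStatusCode::RESOURCE_EXHAUSTED))
    } else {
        None
    }
}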

tracker.put(session_id.clone(), ClientSessionTracking::new());
}

let session_info = tracker
.get(&session_id)
.expect("Session should be present after insert");
let recent_failures = session_info.get_proposetx_failures() as u32;
if recent_failures >= self.config.tx_failure_limit {
log::debug!(
self.logger,
"Client has {} recent failed tx proposals within the \
last {} seconds - dropping connection.",
recent_failures,
self.config.tx_failure_window.as_secs_f32()
Collaborator:
🔧 Not sure "f32" gets us much here since these come in as whole seconds

Suggested change:
- self.config.tx_failure_window.as_secs_f32()
+ self.config.tx_failure_window.as_secs()

);
// Rate-limiting is performed at the auth endpoint, so
// merely dropping the connection will be enough.
let close_result = self.enclave.client_close(session_id.clone());
// At the time of writing (30th March, 2023), it should
// only be possible for client_close() to error if a
// mutex is poisoned. However, because the
// implementation of this method might change, it
// seems wise to handle any error this might throw.
if let Err(e) = close_result {
log::error!(
self.logger,
"Failed to drop session {:?} due to: {:?}",
&session_id,
e
);
} else {
let _ = tracker.pop(&session_id);
}

// Send an error indicating the rate-limiting.
let rpc_code = RpcStatusCode::RESOURCE_EXHAUSTED;
let rpc_error = ConsensusGrpcError::RpcStatus(RpcStatus::new(rpc_code));
let result: Result<_, RpcStatus> = rpc_error.into();

// Send the error and return early.
return send_result(ctx, sink, result, &self.logger);
}
}

@@ -308,8 +367,21 @@ impl ConsensusClientApi for ClientApiService {
ConsensusGrpcError::NotServing.into()
}
} else {
self.handle_proposed_tx(msg)
.or_else(ConsensusGrpcError::into)
let result = self.handle_proposed_tx(msg);
// The block present below rate-limits suspicious behavior.
if let Err(_err) = &result {
Collaborator:
🔧 Thinking is_err() would work here since the contents don't appear to be looked at

Suggested change:
- if let Err(_err) = &result {
+ if result.is_err() {

let mut tracker = self.tracked_sessions.lock().expect("Mutex poisoned");
Collaborator:
🔧 I usually lean on the three-strike rule:

  1. You're writing code.
  2. It might be a coincidence that the logic is duplicated.
  3. You're out: refactor.

This is the third time this pattern of if get() else put() && get() has shown up; there should be a method that does this for you.
Unfortunately, due to the mutex this will be a little more work, as it will need to return a session guard that implements Drop to free up the mutex.
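(A rough, hypothetical sketch of what such a helper could look like; names and structure are illustrative, not part of this PR. It assumes tracked_sessions is the Arc<Mutex<LruCache<ClientSession, ClientSessionTracking>>> used elsewhere in this file; the mutex is released when the guard is dropped, since the guard owns the MutexGuard.)

struct SessionTrackingGuard<'a> {
    tracker: std::sync::MutexGuard<'a, LruCache<ClientSession, ClientSessionTracking>>,
    session_id: ClientSession,
}

impl<'a> SessionTrackingGuard<'a> {
    // Get-or-insert the per-session record, replacing the repeated
    // "if get() / else put() + get_mut()" pattern at each call site.
    fn record(&mut self) -> &mut ClientSessionTracking {
        if self.tracker.get_mut(&self.session_id).is_none() {
            self.tracker
                .put(self.session_id.clone(), ClientSessionTracking::new());
        }
        self.tracker
            .get_mut(&self.session_id)
            .expect("record was just inserted")
    }
}

impl ClientApiService {
    fn track_session(&self, session_id: ClientSession) -> SessionTrackingGuard<'_> {
        SessionTrackingGuard {
            tracker: self.tracked_sessions.lock().expect("Mutex poisoned"),
            session_id,
        }
    }
}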

let record = if let Some(record) = tracker.get_mut(&session_id) {
record
} else {
tracker.put(session_id.clone(), ClientSessionTracking::new());
tracker
.get_mut(&session_id)
.expect("Adding session-tracking record should be atomic.")
};
record.fail_tx_proposal(Instant::now(), self.config.tx_failure_window);
}
result.or_else(ConsensusGrpcError::into)
Collaborator:
❓ Thoughts on using a match statement here?
Something about seeing is_err() logic and then, in a separate block, modifying that error.
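(For reference, a hypothetical sketch of that match shape; illustrative only, assuming handle_proposed_tx() keeps returning Result<_, ConsensusGrpcError> as it does today.)

match self.handle_proposed_tx(msg) {
    Ok(response) => Ok(response),
    Err(err) => {
        // Record the failure against this session before converting the
        // error into the gRPC result, mirroring the failure-tracking block above.
        let mut tracker = self.tracked_sessions.lock().expect("Mutex poisoned");
        let record = if let Some(record) = tracker.get_mut(&session_id) {
            record
        } else {
            tracker.put(session_id.clone(), ClientSessionTracking::new());
            tracker
                .get_mut(&session_id)
                .expect("Adding session-tracking record should be atomic.")
        };
        record.fail_tx_proposal(Instant::now(), self.config.tx_failure_window);
        ConsensusGrpcError::into(err)
    }
}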

};

result = result.and_then(|mut response| {
@@ -1825,4 +1897,112 @@ mod client_api_tests {
.expect("Attempt to lock session-tracking mutex failed.");
assert_eq!(tracker.len(), 1);
}

#[test_with_logger]
#[serial(counters)]
fn test_get_kicked_failure_limit(logger: Logger) {
let limit = 3;

let mut consensus_enclave = MockConsensusEnclave::new();

let scp_client_value_sender = Arc::new(
|_value: ConsensusValue,
_node_id: Option<&NodeID>,
_responder_id: Option<&ResponderId>| {
// TODO: store inputs for inspection.
},
);

const NUM_BLOCKS: u64 = 5;
let mut ledger = MockLedger::new();
ledger
.expect_num_blocks()
.times(limit as usize)
.return_const(Ok(NUM_BLOCKS));

let tx_manager = MockTxManager::new();
let is_serving_fn = Arc::new(|| -> bool { true });
let authenticator = AnonymousAuthenticator::default();

const LRU_CAPACITY: usize = 4096;
let tracked_sessions = Arc::new(Mutex::new(LruCache::new(LRU_CAPACITY)));

let mut config = get_config();
// Permit only 3 failed transactions
config.tx_failure_limit = limit;

// Cause the mock enclave to consistently fail each request.
consensus_enclave
.expect_client_tx_propose()
.times(limit as usize)
.return_const(Err(EnclaveError::MalformedTx(
TransactionValidationError::ContainsSpentKeyImage,
)));
// Expect a close, since this will be exceeding our limit
consensus_enclave
.expect_client_close()
.times(1)
.return_const(Ok(()));

let instance = ClientApiService::new(
config,
Arc::new(consensus_enclave),
scp_client_value_sender,
Arc::new(ledger),
Arc::new(tx_manager),
Arc::new(MockMintTxManager::new()),
is_serving_fn,
Arc::new(authenticator),
logger,
// Clone this, maintaining our own Arc reference into the tracked
// sessions structure so that we can inspect it later.
tracked_sessions.clone(),
);

// gRPC client and server.
let (client, _server) = get_client_server(instance);
let message = Message::default();

for _ in 0..limit {
let propose_tx_response = client
.client_tx_propose(&message)
.expect("Client tx propose error");
assert_eq!(
propose_tx_response.get_result(),
ProposeTxResult::ContainsSpentKeyImage
);
}
// No failed transaction proposals over the limit yet, so the session
// shouldn't have been dropped
{
let tracker = tracked_sessions
.lock()
.expect("Attempt to lock session-tracking mutex failed.");
assert_eq!(tracker.len(), 1);
let (_session_id, tracking_data) = tracker.iter().next().unwrap();
assert_eq!(tracking_data.tx_proposal_failures.len(), limit as usize);
}

let propose_tx_response = client.client_tx_propose(&message);
assert!(propose_tx_response.is_err());

match propose_tx_response {
Err(grpcio::Error::RpcFailure(rpc_status)) => {
assert_eq!(rpc_status.code(), RpcStatusCode::RESOURCE_EXHAUSTED);
}
_ => panic!(
"Unexpected response upon continuing to use\
a rate-limited session: {propose_tx_response:?}"
),
}
Comment on lines +1987 to +1997
Collaborator:
🔧 It might lose a little bit of info on failures, but perhaps:

Suggested change:
- assert!(propose_tx_response.is_err());
- match propose_tx_response {
-     Err(grpcio::Error::RpcFailure(rpc_status)) => {
-         assert_eq!(rpc_status.code(), RpcStatusCode::RESOURCE_EXHAUSTED);
-     }
-     _ => panic!(
-         "Unexpected response upon continuing to use\
-          a rate-limited session: {propose_tx_response:?}"
-     ),
- }
+ assert!(
+     matches!(propose_tx_response, Err(grpcio::Error::RpcFailure(failure)) if failure.code() == RpcStatusCode::RESOURCE_EXHAUSTED)
+ );


let tracker = tracked_sessions
.lock()
.expect("Attempt to lock session-tracking mutex failed.");
// This session should have been dropped at this point.
assert_eq!(tracker.len(), 0);

// Because of the behavior of Mockall, if this returns without calling
// client_close() exactly once, it will panic and the test will fail.
Comment on lines +2004 to +2006
Collaborator:
🔧 It doesn't seem like this comment is needed, since the tracker will close it.

Suggested change (delete these lines):
- // Because of the behavior of Mockall, if this returns without calling
- // client_close() exactly once, it will panic and the test will fail.

}
}