Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle failed changes properly #269

Merged
merged 3 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 24 additions & 47 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//!
//! This module is _big_ and maybe should be split up further.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::ops::RangeInclusive;

Expand All @@ -13,9 +12,12 @@ use std::{
time::{Duration, Instant},
};

use crate::agent::util::log_at_pow_10;
use crate::{
agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL},
agent::{
bi, bootstrap, uni,
util::{log_at_pow_10, process_multiple_changes},
SyncClientError, ANNOUNCE_INTERVAL,
},
api::peer::parallel_sync,
transport::Transport,
};
Expand All @@ -36,6 +38,7 @@ use corro_types::base::Version;
use corro_types::broadcast::Timestamp;
use corro_types::change::store_empty_changeset;
use foca::Notification;
use indexmap::map::Entry;
use indexmap::IndexMap;
use metrics::{counter, gauge, histogram};
use rand::{prelude::IteratorRandom, rngs::StdRng, SeedableRng};
Expand Down Expand Up @@ -747,8 +750,8 @@ pub async fn handle_changes(
agent.config().perf.apply_queue_timeout as u64,
));

const MAX_SEEN_CACHE_LEN: usize = 10000;
const KEEP_SEEN_CACHE_SIZE: usize = 1000;
let max_seen_cache_len: usize = max_queue_len;
let keep_seen_cache_size: usize = cmp::max(10, max_seen_cache_len / 10);
let mut seen: IndexMap<_, RangeInclusiveSet<CrsqlSeq>> = IndexMap::new();

let mut drop_log_count: u64 = 0;
Expand Down Expand Up @@ -776,23 +779,7 @@ pub async fn handle_changes(
let changes = std::mem::take(&mut buf);
let agent = agent.clone();
let bookie = bookie.clone();
join_set.spawn(async move {
if let Err(e) = util::process_multiple_changes(agent, bookie, changes.clone()).await
{
error!("could not process multiple changes: {e}");
changes.iter().fold(
BTreeMap::new(),
|mut acc: BTreeMap<ActorId, RangeInclusiveSet<Version>>, change| {
acc.entry(change.0.actor_id)
.or_default()
.insert(change.0.versions());
acc
},
)
} else {
BTreeMap::new()
}
});
join_set.spawn(process_multiple_changes(agent, bookie, changes.clone()));

buf_cost -= tmp_cost;
}
Expand All @@ -804,13 +791,8 @@ pub async fn handle_changes(
// but we need to drain it to free up concurrency
res = join_set.join_next(), if !join_set.is_empty() => {
debug!("processed multiple changes concurrently");
if let Some(Ok(res)) = res {
for (actor_id, versions) in res {
let versions: Vec<_> = versions.into_iter().flatten().collect();
for version in versions {
seen.remove(&(actor_id, version));
}
}
if let Some(Ok(Err(e))) = res {
error!("could not process multiple changes: {e}");
}
continue;
},
Expand All @@ -833,24 +815,13 @@ pub async fn handle_changes(
let changes: Vec<_> = queue.drain(..).collect();
let agent = agent.clone();
let bookie = bookie.clone();
join_set.spawn(async move {
if let Err(e) = util::process_multiple_changes(agent, bookie, changes.clone()).await
{
error!("could not process multiple changes: {e}");
changes.iter().fold(BTreeMap::new(), |mut acc: BTreeMap<ActorId, RangeInclusiveSet<Version>> , change| {
acc.entry(change.0.actor_id).or_default().insert(change.0.versions());
acc
})
} else {
BTreeMap::new()
}
});
join_set.spawn(process_multiple_changes(agent, bookie, changes.clone()));
buf_cost = 0;
}

if seen.len() > MAX_SEEN_CACHE_LEN {
if seen.len() > max_seen_cache_len {
// we don't want to keep too many entries in here.
seen = seen.split_off(seen.len() - KEEP_SEEN_CACHE_SIZE);
seen = seen.split_off(seen.len() - keep_seen_cache_size);
}
continue
},
Expand Down Expand Up @@ -919,10 +890,16 @@ pub async fn handle_changes(

// drop old items when the queue is full.
if queue.len() >= max_queue_len {
let dropped = queue.pop_front();
if let Some(dropped) = dropped {
for v in dropped.0.versions() {
let _ = seen.remove(&(dropped.0.actor_id, v));
if let Some((dropped_change, _, _)) = queue.pop_front() {
for v in dropped_change.versions() {
if let Entry::Occupied(mut entry) = seen.entry((change.actor_id, v)) {
if let Some(seqs) = dropped_change.seqs().cloned() {
entry.get_mut().remove(seqs);
} else {
entry.remove_entry();
}
};
buf_cost -= dropped_change.processing_cost();
}
}

Expand Down
122 changes: 121 additions & 1 deletion crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use tokio::{
};
use tracing::{debug, info_span};
use tripwire::Tripwire;
use uuid::Uuid;

use crate::{
agent::process_multiple_changes,
Expand All @@ -31,7 +32,6 @@ use crate::{
transport::Transport,
};
use corro_tests::*;
use corro_types::agent::Agent;
use corro_types::broadcast::Timestamp;
use corro_types::change::Change;
use corro_types::{
Expand All @@ -44,6 +44,11 @@ use corro_types::{
sqlite::CrConn,
sync::generate_sync,
};
use corro_types::{
agent::Agent,
api::{ColumnName, TableName},
pubsub::pack_columns,
};

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn insert_rows_and_gossip() -> eyre::Result<()> {
Expand Down Expand Up @@ -903,6 +908,121 @@ async fn test_clear_empty_versions() -> eyre::Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn process_failed_changes() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();

let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();
let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let uuid = Uuid::parse_str("00000000-0000-0000-a716-446655440000")?;
let actor_id = ActorId(uuid);
// setup the schema, for both nodes
let (status_code, _body) = api_v1_db_schema(
Extension(ta1.agent.clone()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
assert_eq!(status_code, StatusCode::OK);

let ta2 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let (status_code, _body) = api_v1_db_schema(
Extension(ta2.agent.clone()),
axum::Json(vec![corro_tests::TEST_SCHEMA.into()]),
)
.await;
assert_eq!(status_code, StatusCode::OK);

for i in 1..=5_i64 {
let (status_code, _) = api_v1_transactions(
Extension(ta2.agent.clone()),
axum::Json(vec![Statement::WithParams(
"INSERT OR REPLACE INTO tests (id,text) VALUES (?,?)".into(),
vec![i.into(), "service-text".into()],
)]),
)
.await;
assert_eq!(status_code, StatusCode::OK);
}
let mut good_changes = get_rows(ta2.agent.clone(), vec![(Version(1)..=Version(5), None)]).await?;

let change6 = Change {
table: TableName("tests".into()),
pk: pack_columns(&vec![6i64.into()])?,
cid: ColumnName("text".into()),
val: "six".into(),
col_version: 1,
db_version: CrsqlDbVersion(6),
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
};

let bad_change = Change {
table: TableName("tests".into()),
pk: pack_columns(&vec![6i64.into()])?,
cid: ColumnName("nonexistent".into()),
val: "six".into(),
col_version: 1,
db_version: CrsqlDbVersion(6),
seq: CrsqlSeq(1),
site_id: actor_id.to_bytes(),
cl: 1,
};

let mut rows = vec![
(
ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(1),
changes: vec![change6.clone(), bad_change],
seqs: CrsqlSeq(0)..=CrsqlSeq(1),
last_seq: CrsqlSeq(1),
ts: Default::default(),
},
},
ChangeSource::Sync,
Instant::now(),
)
];

rows.append(&mut good_changes);

let res = process_multiple_changes(ta1.agent.clone(), ta1.bookie.clone(), rows).await;

assert!(res.is_ok());

// verify that correct versions were inserted
let conn = ta1.agent.pool().read().await?;

for i in 1..=5_i64 {
let pk = pack_columns(&[i.into()])?;
let crsql_dbv = conn.prepare_cached(r#"SELECT db_version from crsql_changes where "table" = "tests" and pk = ?"#)?
.query_row([pk], |row| row.get::<_, CrsqlDbVersion>(0))?;

let booked_dbv = conn.prepare_cached("SELECT db_version from __corro_bookkeeping where start_version = ? and actor_id = ?")?
.query_row((i, ta2.agent.actor_id()), |row| row.get::<_, CrsqlDbVersion>(0))?;

assert_eq!(crsql_dbv, booked_dbv);

let conn = ta1.agent.pool().read().await?;
conn.prepare_cached("SELECT text from tests where id = ?")?
.query_row([i], |row| row.get::<_, String>(0))?;
}

let res = conn
.prepare_cached("SELECT text from tests where id = 6")?
.query_row([], |row| row.get::<_, String>(0));
assert!(res.is_err());
assert_eq!(res, Err(rusqlite::Error::QueryReturnedNoRows));

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also check that the inserted db_version values, and the db_versions Corrosion associates with its internal versions, are correct? Possibly by trying to apply the bad change first.

I vaguely recall we do some db_version arithmetic, and we might need to subtract when we're rolling back a savepoint.

tripwire_tx.send(()).await.ok();
tripwire_worker.await;
wait_for_all_pending_handles().await;

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_process_multiple_changes() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();
Expand Down
Loading
Loading