Skip to content

Commit

Permalink
it's not backwards-compatible to add site_version to the ChangeV1's c…
Browse files Browse the repository at this point in the history
…hangeset. removed and we'll rely on the ChangeV1's version as site_version
  • Loading branch information
jeromegn committed Dec 16, 2024
1 parent 77b5a79 commit c4e082d
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 27 deletions.
1 change: 0 additions & 1 deletion crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,6 @@ mod tests {
seq: CrsqlSeq(0),
site_id: agent.actor_id().to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(1),
};

let change = (
Expand Down
2 changes: 0 additions & 2 deletions crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,6 @@ async fn process_failed_changes() -> eyre::Result<()> {
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(1),
};

let bad_change = Change {
Expand All @@ -939,7 +938,6 @@ async fn process_failed_changes() -> eyre::Result<()> {
seq: CrsqlSeq(1),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(1),
};

let mut rows = vec![(
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ pub fn process_complete_version(
change.cl,
// increment the seq by the start_seq or else we'll have multiple change rows with the same seq
change.seq,
change.site_version,
version,
])?;
let rows_impacted: i64 = sp
.prepare_cached("SELECT crsql_rows_impacted()")?
Expand Down
50 changes: 34 additions & 16 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use futures::stream::FuturesUnordered;
use futures::{Future, Stream, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use metrics::counter;
use quinn::{RecvStream, SendStream};
use quinn::{RecvStream, SendStream, WriteError};
use rangemap::RangeInclusiveSet;
use rusqlite::{named_params, Connection};
use speedy::Writable;
Expand All @@ -48,6 +48,8 @@ pub enum SyncError {
#[error(transparent)]
Send(#[from] SyncSendError),
#[error(transparent)]
BiPayloadSend(#[from] BiPayloadSendError),
#[error(transparent)]
Recv(#[from] SyncRecvError),
#[error(transparent)]
Rejection(#[from] SyncRejectionV1),
Expand Down Expand Up @@ -94,6 +96,24 @@ pub enum SyncSendError {
ChannelClosed,
}

#[derive(Debug, thiserror::Error)]
pub enum BiPayloadSendError {
#[error("could not encode payload: {0}")]
Encode(#[from] BiPayloadEncodeError),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Write(#[from] quinn::WriteError),
}

#[derive(Debug, thiserror::Error)]
pub enum BiPayloadEncodeError {
#[error(transparent)]
Encode(#[from] speedy::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
}

fn build_quinn_transport_config(config: &GossipConfig) -> quinn::TransportConfig {
let mut transport_config = quinn::TransportConfig::default();

Expand Down Expand Up @@ -899,20 +919,21 @@ async fn encode_write_bipayload_msg(
send_buf: &mut BytesMut,
msg: BiPayload,
write: &mut SendStream,
) -> Result<(), SyncSendError> {
) -> Result<(), BiPayloadSendError> {
encode_bipayload_msg(codec, encode_buf, send_buf, msg)?;

write_buf(send_buf, write).await
write_buf(send_buf, write)
.await
.map_err(BiPayloadSendError::from)
}

fn encode_bipayload_msg(
codec: &mut LengthDelimitedCodec,
encode_buf: &mut BytesMut,
send_buf: &mut BytesMut,
msg: BiPayload,
) -> Result<(), SyncSendError> {
msg.write_to_stream(encode_buf.writer())
.map_err(SyncMessageEncodeError::from)?;
) -> Result<(), BiPayloadEncodeError> {
msg.write_to_stream(encode_buf.writer())?;

codec.encode(encode_buf.split().freeze(), send_buf)?;
Ok(())
Expand All @@ -927,11 +948,13 @@ async fn encode_write_sync_msg(
) -> Result<(), SyncSendError> {
encode_sync_msg(codec, encode_buf, send_buf, msg)?;

write_buf(send_buf, write).await
write_buf(send_buf, write)
.await
.map_err(SyncSendError::from)
}

#[tracing::instrument(skip_all, fields(buf_size = send_buf.len()), err)]
async fn write_buf(send_buf: &mut BytesMut, write: &mut SendStream) -> Result<(), SyncSendError> {
async fn write_buf(send_buf: &mut BytesMut, write: &mut SendStream) -> Result<(), WriteError> {
let len = send_buf.len();
write.write_chunk(send_buf.split().freeze()).await?;
counter!("corro.sync.chunk.sent.bytes").increment(len as u64);
Expand Down Expand Up @@ -1529,7 +1552,7 @@ pub async fn serve_sync(
encode_sync_msg(&mut codec, &mut encode_buf, &mut send_buf, msg)?;

if send_buf.len() >= 16 * 1024 {
write_buf(&mut send_buf, &mut write).await?;
write_buf(&mut send_buf, &mut write).await.map_err(SyncSendError::from)?;
}
},
None => {
Expand All @@ -1539,15 +1562,15 @@ pub async fn serve_sync(

_ = check_buf.tick() => {
if !send_buf.is_empty() {
write_buf(&mut send_buf, &mut write).await?;
write_buf(&mut send_buf, &mut write).await.map_err(SyncSendError::from)?;
}
}
}
}

if !stopped {
if !send_buf.is_empty() {
write_buf(&mut send_buf, &mut write).await?;
write_buf(&mut send_buf, &mut write).await.map_err(SyncSendError::from)?;
}

if let Err(e) = write.finish().await {
Expand Down Expand Up @@ -1739,7 +1762,6 @@ mod tests {
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(1),
};

let change2 = Change {
Expand All @@ -1752,7 +1774,6 @@ mod tests {
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(2),
};

let bookie = Bookie::new(Default::default());
Expand Down Expand Up @@ -1890,7 +1911,6 @@ mod tests {
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(3),
};

process_multiple_changes(
Expand Down Expand Up @@ -2035,7 +2055,6 @@ mod tests {
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(4),
};

process_multiple_changes(
Expand Down Expand Up @@ -2091,7 +2110,6 @@ mod tests {
seq: CrsqlSeq(last_seq),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(5),
};
last_seq += 1;
c
Expand Down
4 changes: 0 additions & 4 deletions crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,6 @@ mod tests {
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(1),
};

let change2 = Change {
Expand All @@ -1356,7 +1355,6 @@ mod tests {
seq: CrsqlSeq(1),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(1),
};

let changes = ChangeV1 {
Expand Down Expand Up @@ -1428,7 +1426,6 @@ mod tests {
seq: CrsqlSeq(0),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(1),
};

let changes = ChangeV1 {
Expand Down Expand Up @@ -1470,7 +1467,6 @@ mod tests {
seq: CrsqlSeq(1),
site_id: actor_id.to_bytes(),
cl: 1,
site_version: CrsqlSiteVersion(1),
};

let changes = ChangeV1 {
Expand Down
2 changes: 0 additions & 2 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ pub struct Change {
pub seq: CrsqlSeq,
pub site_id: [u8; 16],
pub cl: i64,
pub site_version: CrsqlSiteVersion,
}

impl Change {
Expand Down Expand Up @@ -276,7 +275,6 @@ pub fn row_to_change(row: &Row) -> Result<Change, rusqlite::Error> {
seq: row.get(6)?,
site_id: row.get(7)?,
cl: row.get(8)?,
site_version: row.get(9)?,
})
}

Expand Down
1 change: 0 additions & 1 deletion crates/corro-types/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,6 @@ mod tests {
&change.site_id,
change.cl,
change.seq,
change.site_version,
])
.unwrap();
}
Expand Down

0 comments on commit c4e082d

Please sign in to comment.