Skip to content

Commit

Permalink
add follow peer API to allow nodes to follow each other, useful for s…
Browse files Browse the repository at this point in the history
…eparate clusters
  • Loading branch information
jeromegn committed Dec 12, 2024
1 parent e66f039 commit f4d94d3
Show file tree
Hide file tree
Showing 11 changed files with 521 additions and 34 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/corro-admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ tracing = { workspace = true }
tripwire = { path = "../tripwire" }
rangemap = { workspace = true }
uuid = { workspace = true }
bytes = { workspace = true }
156 changes: 153 additions & 3 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
use std::{
fmt::Display,
net::SocketAddr,
time::{Duration, Instant},
};

use bytes::BytesMut;
use camino::Utf8PathBuf;
use corro_agent::{
api::peer::{
encode_write_bipayload_msg,
follow::{read_follow_msg, FollowMessage, FollowMessageV1},
},
transport::Transport,
};
use corro_types::{
actor::{ActorId, ClusterId},
agent::{Agent, BookedVersions, Bookie, LockKind, LockMeta, LockState},
api::SqliteValueRef,
base::{CrsqlDbVersion, CrsqlSeq, Version},
broadcast::{FocaCmd, FocaInput, Timestamp},
broadcast::{BiPayload, Changeset, FocaCmd, FocaInput, Timestamp},
pubsub::unpack_columns,
sqlite::SqlitePoolError,
sync::generate_sync,
updates::Handle,
Expand All @@ -25,7 +36,7 @@ use tokio::{
task::block_in_place,
};
use tokio_serde::{formats::Json, Framed};
use tokio_util::codec::LengthDelimitedCodec;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tracing::{debug, error, info, warn};
use tripwire::Tripwire;
use uuid::Uuid;
Expand All @@ -45,6 +56,7 @@ pub struct AdminConfig {
pub fn start_server(
agent: Agent,
bookie: Bookie,
transport: Transport,
config: AdminConfig,
mut tripwire: Tripwire,
) -> Result<(), AdminError> {
Expand Down Expand Up @@ -78,8 +90,9 @@ pub fn start_server(
let agent = agent.clone();
let bookie = bookie.clone();
let config = config.clone();
let transport = transport.clone();
async move {
if let Err(e) = handle_conn(agent, &bookie, config, stream).await {
if let Err(e) = handle_conn(agent, &bookie, &transport, config, stream).await {
error!("could not handle admin connection: {e}");
}
}
Expand All @@ -99,6 +112,16 @@ pub enum Command {
Cluster(ClusterCommand),
Actor(ActorCommand),
Subs(SubsCommand),
Debug(DebugCommand),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DebugCommand {
Follow {
peer_addr: SocketAddr,
from: Option<u64>,
local_only: bool,
},
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -291,6 +314,7 @@ async fn collapse_gaps(
async fn handle_conn(
agent: Agent,
bookie: &Bookie,
transport: &Transport,
_config: AdminConfig,
stream: UnixStream,
) -> Result<(), AdminError> {
Expand Down Expand Up @@ -598,6 +622,132 @@ async fn handle_conn(
}
};
}
Command::Debug(DebugCommand::Follow {
peer_addr,
from,
local_only,
}) => match transport.open_bi(peer_addr).await {
Ok((mut tx, recv)) => {
let mut codec = LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec();
let mut encoding_buf = BytesMut::new();
let mut buf = BytesMut::new();

if let Err(e) = encode_write_bipayload_msg(
&mut codec,
&mut encoding_buf,
&mut buf,
BiPayload::V1 {
data: corro_types::broadcast::BiPayloadV1::Follow {
from: from.map(CrsqlDbVersion),
local_only,
},
cluster_id: agent.cluster_id(),
},
&mut tx,
)
.await
{
send_error(
&mut stream,
format!("could not send follow payload to {peer_addr}: {e}"),
)
.await;
continue;
}

let mut framed = FramedRead::new(
recv,
LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec(),
);

'msg: loop {
match read_follow_msg(&mut framed).await {
Ok(None) => {
send_success(&mut stream).await;
break;
}
Err(e) => {
send_error(
&mut stream,
format!("error receiving follow message: {e}"),
)
.await;
break;
}
Ok(Some(msg)) => {
match msg {
FollowMessage::V1(FollowMessageV1::Change(change)) => {
let actor_id = change.actor_id;
match change.changeset {
Changeset::Full {
version,
changes,
ts,
..
} => {
if let Err(e) = stream
.send(Response::Json(serde_json::json!({
"actor_id": actor_id,
"type": "full",
"version": version,
"ts": ts.to_string(),
})))
.await
{
warn!("could not send to steam, breaking ({e})");
break;
}

for change in changes {
if let Err(e) = stream.send(
Response::Json(
serde_json::json!({
"table": change.table,
"pk": unpack_columns(&change.pk).unwrap().iter().map(SqliteValueRef::to_owned).collect::<Vec<_>>(),
"cid": change.cid,
"val": change.val,
"col_version": change.col_version,
"db_version": change.db_version,
"seq": change.seq,
"site_id": ActorId::from_bytes(change.site_id),
"cl": change.cl,
}),
),
)
.await {
warn!("could not send to steam, breaking ({e})");
break 'msg;
}
}
}
changeset => {
send_log(
&mut stream,
LogLevel::Warn,
format!("unknown change type received: {changeset:?}"),
)
.await;
}
}
}
}
}
}
}
}
Err(e) => {
send_error(
&mut stream,
format!("could not open bi-directional stream with {peer_addr}: {e}"),
)
.await;
continue;
}
},
},
Ok(None) => {
debug!("done with admin conn");
Expand Down
61 changes: 37 additions & 24 deletions crates/corro-agent/src/agent/bi.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::api::peer::serve_sync;
use crate::api::peer::{follow::serve_follow, serve_sync};
use corro_types::{
agent::{Agent, Bookie},
broadcast::{BiPayload, BiPayloadV1},
Expand Down Expand Up @@ -56,7 +56,12 @@ pub fn spawn_bipayload_handler(
let agent = agent.clone();
let bookie = bookie.clone();
async move {
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::builder().max_frame_length(100 * 1_024 * 1_024).new_codec());
let mut framed = FramedRead::new(
rx,
LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec(),
);

loop {
match timeout(Duration::from_secs(5), StreamExt::next(&mut framed)).await {
Expand All @@ -72,30 +77,38 @@ pub fn spawn_bipayload_handler(
match BiPayload::read_from_buffer(&b) {
Ok(payload) => {
match payload {
BiPayload::V1 {
data:
BiPayloadV1::SyncStart {
actor_id,
trace_ctx,
},
cluster_id,
} => {
trace!(
"framed read buffer len: {}",
framed.read_buffer().len()
);
BiPayload::V1 { data, cluster_id } => match data {
BiPayloadV1::SyncStart {
actor_id,
trace_ctx,
} => {
trace!(
"framed read buffer len: {}",
framed.read_buffer().len()
);

// println!("got sync state: {state:?}");
if let Err(e) = serve_sync(
&agent, &bookie, actor_id, trace_ctx,
cluster_id, framed, tx,
)
.await
{
warn!("could not complete receiving sync: {e}");
// println!("got sync state: {state:?}");
if let Err(e) = serve_sync(
&agent, &bookie, actor_id, trace_ctx,
cluster_id, framed, tx,
)
.await
{
warn!("could not complete receiving sync: {e}");
}
break;
}
break;
}
BiPayloadV1::Follow { from, local_only } => {
if let Err(e) = serve_follow(
&agent, from, local_only, tx,
)
.await
{
warn!("could not complete follow: {e}");
}
break;
}
},
}
}

Expand Down
9 changes: 7 additions & 2 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
metrics, setup, util, AgentOptions,
},
broadcast::runtime_loop,
transport::Transport,
};
use corro_types::{
actor::ActorId,
Expand All @@ -26,12 +27,16 @@ use tripwire::Tripwire;
///
/// First initialise `AgentOptions` state via `setup()`, then spawn a
/// new task that runs the main agent state machine
pub async fn start_with_config(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Bookie)> {
pub async fn start_with_config(
conf: Config,
tripwire: Tripwire,
) -> eyre::Result<(Agent, Bookie, Transport)> {
let (agent, opts) = setup(conf.clone(), tripwire.clone()).await?;
let transport = opts.transport.clone();

let bookie = run(agent.clone(), opts, conf.perf).await?;

Ok((agent, bookie))
Ok((agent, bookie, transport))
}

async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Result<Bookie> {
Expand Down
Loading

0 comments on commit f4d94d3

Please sign in to comment.