Skip to content

Commit

Permalink
merge private main
Browse files Browse the repository at this point in the history
  • Loading branch information
josehu07 committed Apr 29, 2024
1 parent 5187a43 commit 1b56ac6
Show file tree
Hide file tree
Showing 11 changed files with 383 additions and 1,078 deletions.
768 changes: 379 additions & 389 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 0 additions & 6 deletions scripts/local_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@
"CRaft": lambda n, _: f"fault_tolerance={(n//2)//2}",
}

PROTOCOL_SNAPSHOT_PATH = {
"MultiPaxos": lambda r: f"snapshot_path='/tmp/summerset.multipaxos.{r}.snap'",
"Raft": lambda r: f"snapshot_path='/tmp/summerset.raft.{r}.snap'",
"RSPaxos": lambda r: f"snapshot_path='/tmp/summerset.rs_paxos.{r}.snap'",
}


def config_with_file_paths(protocol, config, replica):
result_config = PROTOCOL_BACKER_PATH[protocol](replica)
Expand Down
8 changes: 0 additions & 8 deletions src/manager/clusman.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ pub struct ClusterManager {
/// ServerReigner module.
server_reigner: ServerReigner,

/// Receiver side of the server ID assignment channel.
rx_id_assign: mpsc::UnboundedReceiver<()>,

/// Sender side of the server ID assignment result channel.
tx_id_result: mpsc::UnboundedSender<(ReplicaId, u8)>,

/// ClientReactor module.
client_reactor: ClientReactor,
}
Expand Down Expand Up @@ -105,8 +99,6 @@ impl ClusterManager {
servers_info: HashMap::new(),
assigned_ids: HashSet::new(),
server_reigner,
rx_id_assign,
tx_id_result,
client_reactor,
})
}
Expand Down
106 changes: 0 additions & 106 deletions src/manager/reigner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,110 +799,4 @@ mod reigner_tests {
)?;
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn api_server_leave() -> Result<(), SummersetError> {
let barrier = Arc::new(Barrier::new(2));
let barrier2 = barrier.clone();
tokio::spawn(async move {
// replica 0
barrier2.wait().await;
let mut hub =
ControlHub::new_and_setup("127.0.0.1:54600".parse()?).await?;
assert_eq!(hub.me, 0);
// send a message to manager
hub.send_ctrl(CtrlMsg::NewServerJoin {
id: hub.me,
protocol: SmrProtocol::SimplePush,
api_addr: "127.0.0.1:54700".parse()?,
p2p_addr: "127.0.0.1:54800".parse()?,
})?;
// recv a message from manager
assert_eq!(
hub.recv_ctrl().await?,
CtrlMsg::ConnectToPeers {
population: 1,
to_peers: HashMap::new(),
}
);
// leave and re-join as 0
hub.send_ctrl(CtrlMsg::Leave)?;
assert_eq!(hub.recv_ctrl().await?, CtrlMsg::LeaveReply);
time::sleep(Duration::from_millis(100)).await;
let mut hub =
ControlHub::new_and_setup("127.0.0.1:54600".parse()?).await?;
assert_eq!(hub.me, 0);
// send a message to manager
hub.send_ctrl(CtrlMsg::NewServerJoin {
id: hub.me,
protocol: SmrProtocol::SimplePush,
api_addr: "127.0.0.1:54700".parse()?,
p2p_addr: "127.0.0.1:54800".parse()?,
})?;
// recv a message from manager
assert_eq!(
hub.recv_ctrl().await?,
CtrlMsg::ConnectToPeers {
population: 1,
to_peers: HashMap::new(),
}
);
Ok::<(), SummersetError>(())
});
// manager
let (tx_id_assign, mut rx_id_assign) = mpsc::unbounded_channel();
let (tx_id_result, rx_id_result) = mpsc::unbounded_channel();
let mut reigner = ServerReigner::new_and_setup(
"127.0.0.1:54600".parse()?,
tx_id_assign,
rx_id_result,
)
.await?;
barrier.wait().await;
// recv message from server 0
rx_id_assign.recv().await;
tx_id_result.send((0, 1))?;
let (id, msg) = reigner.recv_ctrl().await?;
assert_eq!(id, 0);
assert_eq!(
msg,
CtrlMsg::NewServerJoin {
id: 0,
protocol: SmrProtocol::SimplePush,
api_addr: "127.0.0.1:54700".parse()?,
p2p_addr: "127.0.0.1:54800".parse()?
}
);
// send reply to server 0
reigner.send_ctrl(
CtrlMsg::ConnectToPeers {
population: 1,
to_peers: HashMap::new(),
},
id,
)?;
rx_id_assign.recv().await;
tx_id_result.send((0, 1))?;
// recv message from server 0
let (id, msg) = reigner.recv_ctrl().await?;
assert_eq!(id, 0);
assert_eq!(
msg,
CtrlMsg::NewServerJoin {
id: 0,
protocol: SmrProtocol::SimplePush,
api_addr: "127.0.0.1:54700".parse()?,
p2p_addr: "127.0.0.1:54800".parse()?
}
);
// send reply to server 0
reigner.send_ctrl(
CtrlMsg::ConnectToPeers {
population: 1,
to_peers: HashMap::new(),
},
id,
)?;
Ok(())
}
}
8 changes: 0 additions & 8 deletions src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,6 @@ impl SmrProtocol {
.await
)
}
Self::Raft => {
box_if_ok!(
RaftReplica::new_and_setup(
api_addr, p2p_addr, manager, config_str
)
.await
)
}
Self::RSPaxos => {
box_if_ok!(
RSPaxosReplica::new_and_setup(
Expand Down
36 changes: 3 additions & 33 deletions src/server/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::fmt;
use std::path::Path;
use std::io::SeekFrom;
use std::sync::Arc;

use crate::utils::SummersetError;
use crate::server::ReplicaId;
Expand All @@ -19,7 +18,6 @@ use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncSeekExt};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration};

/// Log action ID type.
pub type LogActionId = u64;
Expand Down Expand Up @@ -108,7 +106,6 @@ where
pub async fn new_and_setup(
me: ReplicaId,
path: &Path,
perf_a_b: Option<(u64, u64)>, // performance simulation params
) -> Result<Self, SummersetError> {
// prepare backing file
if !fs::try_exists(path).await? {
Expand All @@ -125,35 +122,8 @@ where
mpsc::unbounded_channel::<(LogActionId, LogAction<Ent>)>();
let (tx_ack, rx_ack) = mpsc::unbounded_channel();

// if doing performance delay simulation, add on-the-fly delay to
// each message received
let rx_log_true = if let Some((perf_a, perf_b)) = perf_a_b {
let (tx_log_delayed, rx_log_delayed) = mpsc::unbounded_channel();
let tx_log_delayed_arc = Arc::new(tx_log_delayed);

tokio::spawn(async move {
while let Some((id, log_action)) = rx_log.recv().await {
let tx_log_delayed_clone = tx_log_delayed_arc.clone();
tokio::spawn(async move {
let approx_size = log_action.get_size() as u64;
let delay_ns = perf_a + approx_size * perf_b;
time::sleep(Duration::from_nanos(delay_ns)).await;
tx_log_delayed_clone.send((id, log_action)).unwrap();
});
}
});

rx_log_delayed
} else {
rx_log
};

let logger_handle = tokio::spawn(Self::logger_thread(
me,
backer_file,
rx_log_true,
tx_ack,
));
let logger_handle =
tokio::spawn(Self::logger_thread(me, backer_file, rx_log, tx_ack));

Ok(StorageHub {
me,
Expand Down Expand Up @@ -773,7 +743,7 @@ mod storage_tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn api_log_ack() -> Result<(), SummersetError> {
let path = Path::new("/tmp/test-backer-6.log");
let mut hub = StorageHub::new_and_setup(0, path, None).await?;
let mut hub = StorageHub::new_and_setup(0, path).await?;
let entry = TestEntry("abcdefgh".into());
let entry_bytes = encode_to_vec(&entry)?;
hub.submit_action(0, LogAction::Append { entry, sync: true })?;
Expand Down
4 changes: 1 addition & 3 deletions src/server/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

use std::fmt;
use std::net::SocketAddr;
use std::sync::Arc;

use crate::utils::{
SummersetError, Bitmap, safe_tcp_read, safe_tcp_write, tcp_bind_with_retry,
Expand Down Expand Up @@ -95,7 +94,6 @@ where
me: ReplicaId,
population: u8,
p2p_addr: SocketAddr,
perf_a_b: Option<(u64, u64)>, // performance simulation params
) -> Result<Self, SummersetError> {
if population <= me {
return logged_err!(me; "invalid population {}", population);
Expand Down Expand Up @@ -131,7 +129,7 @@ where
Ok(TransportHub {
me,
population,
rx_recv: rx_recv_true,
rx_recv,
tx_sends: tx_sends_read,
_peer_acceptor_handle: peer_acceptor_handle,
tx_connect,
Expand Down
Loading

0 comments on commit 1b56ac6

Please sign in to comment.