Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Nov 2, 2023
1 parent 67b29ed commit 0832376
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/connectors/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) use tremor_common::{
};
pub(crate) use tremor_config::Impl;
pub use tremor_config::NameWithConfig;
pub use tremor_pipeline::{CbAction, EventOriginUri, DEFAULT_STREAM_ID};
pub use tremor_pipeline::{CbAction, EventOriginUri, DEFAULT_STREAM_ID};
pub(crate) use tremor_script::prelude::*;
/// default buf size used for reading from files and streams (sockets etc)
///
Expand Down
38 changes: 38 additions & 0 deletions src/raft/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ impl Store {
where
T: for<'de> serde::Deserialize<'de>,
{
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
let _ = bug_fix_txn.open_table(STORE).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = self.db.begin_read().map_err(r_err)?;
let table = read_txn.open_table(STORE).map_err(r_err)?;
let r = table.get(key).map_err(r_err)?;
Expand Down Expand Up @@ -528,6 +535,13 @@ impl RaftLogReader<TremorRaftConfig> for Store {
&mut self,
range: RB,
) -> StorageResult<Vec<Entry<TremorRaftConfig>>> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
let _ = bug_fix_txn.open_table(LOGS).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = self.db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(LOGS).map_err(r_err)?;
Expand Down Expand Up @@ -770,6 +784,14 @@ impl RaftStorage<TremorRaftConfig> for Store {
}

async fn get_log_state(&mut self) -> StorageResult<LogState<TremorRaftConfig>> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
let _ = bug_fix_txn.open_table(LOGS).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = self.db.begin_read().map_err(r_err)?;
let table = read_txn.open_table(LOGS).map_err(r_err)?;
let last = table
Expand Down Expand Up @@ -894,6 +916,14 @@ impl Store {
/// # Errors
/// if the store fails to read the RPC address
pub fn get_self_addr(db: &Database) -> Result<Option<Addr>, Error> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = db.begin_write()?;
{
let _ = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(SYSTEM).map_err(r_err)?;
Expand All @@ -904,6 +934,14 @@ impl Store {
/// # Errors
/// if the store fails to read the node id
pub fn get_self_node_id(db: &Database) -> Result<Option<crate::raft::NodeId>, Error> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = db.begin_write()?;
{
let _ = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(SYSTEM).map_err(r_err)?;
Expand Down
8 changes: 8 additions & 0 deletions src/raft/store/statemachine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ impl TremorStateMachine {
where
T: for<'de> serde::Deserialize<'de>,
{
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
let _ = bug_fix_txn.open_table(STATE_MACHINE).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = self.db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(STATE_MACHINE).map_err(r_err)?;
Expand Down
18 changes: 17 additions & 1 deletion src/raft/store/statemachine/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
apps: HashMap::new(),
world: world.clone(),
};
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = db.begin_write()?;
{
let _ = bug_fix_txn.open_table(APPS).map_err(w_err)?;
let _ = bug_fix_txn.open_table(INSTANCES).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_txn = db.begin_read()?;
let apps = read_txn.open_table(APPS)?;
Expand All @@ -99,6 +107,7 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
me.load_archive(archive.value())
.map_err(|e| store::Error::Other(Box::new(e)))?;
}

let instances = read_txn.open_table(INSTANCES)?;

// load instances
Expand Down Expand Up @@ -220,6 +229,13 @@ impl RaftStateMachine<AppsSnapshot, AppsRequest> for AppsStateMachine {
}

fn as_snapshot(&self) -> StorageResult<AppsSnapshot> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
let _ = bug_fix_txn.open_table(APPS).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = self.db.begin_read().map_err(r_err)?;
let apps = read_txn.open_table(APPS).map_err(r_err)?;
let archives = apps
Expand Down Expand Up @@ -300,7 +316,7 @@ impl AppsStateMachine {
}
write_txn.commit().map_err(w_err)?;

let app = StateApp {
let app: StateApp = StateApp {
app,
main,
arena_indices,
Expand Down
15 changes: 15 additions & 0 deletions src/raft/store/statemachine/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ impl KvStateMachine {
/// try to obtain the value at the given `key`.
/// Returns `Ok(None)` if there is no value for that key.
pub(crate) fn get(&self, key: &str) -> StorageResult<Option<Vec<u8>>> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
let _ = bug_fix_txn.open_table(DATA).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = self.db.begin_read().map_err(r_err)?;
let table = read_txn.open_table(DATA).map_err(r_err)?;
table
Expand Down Expand Up @@ -85,6 +92,14 @@ impl RaftStateMachine<KvSnapshot, KvRequest> for KvStateMachine {
}

fn as_snapshot(&self) -> StorageResult<KvSnapshot> {
// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = self.db.begin_write().map_err(w_err)?;
{
let _ = bug_fix_txn.open_table(DATA).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;

let read_tnx = self.db.begin_read().map_err(w_err)?;
let table = read_tnx.open_table(DATA).map_err(w_err)?;
let data = table
Expand Down
10 changes: 10 additions & 0 deletions src/raft/store/statemachine/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ impl RaftStateMachine<NodesSnapshot, NodesRequest> for NodesStateMachine {
Self: std::marker::Sized,
{
// load known nodes

// We need to use a write transaction despite just wanting a read transaction due to
// https://github.com/cberner/redb/issues/711
let bug_fix_txn = db.begin_write().map_err(w_err)?;
{
let _ = bug_fix_txn.open_table(SYSTEM).map_err(w_err)?;
let _ = bug_fix_txn.open_table(NODES).map_err(w_err)?;
}
bug_fix_txn.commit().map_err(w_err)?;
let read_txn = db.begin_read().map_err(r_err)?;

let table = read_txn.open_table(SYSTEM).map_err(r_err)?;
Expand All @@ -66,6 +75,7 @@ impl RaftStateMachine<NodesSnapshot, NodesRequest> for NodesStateMachine {
debug!("No next_node_id stored in db, starting from 0");
0
};

let table = read_txn.open_table(NODES).map_err(r_err)?;

let known_nodes = table
Expand Down
14 changes: 7 additions & 7 deletions src/raft/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ async fn cluster_join_test() -> ClusterResult<()> {
let dir0 = tempfile::tempdir()?;
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
let node0 = TestNode::bootstrap(dir0.path()).await?;
let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?;
let node0 = TestNode::bootstrap(dir0.path().join("db")).await?;
let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?;

// all see the same leader
let client0 = node0.client();
Expand Down Expand Up @@ -162,9 +162,9 @@ async fn kill_and_restart_voter() -> ClusterResult<()> {
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;

let node0 = TestNode::bootstrap(dir0.path()).await?;
let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?;
let node0 = TestNode::bootstrap(dir0.path().join("db")).await?;
let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?;

let client0 = node0.client();
let metrics = client0.metrics().await?;
Expand All @@ -181,7 +181,7 @@ async fn kill_and_restart_voter() -> ClusterResult<()> {
tokio::time::sleep(Duration::from_millis(500)).await;

// restart the node
let node1 = TestNode::just_start(dir1.path()).await?;
let node1 = TestNode::just_start(dir1.path().join("db")).await?;

// check that the leader is available
// TODO: solidify to guard against timing issues
Expand Down
16 changes: 8 additions & 8 deletions src/raft/test/learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ async fn add_learner_test() -> ClusterResult<()> {
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
let dir3 = tempfile::tempdir()?;
let node0 = TestNode::bootstrap(dir0.path()).await?;
let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?;
let node0 = TestNode::bootstrap(dir0.path().join("db")).await?;
let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?;
let client0 = node0.client();
let metrics = client0.metrics().await?;
let members = metrics
Expand All @@ -38,7 +38,7 @@ async fn add_learner_test() -> ClusterResult<()> {
.expect("No nodes in membership config");
assert_eq!(3, members.len());

let learner_node = TestNode::join_as_learner(dir3.path(), &node0.addr).await?;
let learner_node = TestNode::join_as_learner(dir3.path().join("db"), &node0.addr).await?;
let (learner_node_id, learner_addr) = learner_node.running.node_data();
// learner is known to the cluster
let nodemap = client0.get_nodes().await?;
Expand Down Expand Up @@ -82,9 +82,9 @@ async fn learner_runs_app() -> ClusterResult<()> {
let dir1 = tempfile::tempdir()?;
let dir2 = tempfile::tempdir()?;
let dir3 = tempfile::tempdir()?;
let node0 = TestNode::bootstrap(dir0.path()).await?;
let node1 = TestNode::start_and_join(dir1.path(), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path(), &node1.addr).await?;
let node0 = TestNode::bootstrap(dir0.path().join("db")).await?;
let node1 = TestNode::start_and_join(dir1.path().join("db"), &node0.addr).await?;
let node2 = TestNode::start_and_join(dir2.path().join("db"), &node1.addr).await?;
let client0 = node0.client();
let metrics = client0.metrics().await?;
let members = metrics
Expand All @@ -95,7 +95,7 @@ async fn learner_runs_app() -> ClusterResult<()> {
.expect("No nodes in membership config");
assert_eq!(3, members.len());

let learner_node = TestNode::join_as_learner(dir3.path(), &node0.addr).await?;
let learner_node = TestNode::join_as_learner(dir3.path().join("db"), &node0.addr).await?;
let (_learner_node_id, _learner_addr) = learner_node.running.node_data();
let tmpfile = tempfile::NamedTempFile::new()?;
let out_path = tmpfile.into_temp_path();
Expand Down
2 changes: 1 addition & 1 deletion tremor-value/src/known_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use beef::Cow;
use halfbrown::RawEntryMut;
use simd_json::ObjectHasher;
use std::fmt;
use std::hash::{BuildHasher};
use std::hash::BuildHasher;
use value_trait::prelude::*;

/// Well known key that can be looked up in a `Value` faster.
Expand Down

0 comments on commit 0832376

Please sign in to comment.