Skip to content

Commit

Permalink
start ack loop with an first ack, remove unecessary steps elsewhere
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Jul 23, 2023
1 parent 1e4c08e commit 58a7783
Showing 1 changed file with 49 additions and 44 deletions.
93 changes: 49 additions & 44 deletions examples/distributed_bakery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,35 +64,32 @@ async fn increment_output(doc_handle: &DocHandle, customer_id: &str) -> Result<u
}

async fn run_bakery_algorithm(doc_handle: &DocHandle, customer_id: &String) {
loop {
let (our_number, closing) = doc_handle.with_doc_mut(|doc| {
// At the start of each iteration,
// pick a number that is higher than all others.
// This means picking a new, and higher,
// number than at the last iteration,
// if others have chosen new ones since last try.
let mut bakery: Bakery = hydrate(doc).unwrap();
let customers_with_number = bakery
.customers
.clone()
.iter()
.map(|(id, c)| (id.clone(), c.number))
.collect();
let highest_number = bakery.customers.values().map(|c| c.number).max().unwrap();
let our_number = highest_number + 1;
let our_info = bakery.customers.get_mut(customer_id).unwrap();
our_info.views_of_others = customers_with_number;
our_info.number = our_number;
let mut tx = doc.transaction();
reconcile(&mut tx, &bakery).unwrap();
tx.commit();
(our_number, bakery.closing)
});

if closing {
return;
}
let (our_number, closing) = doc_handle.with_doc_mut(|doc| {
// At the start of the algorithm,
// pick a number that is higher than all others.
let mut bakery: Bakery = hydrate(doc).unwrap();
let customers_with_number = bakery
.customers
.clone()
.iter()
.map(|(id, c)| (id.clone(), c.number))
.collect();
let highest_number = bakery.customers.values().map(|c| c.number).max().unwrap();
let our_number = highest_number + 1;
let our_info = bakery.customers.get_mut(customer_id).unwrap();
our_info.views_of_others = customers_with_number;
our_info.number = our_number;
let mut tx = doc.transaction();
reconcile(&mut tx, &bakery).unwrap();
tx.commit();
(our_number, bakery.closing)
});

if closing {
return;
}
loop {
//println!("Start bakery loop");
doc_handle.changed().await.unwrap();

// Perform reads outside of closure,
Expand All @@ -103,6 +100,8 @@ async fn run_bakery_algorithm(doc_handle: &DocHandle, customer_id: &String) {
return;
}

//println!("Bakery customers: {:?}", bakery.customers);

// Wait for all peers to have acknowlegded our number.
let acked_by_all = bakery
.customers
Expand Down Expand Up @@ -157,15 +156,27 @@ async fn run_bakery_algorithm(doc_handle: &DocHandle, customer_id: &String) {
}

async fn acknowlegde_changes(doc_handle: DocHandle, customer_id: String) {
let (mut our_view, mut output_seen, closing) = doc_handle.with_doc(|doc| {
let bakery: Bakery = hydrate(doc).unwrap();
let our_info = bakery.customers.get(&customer_id).unwrap();
let output_seen = bakery.output_seen.get(&customer_id).unwrap();
(
our_info.views_of_others.clone(),
*output_seen,
bakery.closing,
)
let (mut our_view, mut output_seen, closing) = doc_handle.with_doc_mut(|doc| {
let mut bakery: Bakery = hydrate(doc).unwrap();
let customers_with_number: HashMap<String, u32> = bakery
.customers
.clone()
.iter()
.map(|(id, c)| (id.clone(), c.number))
.collect();
let our_info = bakery.customers.get_mut(&customer_id).unwrap();
// Ack changes made by others.
our_info.views_of_others = customers_with_number.clone();

// Ack any new output.
bakery
.output_seen
.insert(customer_id.clone(), bakery.output);

let mut tx = doc.transaction();
reconcile(&mut tx, &bakery).unwrap();
tx.commit();
(customers_with_number, bakery.output, bakery.closing)
});

if closing {
Expand Down Expand Up @@ -451,20 +462,14 @@ async fn main() {
handler_sem: Semaphore::new(1),
});

// Do the below in a task, so that the server immediatly starts running.
let customer_id_clone = customer_id.clone();
let doc_handle_clone = doc_handle.clone();
handle.spawn(async move {
// Start the algorithm "outside the bakery".
// The acks makes this wait for all others to be up and running.
start_outside_the_bakery(&doc_handle_clone, &customer_id_clone).await;

// Continuously request new increments.
request_increment(doc_handle_clone, http_addrs, increment_stop_rx).await;
});

// A task that continuously acknowledges changes made by others.
handle.spawn(async move {
// Continuously acknowledges changes made by others.
acknowlegde_changes(doc_handle, customer_id).await;
});

Expand Down

0 comments on commit 58a7783

Please sign in to comment.