Skip to content

Commit

Permalink
try to fix issue ANSSI-FR#3
Browse files Browse the repository at this point in the history
  • Loading branch information
github-af committed Jul 10, 2024
1 parent c6aa0b2 commit fc19a7e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 26 deletions.
48 changes: 23 additions & 25 deletions src/receive/decoding.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
//! Worker that decodes RaptorQ packets into protocol messages

use std::{cmp::Ordering, thread::yield_now};

use crate::{protocol, receive};

pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::Error> {
pub(crate) fn start<F>(
receiver: &receive::Receiver<F>,
nb_decoding_threads: u8,
) -> Result<(), receive::Error> {
let encoding_block_size = receiver.object_transmission_info.transfer_length();

loop {
Expand All @@ -23,37 +24,34 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E

match decoder.decode(packets) {
None => {
log::warn!("lost block {block_id}");
log::error!("lost block {block_id}, synchronization lost");
continue;
}
Some(block) => {
log::trace!("block {} decoded with {} bytes!", block_id, block.len());
log::trace!("block {block_id} decoded with {} bytes!", block.len());

let mut retry_cnt = 0;
let mut retried = 0;

loop {
let mut to_receive = receiver.block_to_receive.lock().expect("acquire lock");
match block_id.cmp(&to_receive) {
Ordering::Equal => {
receiver
.to_dispatch
.send(protocol::Message::deserialize(block))?;
*to_receive = to_receive.wrapping_add(1);
break;
}
Ordering::Greater => {
// Thread is too late, drop the packet and kill the current job
log::warn!("Dropping the packet {block_id}");

if *to_receive == block_id {
// The decoded block is the expected one, dispatching it
receiver
.to_dispatch
.send(protocol::Message::deserialize(block))?;
*to_receive = to_receive.wrapping_add(1);
break;
} else {
// The decoded block is not the expected one
// Retrying until all decoding threads had one chance to dispatch their block
if nb_decoding_threads < retried {
// All decoding threads should have had one chance to dispatch their block
log::warn!("dropping block {block_id} after trying to dispatch it {retried} times");

break;
}
Ordering::Less => {
if retry_cnt < 10 {
retry_cnt += 1;
yield_now();
} else {
break;
}
}
retried += 1;
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ where
for i in 0..self.config.nb_decoding_threads {
thread::Builder::new()
.name(format!("decoding_{i}"))
.spawn_scoped(scope, || decoding::start(self))?;
.spawn_scoped(scope, || {
decoding::start(self, self.config.nb_decoding_threads)
})?;
}

thread::Builder::new()
Expand Down

0 comments on commit fc19a7e

Please sign in to comment.