Skip to content

Commit

Permalink
cmd_disp, cli_gs and coms_hand all compile and work with new nix. yaya
Browse files Browse the repository at this point in the history
  • Loading branch information
rrasmuss4200 committed Sep 12, 2024
1 parent 0fef3f6 commit 25dc2a1
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 62 deletions.
112 changes: 57 additions & 55 deletions ex3_obc_fsw/handlers/coms_handler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ fn main() {
TcpInterface::new_server("127.0.0.1".to_string(), ports::SIM_COMMS_PORT).unwrap();

//Setup interface for comm with OBC FSW components (IPC), for the purpose of passing messages to and from the GS
let mut ipc_gs_interface;
let ipc_gs_interface_res = IpcClient::new("gs_bulk".to_string());
if let Err(e) = ipc_gs_interface_res {
warn!("Error creating IPC interface: {e}");
} else {
ipc_gs_interface = ipc_gs_interface_res.unwrap();

}
let ipc_gs_interfac_res = IpcClient::new("gs_bulk".to_string());
let mut ipc_gs_interface = match ipc_gs_interfac_res {
Ok(i) => Some(i),
Err(e) => {
warn!("Cannot connect to bulk interface: {e}");
None
}
};

//Setup interface for comm with OBC FSW components (IPC), for passing messages to and from the UHF specifically
// TODO - name this to gs_handler once uhf handler and gs handler are broken up from this program.
Expand Down Expand Up @@ -164,63 +164,65 @@ fn main() {
let coms_bytes_read_res = poll_ipc_server_sockets(&mut servers);
let ipc_bytes_read_res = poll_ipc_clients(&mut clients);

if ipc_gs_interface.buffer != [0u8; IPC_BUFFER_SIZE] {
trace!("Received IPC Msg bytes for GS");
let deserialized_msg_result = deserialize_msg(&ipc_gs_interface.buffer);
match deserialized_msg_result {
Ok(deserialized_msg) => {
// writes directly to GS, handling case if it's a bulk message
if deserialized_msg.header.msg_type == MsgType::Bulk as u8 && !received_bulk_ack
{
// If we haven't received Bulk ACK, we need to send ack
if let Some(e) = send_bulk_ack(&ipc_gs_interface.fd).err() {
println!("failed to send bulk ack: {e}");
}
received_bulk_ack = true;
let expected_msgs_bytes = [
deserialized_msg.msg_body[0],
deserialized_msg.msg_body[1],
];
expected_msgs = u16::from_le_bytes(expected_msgs_bytes);
trace!("Expecting {} 4KB msgs", expected_msgs);
// Send msg containing num of 4KB msgs and num of bytes to expect
send_initial_bulk_to_gs(deserialized_msg, &mut tcp_interface);
} else if deserialized_msg.header.msg_type == MsgType::Bulk as u8
&& received_bulk_ack
{
// await_ack_for_bulk(&mut tcp_interface);
// Here where we read incoming bulk msgs from bulk_msg_disp
if bulk_msgs_read < expected_msgs {
if let Ok((ipc_bytes_read, ipc_name)) = ipc_bytes_read_res {
if ipc_name.contains("gs") {
let cur_buf = ipc_gs_interface.buffer[..ipc_bytes_read].to_vec();
println!("Bytes read: {}", cur_buf.len());
let cur_msg = deserialize_msg(&cur_buf).unwrap();
write_msg_to_uhf_for_downlink(&mut tcp_interface, cur_msg);
bulk_msgs_read += 1;
if let Some(ref mut ipc_gs_interfac) = ipc_gs_interface {
if ipc_gs_interfac.buffer != [0u8; IPC_BUFFER_SIZE] {
trace!("Received IPC Msg bytes for GS");
let deserialized_msg_result = deserialize_msg(&ipc_gs_interfac.buffer);
match deserialized_msg_result {
Ok(deserialized_msg) => {
// writes directly to GS, handling case if it's a bulk message
if deserialized_msg.header.msg_type == MsgType::Bulk as u8 && !received_bulk_ack
{
// If we haven't received Bulk ACK, we need to send ack
if let Some(e) = send_bulk_ack(&ipc_gs_interfac.fd).err() {
println!("failed to send bulk ack: {e}");
}
received_bulk_ack = true;
let expected_msgs_bytes = [
deserialized_msg.msg_body[0],
deserialized_msg.msg_body[1],
];
expected_msgs = u16::from_le_bytes(expected_msgs_bytes);
trace!("Expecting {} 4KB msgs", expected_msgs);
// Send msg containing num of 4KB msgs and num of bytes to expect
send_initial_bulk_to_gs(deserialized_msg, &mut tcp_interface);
} else if deserialized_msg.header.msg_type == MsgType::Bulk as u8
&& received_bulk_ack
{
// await_ack_for_bulk(&mut tcp_interface);
// Here where we read incoming bulk msgs from bulk_msg_disp
if bulk_msgs_read < expected_msgs {
if let Ok((ipc_bytes_read, ipc_name)) = ipc_bytes_read_res {
if ipc_name.contains("gs") {
let cur_buf = ipc_gs_interfac.buffer[..ipc_bytes_read].to_vec();
println!("Bytes read: {}", cur_buf.len());
let cur_msg = deserialize_msg(&cur_buf).unwrap();
write_msg_to_uhf_for_downlink(&mut tcp_interface, cur_msg);
bulk_msgs_read += 1;
}
} else {
warn!("Error reading bytes from poll.");
}
} else {
warn!("Error reading bytes from poll.");
}
} else {
let _ = write_msg_to_uhf_for_downlink(&mut tcp_interface, deserialized_msg);
}
} else {
let _ = write_msg_to_uhf_for_downlink(&mut tcp_interface, deserialized_msg);
}
}
Err(e) => {
warn!("Error deserializing GS IPC msg: {:?}", e);
//Handle deserialization of IPC msg failure
}
};
trace!("Bulk msgs read: {}", bulk_msgs_read);
ipc_gs_interface.clear_buffer();
Err(e) => {
warn!("Error deserializing GS IPC msg: {:?}", e);
//Handle deserialization of IPC msg failure
}
};
trace!("Bulk msgs read: {}", bulk_msgs_read);
ipc_gs_interfac.clear_buffer();
}
}
// If we are done reading bulk msgs, start protocol with GS
if received_bulk_ack && bulk_msgs_read >= expected_msgs {
bulk_msgs_read = 0;
expected_msgs = 0;
received_bulk_ack = false;
ipc_gs_interface.clear_buffer();
ipc_gs_interface.as_mut().unwrap().clear_buffer();
}

// Poll the IPC unix domain socket for the COMS channel
Expand Down
17 changes: 10 additions & 7 deletions ex3_shared_libs/interfaces/ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,17 @@ impl IpcClient {
tmp
}
}

pub fn poll_ipc_clients(clients: &mut Vec<&mut IpcClient>) -> Result<(usize, String), std::io::Error> {
///
pub fn poll_ipc_clients(clients: &mut Vec<&mut Option<IpcClient>>) -> Result<(usize, String), std::io::Error> {
//Create poll fd instances for each client
let mut poll_fds: Vec<libc::pollfd> = Vec::new();
for client in &mut *clients {
// Poll data_fd for incoming data
if let None = client {
return Ok((0,"".to_string()));
}
poll_fds.push(libc::pollfd {
fd: client.fd.as_raw_fd(),
fd: client.as_ref().unwrap().fd.as_raw_fd(),
events: libc::POLLIN,
revents: 0,
});
Expand All @@ -118,16 +121,16 @@ pub fn poll_ipc_clients(clients: &mut Vec<&mut IpcClient>) -> Result<(usize, Str
//Poll each client for incoming data
for poll_fd in poll_fds.iter() {
if poll_fd.revents & libc::POLLIN != 0 {
let client = clients.iter_mut().find(|s| s.fd.as_raw_fd() == poll_fd.fd);
let client = clients.iter_mut().find(|s| s.as_ref().unwrap().fd.as_raw_fd() == poll_fd.fd);
if let Some(client) = client {
// Handle incoming data from a connected client
let bytes_read = read(client.fd.as_raw_fd(), &mut client.buffer)?;
let bytes_read = read(client.as_ref().unwrap().fd.as_raw_fd(), &mut client.as_mut().unwrap().buffer)?;
if bytes_read > 0 {
println!(
"Received {} bytes on socket {}",
bytes_read, client.socket_path
bytes_read, client.as_ref().unwrap().socket_path
);
return Ok((bytes_read, client.socket_path.clone()));
return Ok((bytes_read, client.as_ref().unwrap().socket_path.clone()));
}
}
}
Expand Down

0 comments on commit 25dc2a1

Please sign in to comment.