Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rowan/feature/make nix work with dispatcher #47

Merged
merged 31 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6687044
Cmd_dispatcher initial checkin
rcunrau Sep 4, 2024
d005c45
Rebase
rcunrau Sep 4, 2024
46c83db
tentative changes to ex3_shared_libs
rcunrau Sep 4, 2024
91e9a11
Fix warnings and errors
rcunrau Sep 6, 2024
1c2779c
Fixes to jump table and nix::write
rcunrau Sep 12, 2024
3b8438a
Work in progress. Trying to make tall thin work with new dispatcher
rrasmuss4200 Sep 5, 2024
2fc0560
Work in progress. Plan to get rid of buffer field in IpcServer to mak…
rrasmuss4200 Sep 9, 2024
2fef474
commit before rebase
rrasmuss4200 Sep 12, 2024
2c2ccb0
cmd_disp, cli_gs and coms_hand all compile and work with new nix. yaya
rrasmuss4200 Sep 12, 2024
abc79b1
Added debugging statements and changed order of coms bootup
rrasmuss4200 Sep 13, 2024
a77e072
TMP test changes to try and get whole pipeline setup
rrasmuss4200 Sep 15, 2024
3c38c9f
For some reason, COMS pipeline only accepts in coms_hand after TCP co…
rrasmuss4200 Sep 15, 2024
f904def
Fixed index err in cmd_disp.
rrasmuss4200 Sep 15, 2024
49ce9b3
rebase cleanup
rrasmuss4200 Sep 24, 2024
af83657
Able to pass data from GS to cmd_disp
rrasmuss4200 Sep 26, 2024
3b851e6
Fixed buffer it's reading from
rrasmuss4200 Sep 26, 2024
dd660a9
Cmd disp can receive and read dest_id. Now need to incorporate handlers
rrasmuss4200 Sep 26, 2024
f3379c0
Took accept out of server initialization in coms_handler
rrasmuss4200 Sep 26, 2024
8067b7c
restructure bulk disp to be purely a server
rrasmuss4200 Sep 26, 2024
299c365
CAN SEND STUFFFFFF
rrasmuss4200 Sep 26, 2024
9bf8e53
cleanup
rrasmuss4200 Sep 26, 2024
798d424
Can only uplink w/ BulkMsgDisp
rrasmuss4200 Sep 28, 2024
7686b23
UPLINK TO DFGM :))))
rrasmuss4200 Sep 28, 2024
2e6dde7
TMP changed script to work. will include iris when working
rrasmuss4200 Oct 1, 2024
61533dc
Downlink works but continues indefinitly.
rrasmuss4200 Oct 2, 2024
b4bd587
Attempt at fixing downlink. Swapping to making all compile
rrasmuss4200 Oct 9, 2024
c0ca7dd
make dev tools compile
rrasmuss4200 Oct 11, 2024
d24fc00
Iris compiles
rrasmuss4200 Oct 15, 2024
1e646e6
shell compile
rrasmuss4200 Oct 15, 2024
22769c9
rm warnings
rrasmuss4200 Oct 15, 2024
0076adc
fix component casting test
rrasmuss4200 Oct 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"ex3_ground_station/dashboard",
"ex3_ground_station/server",
"ex3_obc_fsw/bulk_msg_dispatcher",
"ex3_obc_fsw/cmd_dispatcher",
"ex3_obc_fsw/dev_tools/ipc_burst_hardcoded",
"ex3_obc_fsw/dev_tools/ipc_dummy_client",
"ex3_obc_fsw/dev_tools/ipc_server_dev_tool",
Expand Down
3 changes: 1 addition & 2 deletions ex3_ground_station/cli_ground_station/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use bulk_msg_slicing::*;
use common::*;
use libc::c_int;
use message_structure::*;
use ports::SIM_UHF_GS_PORT;
use std::fs::File;
use std::path::Path;
use tcp_interface::*;
Expand Down Expand Up @@ -201,7 +200,7 @@ async fn main() {

eprintln!("Connecting to Coms handler via TCP at {ipaddr}...");

let mut tcp_interface = match TcpInterface::new_client(ipaddr.to_string(), SIM_UHF_GS_PORT) {
let mut tcp_interface = match TcpInterface::new_client(ipaddr.to_string(), ports::SIM_COMMS_PORT) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why "SIM"?

Ok(ti) => ti,
Err(e) => {
eprintln!("Can't connect to satellite: {e}");
Expand Down
150 changes: 80 additions & 70 deletions ex3_obc_fsw/bulk_msg_dispatcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use std::fs::{File, OpenOptions};
use std::io::Error as IoError;
use std::io::Read;
use std::os::fd::OwnedFd;
use std::thread;
use std::time::Duration;
use std::path::Path;
Expand All @@ -16,75 +17,98 @@
const INTERNAL_MSG_BODY_SIZE: usize = 4089; // 4KB - 7 (header) being passed internally
fn main() -> Result<(), IoError> {
// All connected handlers and other clients will have a socket for the server defined here
let mut coms_interface: IpcServer = IpcServer::new("gs_bulk".to_string())?;
let mut cmd_msg_disp_interface: IpcClient = IpcClient::new("bulk_disp".to_string())?;
// This pipeline is directly to the coms_handler to be directly downlinked sliced data packets
let coms_interface_res = IpcServer::new("gs_bulk".to_string());
let mut coms_interface = match coms_interface_res {
Ok(s) => Some(s),
Err(e) => {
warn!("Connot create bulk to ground pipeline: {e}");
None
}
};

// This interface is for recieving commands from the cmd_dispatcher telling this bulk_msg_dispatcher what to slice and pass on
let cmd_disp_interface_res = IpcServer::new("BulkMsgDispatcher".to_string());
let mut cmd_disp_interface = match cmd_disp_interface_res {
Ok(s) => Some(s),
Err(e) => {
warn!("Connot create cmd_disp to bulk_disp pipeline: {e}");
None
}
};

let mut messages = Vec::new();
let mut num_of_4kb_msgs = 1;
let mut num_bytes = 4098;

let log_path = "logs";
init_logger(log_path);

loop {
let coms_interface_clone = coms_interface.clone();
let mut servers: Vec<&mut IpcServer> = vec![&mut coms_interface];
let mut clients: Vec<&mut IpcClient> = vec![&mut cmd_msg_disp_interface];

poll_ipc_clients(&mut clients)?;
// Msgs from the cmd_msg_dispatcher. I.e: Commands to downlink data from a certain path.
for client in clients {
if let Some(msg) = handle_server_input(client)? {
let path_bytes: Vec<u8> = msg.msg_body.clone();
let path = get_path_from_bytes(path_bytes)?;
match get_data_from_path(&path) {
Ok(bulk_msg) => {
trace!("Bytes expected at GS: {}", bulk_msg.msg_body.len() + 7); // +7 for header
// Slice bulk msg
// TODO - Cloning here might affect performance!!
messages = handle_large_msg(bulk_msg.clone(), INTERNAL_MSG_BODY_SIZE)?;

// Calculate num of 4KB msgs
let first_msg = messages[0].clone();
let num_of_4kb_msgs = u16::from_le_bytes([first_msg.msg_body[0],first_msg.msg_body[1]]) + 1; // account for msg containing num of msgs
trace!("Num of 4k msgs: {}", num_of_4kb_msgs);

// Start coms protocol with coms handler to downlink
send_num_msgs_and_bytes_to_gs(
num_of_4kb_msgs,
bulk_msg.msg_body.len() as u64,
coms_interface_clone.data_fd,
)?;

client.clear_buffer();
}
Err(e) => {
warn!("Error reading data from path: {}",e);
}
}
}
}

let mut servers = vec![&mut coms_interface, &mut cmd_disp_interface];

poll_ipc_server_sockets(&mut servers);
// msgs from coms_handler
for server in servers {
if let Some(msg) = handle_client(server)? {
if msg.header.msg_type == MsgType::Ack as u8 {
// Is there a better way of differentiating between ACK's?
if msg.msg_body[0] == 0 {
for (i, message) in messages.iter().enumerate() {
let serialized_msgs = serialize_msg(message)?;
trace!("Sending {} B", serialized_msgs.len());
ipc_write(coms_interface_clone.data_fd, &serialized_msgs)?;
trace!("Sent msg #{}", i + 1);
// save_data_to_file(message.msg_body.clone(), 0);
thread::sleep(Duration::from_millis(100));

for server_opt in servers {
if let Some(server) = server_opt {
if let Some(msg) = handle_client(server)? {
if server.socket_path.contains("gs_bulk") {
if msg.header.msg_type == MsgType::Ack as u8 {
// Handle ACK message
if msg.msg_body[0] == 0 {
for (i, message) in messages.iter().enumerate() {
let serialized_msgs = serialize_msg(message)?;
trace!("Sending {} B", serialized_msgs.len());
if let Some(data_fd) = &server.data_fd {
ipc_write(data_fd, &serialized_msgs)?;
} else {
warn!("No data file descriptor found in coms_interface.");
break;
}
trace!("Sent msg #{}", i + 1);
thread::sleep(Duration::from_millis(100));
}
} else {
todo!();
}
}
} else if server.socket_path.contains("BulkMsgDispatcher") {
let path_bytes: Vec<u8> = msg.msg_body.clone();
let path = get_path_from_bytes(path_bytes)?;
match get_data_from_path(&path) {
Ok(bulk_msg) => {
trace!("Bytes expected at GS: {}", bulk_msg.msg_body.len() + 7); // +7 for header
messages = handle_large_msg(bulk_msg.clone(), INTERNAL_MSG_BODY_SIZE)?;

let first_msg = messages[0].clone();
num_of_4kb_msgs = u16::from_le_bytes([first_msg.msg_body[0], first_msg.msg_body[1]]) + 1;
num_bytes = bulk_msg.msg_body.len() as u64;
trace!("Num of 4k msgs: {}", num_of_4kb_msgs);

server.clear_buffer();
}
Err(e) => {
warn!("Error reading data from path: {}", e);
}
}
}
}
if messages.len() > 1 {
// doesn't work right now! Donwlink broken :(
if let Some(data_fd) = &server.data_fd {
// Want to write to gs_bulk, not BulkMsgDispatcher fd
send_num_msgs_and_bytes_to_gs(num_of_4kb_msgs, num_bytes, data_fd)?;
} else {
todo!()
warn!("No data file descriptor found in coms_interface.");
}
messages = Vec::new();
server.clear_buffer();
}
server.clear_buffer();
}

}

Check warning on line 109 in ex3_obc_fsw/bulk_msg_dispatcher/src/main.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary `if let` since only the `Some` variant of the iterator element is used

warning: unnecessary `if let` since only the `Some` variant of the iterator element is used --> ex3_obc_fsw/bulk_msg_dispatcher/src/main.rs:52:9 | 52 | for server_opt in servers { | ^ ------- help: try: `servers.into_iter().flatten()` | _________| | | 53 | | if let Some(server) = server_opt { 54 | | if let Some(msg) = handle_client(server)? { 55 | | if server.socket_path.contains("gs_bulk") { ... | 108 | | 109 | | } | |_________^ | help: ...and remove the `if let` statement in the for loop --> ex3_obc_fsw/bulk_msg_dispatcher/src/main.rs:53:13 | 53 | / if let Some(server) = server_opt { 54 | | if let Some(msg) = handle_client(server)? { 55 | | if server.socket_path.contains("gs_bulk") { 56 | | if msg.header.msg_type == MsgType::Ack as u8 { ... | 106 | | } 107 | | } | |_____________^ = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#manual_flatten = note: `#[warn(clippy::manual_flatten)]` on by default
}

}

fn get_path_from_bytes(path_bytes: Vec<u8>) -> Result<String, IoError> {
Expand All @@ -108,23 +132,9 @@
}
}

/// Same as handle client but for getting a msg from the cmd_msg_disp
fn handle_server_input(client: &IpcClient) -> Result<Option<Msg>, IoError> {
if client.buffer != [0u8; IPC_BUFFER_SIZE] {
trace!(
"Server {} received data",
client.socket_path
);
//Build Msg from received bytes and get body which contains path
Ok(Some(deserialize_msg(&client.buffer)?))
} else {
Ok(None)
}
}

/// This is the communication protocol that will execute each time the Bulk Msg Dispatcher wants
/// to send a Bulk Msg to the coms handler for downlinking.
fn send_num_msgs_and_bytes_to_gs(num_msgs: u16, num_bytes: u64, fd: Option<i32>) -> Result<(), IoError> {
fn send_num_msgs_and_bytes_to_gs(num_msgs: u16, num_bytes: u64, fd: &OwnedFd) -> Result<(), IoError> {
// 1. Send Msg to coms handler indicating Bulk Msg and buffer size needed
let mut num_msgs_bytes: Vec<u8> = num_msgs.to_le_bytes().to_vec();
let mut num_bytes_bytes: Vec<u8> = num_bytes.to_le_bytes().to_vec();
Expand Down
14 changes: 14 additions & 0 deletions ex3_obc_fsw/cmd_dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "cmd_dispatcher"
version = "0.1.0"
edition = "2021"

[dependencies]
strum = { version = "0.26", features = ["derive"] }
strum_macros = "0.26"
nix = { version = "0.29.0", features = ["socket"] }
ipc = { path = "../../ex3_shared_libs/interfaces/ipc" }
common = {path = "../../ex3_shared_libs/common"}
message_structure = { path = "../../ex3_shared_libs/message_structure" }
logging = { path = "../../ex3_shared_libs/logging" }
log = "0.4.22"
96 changes: 96 additions & 0 deletions ex3_obc_fsw/cmd_dispatcher/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use nix::unistd::{write, close};
use nix::Error;
use strum::IntoEnumIterator;
use std::os::fd::{AsFd, AsRawFd};

use ipc::{poll_ipc_clients, IpcClient, IPC_BUFFER_SIZE};
use common::component_ids::ComponentIds;
use message_structure::MsgHeader;

fn main() {
let component_streams: Vec<Option<IpcClient>> =
ComponentIds::iter().enumerate().map(|(i,c)| {
println!("{i}");
match IpcClient::new(format!("{c}")) {
Ok(client) => {
Some(client)
}
Err(e) => {
eprintln!("msg dispatcher couldn't connect to {}: {}", c, e);
None
},
}
}).collect();

for x in 0..ComponentIds::LAST as usize {
let payload = match ComponentIds::try_from(x as u8) {
Ok(p) => {
eprintln!("x {} yields {}", x, p);
p
},
Err(()) => {
eprintln!("x {} didn't convert", x);
continue;
}
};
match component_streams.get(x) {
Some(element) => match element {
Some(_e) => {
println!("{} connected", payload);
}
None => {
println!("{} not connected!", payload);
}
},
None => println!("bad index {}", x),
};
}

let mut cmd_client = match IpcClient::new("cmd_dispatcher".to_string()) {
Ok(s) => Some(s),
Err(e) => {
eprintln!("Server connection error: {}", e);
return; // Should fix it and retry
}
};

loop {
let mut clients = vec![&mut cmd_client];
let (_s,_bytes) = match poll_ipc_clients(&mut clients) {
Ok((bytes, sock)) => (bytes,sock),
Err(e) => {
eprintln!("read error: {}", e);
let _ = close(cmd_client.as_ref().unwrap().fd.as_raw_fd());
continue; // try again
}
};
if cmd_client.as_ref().unwrap().buffer != [0u8; IPC_BUFFER_SIZE] {
println!("Got cmd: {:?}", cmd_client.as_ref().unwrap().buffer);
let dest = cmd_client.as_ref().unwrap().buffer[MsgHeader::DEST_INDEX];
let res = match ComponentIds::try_from(dest) {
Ok(payload) => {
match &component_streams[dest as usize] {
Some(client) => {
println!("Writing buffer {:?}", cmd_client.as_ref().unwrap().buffer);
write(client.fd.as_fd(), &cmd_client.as_ref().unwrap().buffer)
},
None => {
eprintln!("No payload: {payload}!");
Err(Error::EPIPE)
}
}
},
Err(_) => {
eprintln!("Invalid payload: {dest}");
Err(Error::EINVAL)
}
};

if res.is_err() {
eprintln!("Dispatch failed: NACKing");
// Should actually NACK
}
cmd_client.as_mut().unwrap().clear_buffer();
}
}
}
2 changes: 1 addition & 1 deletion ex3_obc_fsw/dev_tools/ipc_burst_hardcoded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn main() {
println!("Attempting to send: {:?}", msg_bytes);

// Send the msg
ipc_write(ipc_interface.fd, &msg_bytes).unwrap();
ipc_write(&ipc_interface.fd, &msg_bytes).unwrap();

println!("Sent successful");

Expand Down
17 changes: 9 additions & 8 deletions ex3_obc_fsw/dev_tools/ipc_server_dev_tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
//write first hardcoded msg to ipc client
let msg = CmdMsg::new(1, ComponentIds::SHELL as u8, 3, 0, vec![5, 6, 7, 8, 9, 10]);
let serialized_msg = CmdMsg::serialize_to_bytes(&msg).unwrap();
ipc_write(ipc_server.data_fd, serialized_msg.as_slice())
ipc_write(&ipc_server.data_fd.as_ref().unwrap(), serialized_msg.as_slice())

Check warning on line 30 in ex3_obc_fsw/dev_tools/ipc_server_dev_tool/src/main.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> ex3_obc_fsw/dev_tools/ipc_server_dev_tool/src/main.rs:30:23 | 30 | ipc_write(&ipc_server.data_fd.as_ref().unwrap(), serialized_msg.as_slice()) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: change this to: `ipc_server.data_fd.as_ref().unwrap()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default
}
'2' => {
println!("Sending msg 2");
//write first hardcoded msg to ipc client
let msg = CmdMsg::new(2, ComponentIds::SHELL as u8, 3, 1, vec![5, 6, 7, 8, 9, 10]);
let serialized_msg = CmdMsg::serialize_to_bytes(&msg).unwrap();
ipc_write(ipc_server.data_fd, serialized_msg.as_slice())
ipc_write(&ipc_server.data_fd.as_ref().unwrap(), serialized_msg.as_slice())

Check warning on line 37 in ex3_obc_fsw/dev_tools/ipc_server_dev_tool/src/main.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> ex3_obc_fsw/dev_tools/ipc_server_dev_tool/src/main.rs:37:23 | 37 | ipc_write(&ipc_server.data_fd.as_ref().unwrap(), serialized_msg.as_slice()) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: change this to: `ipc_server.data_fd.as_ref().unwrap()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
}
_ => {
println!("Invalid input");
Expand Down Expand Up @@ -70,13 +70,14 @@
std::process::exit(1);
}

let mut ipc_server = IpcServer::new(args[1].clone()).unwrap();
let mut ipc_server = Some(IpcServer::new(args[1].clone()).unwrap());

loop {
poll_ipc_server_sockets(&mut [&mut ipc_server]);
if ipc_server.buffer != [0u8; IPC_BUFFER_SIZE] {
println!("Received message from ipc client {:?}", ipc_server.buffer);
ipc_server.clear_buffer();
let mut servers = vec![&mut ipc_server];
poll_ipc_server_sockets(&mut servers);
if ipc_server.as_ref().unwrap().buffer != [0u8; IPC_BUFFER_SIZE] {
println!("Received message from ipc client {:?}", ipc_server.as_ref().unwrap().buffer);
ipc_server.as_mut().unwrap().clear_buffer();
}

// Poll stdin for user input
Expand All @@ -85,7 +86,7 @@
if let Some(bytes_read) = stdin_read_res {
if bytes_read > 0 {
println!("Received user input: {:?}", stdin_buf);
handle_user_input(stdin_buf.as_slice(), &mut ipc_server);
handle_user_input(stdin_buf.as_slice(), &mut ipc_server.as_mut().unwrap());

Check warning on line 89 in ex3_obc_fsw/dev_tools/ipc_server_dev_tool/src/main.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> ex3_obc_fsw/dev_tools/ipc_server_dev_tool/src/main.rs:89:57 | 89 | handle_user_input(stdin_buf.as_slice(), &mut ipc_server.as_mut().unwrap()); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: change this to: `ipc_server.as_mut().unwrap()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions ex3_obc_fsw/handlers/adcs_handler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl ADCSHandler {
pub fn run(&mut self) -> std::io::Result<()> {
loop {
if let Ok((n, _)) =
poll_ipc_clients(&mut vec![self.dispatcher_interface.as_mut().unwrap()])
poll_ipc_clients(&mut vec![&mut self.dispatcher_interface])
{
if n > 0 {
let mut socket_buf = self.dispatcher_interface.as_mut().unwrap().read_buffer();
Expand Down Expand Up @@ -300,7 +300,7 @@ fn main() -> Result<(), Error> {
let adcs_interface = TcpInterface::new_client("127.0.0.1".to_string(), ports::SIM_ADCS_PORT);

//Create IPC interface for ADCS handler to talk to message dispatcher
let dispatcher_interface = IpcClient::new("adcs_handler".to_string());
let dispatcher_interface = IpcClient::new("ADCS".to_string());

//Create ADCS handler
let mut adcs_handler = ADCSHandler::new(adcs_interface, dispatcher_interface);
Expand Down
Loading
Loading