Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Swss common bridge #37

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
805 changes: 430 additions & 375 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ members = [
"crates/swbus-core",
"crates/swbusd",
"crates/hamgrd",
"crates/swss-common",
"crates/swbus-edge",
"crates/swbus-proto",
"crates/swss-common",
"crates/swbus-edge",
"crates/swbus-proto",
"crates/swbus-cli",
"crates/swbus-actor",
"crates/swss-common-bridge",
]
exclude = []

Expand All @@ -30,7 +32,7 @@ unused_import_braces = 'warn'
[workspace.dependencies]
# Async framework
tokio = { version = "1.37", features = ["macros", "rt-multi-thread"] }
tokio-util = "0.7"
tokio-util = { version = "0.7", features = ["rt"] }
tokio-stream = "0.1"

# Log and error handling
Expand All @@ -44,7 +46,7 @@ human-panic = "2"
better-panic = "0.3"
signal-hook = "0.3"

# Serializatoin
# Serialization
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
serde_yaml = "0.9"
Expand Down
17 changes: 17 additions & 0 deletions crates/swbus-actor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "swbus-actor"
version.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
documentation.workspace = true
keywords.workspace = true
edition.workspace = true

[dependencies]
swbus-edge = { path = "../swbus-edge" }
tokio.workspace = true
futures = "0.3.31"

[lints]
workspace = true
53 changes: 53 additions & 0 deletions crates/swbus-actor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
pub mod resend_queue;
pub mod runtime;

use std::{future::Future, sync::Arc};
use swbus_edge::{simple_client::*, swbus_proto::swbus::SwbusMessage};
use tokio::sync::mpsc::Sender;

/// Module containing all the imports needed to write an `Actor`.
pub mod prelude {
pub use crate::{resend_queue::ResendQueueConfig, runtime::ActorRuntime, Actor, Outbox};
pub use swbus_edge::{
simple_client::{IncomingMessage, MessageBody, MessageId, OutgoingMessage},
swbus_proto::swbus::{DataRequest, RequestResponse, ServicePath, SwbusErrorCode},
SwbusEdgeRuntime,
};
}

/// An actor that can be run on an [`ActorRuntime`](runtime::ActorRuntime).
pub trait Actor: Send + 'static {
/// The actor just started.
fn init(&mut self, outbox: Outbox) -> impl Future<Output = ()> + Send;

/// A new message came in.
fn handle_message(&mut self, message: IncomingMessage, outbox: Outbox) -> impl Future<Output = ()> + Send;
}

/// An actor's outbox, used to send messages on Swbus.
#[derive(Clone)]
pub struct Outbox {
/// Outgoing message sender. The receiver lives in [`runtime::MessageBridge`].
message_tx: Sender<SwbusMessage>,

/// We need a copy of the SimpleSwbusEdgeClient so that we can get `MessageId`s
/// from sent messages and return them to the actor.
swbus_client: Arc<SimpleSwbusEdgeClient>,
}

impl Outbox {
fn new(message_tx: Sender<SwbusMessage>, swbus_client: Arc<SimpleSwbusEdgeClient>) -> Self {
Self {
message_tx,
swbus_client,
}
}

pub async fn send(&self, msg: OutgoingMessage) -> MessageId {
let (id, msg) = self.swbus_client.outgoing_message_to_swbus_message(msg);
// we ignore this result, because if the MessageBridge was shut down and this returns Err, we
// will be notified anyway by the inbox receiver.
_ = self.message_tx.send(msg).await;
id
}
}
147 changes: 147 additions & 0 deletions crates/swbus-actor/src/resend_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, Weak},
time::Duration,
};

use swbus_edge::swbus_proto::swbus::{ServicePath, SwbusMessage};
use tokio::time::Instant;

type MessageId = u64;

/// Settings that determine message resending behavior of an actor.
#[derive(Debug, Copy, Clone)]
pub struct ResendQueueConfig {
/// How long to wait for an ack before sending a message again.
pub resend_time: Duration,

/// How many times to retry a message before giving up.
pub max_tries: u32,
}

/// Outgoing messages and associated state necessary to resend them if they go unacknowledged.
pub(crate) struct ResendQueue {
config: ResendQueueConfig,

/// The messages that actually need to be resent.
unacked_messages: HashMap<MessageId, Arc<SwbusMessage>>,

/// A queue of messages that *may* need to be resent.
/// Messages are ordered earliest at the front to latest at the back.
queue: VecDeque<QueuedMessage>,
}

impl ResendQueue {
pub(crate) fn new(config: ResendQueueConfig) -> Self {
Self {
config,
unacked_messages: HashMap::new(),
queue: VecDeque::new(),
}
}

/// Add a message to the ResendQueue. Assumes the message has already been sent once.
pub(crate) fn enqueue(&mut self, message: SwbusMessage) {
let header = message.header.as_ref().unwrap();
let id = header.id;
let destination = header.destination.as_ref().unwrap().clone();
let strong_message = Arc::new(message);
let weak_message = Arc::downgrade(&strong_message);
self.unacked_messages.insert(id, strong_message);
self.queue.push_back(QueuedMessage {
message: weak_message,
destination,
tries: 1,
resend_at: Instant::now() + self.config.resend_time,
});
}

/// Block until a message *may* need to be resent.
///
/// When this returns, the caller should call `next_resend` or `iter_resend` to get updates.
/// There may be spurrious wakeups, where `next_resend` returns None immediately, as this
/// function may wake up when an acknowledged message would've needed resending.
pub(crate) async fn wait(&self) {
let next_resend_instant = self.queue.front().map(|m| m.resend_at);
match next_resend_instant {
Some(instant) => tokio::time::sleep_until(instant).await, // Sleep until next resend time
None => futures::future::pending::<()>().await, // Future that never finishes and takes no resources
}
}

/// Get an update about the next message that needs to be resent or that went stale.
pub(crate) fn next_resend(&mut self) -> Option<ResendMessage> {
let now = Instant::now();

loop {
match self.queue.front() {
Some(peek) if peek.resend_at <= now => {
let mut queued_msg = self.queue.pop_front().unwrap();

match queued_msg.message.upgrade() {
Some(msg) if queued_msg.tries >= self.config.max_tries => {
// This message has been resent too may times.
// We are going to drop it, and tell the caller.
let id = msg.header.as_ref().unwrap().id;
let destination = queued_msg.destination;
self.unacked_messages.remove(&id);
return Some(ResendMessage::TooManyTries { id, destination });
}
Some(msg) => {
// This message should be retried right now.
// We will requeue it, and tell the caller to resend it.
queued_msg.resend_at += self.config.resend_time;
queued_msg.tries += 1;
self.queue.push_back(queued_msg);
return Some(ResendMessage::Resend(msg));
}
None => {
// The message has already been dropped, because message was acknowledged.
// This, we can ignore this entry and continue the loop.
}
}
}

// Either the queue is empty, or no message needs to be retried yet (because now < top.resend_at)
_ => return None,
}
}
}

/// Iterator that calls `next_resend` until it returns `None`.
pub(crate) fn iter_resend(&mut self) -> impl Iterator<Item = ResendMessage> + '_ {
std::iter::from_fn(|| self.next_resend())
}

/// Signal that a message was acknowledged and no longer needs to be resent. Removes the message
/// with this id from the resend queue.
pub(crate) fn message_acknowledged(&mut self, id: u64) {
self.unacked_messages.remove(&id);
}
}

pub(crate) enum ResendMessage {
/// This message needs to be resent right now
Resend(Arc<SwbusMessage>),

/// The message that was in this slot went stale and was dropped
TooManyTries { id: MessageId, destination: ServicePath },
}

/// A message awaiting an ack from the recipient.
struct QueuedMessage {
/// A copy of the content of the message, so it can be resent.
///
/// If this Weak is broken, that means the message was acked and removed
/// from ResendQueue::unacked_messages, so we should ignore this entry.
message: Weak<SwbusMessage>,

/// A copy of the destination so it can be given to the actor if the message fails
destination: ServicePath,

/// How many times the message has been sent so far
tries: u32,

/// The next time at which to resend the message
resend_at: Instant,
}
137 changes: 137 additions & 0 deletions crates/swbus-actor/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::{
resend_queue::{ResendMessage, ResendQueue, ResendQueueConfig},
Actor, Outbox,
};
use std::sync::Arc;
use swbus_edge::{
simple_client::*,
swbus_proto::swbus::{RequestResponse, ServicePath, SwbusMessage},
SwbusEdgeRuntime,
};
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task::JoinSet,
};

/// Struct that spawns and drives actor tasks.
pub struct ActorRuntime {
swbus_edge: Arc<SwbusEdgeRuntime>,
resend_config: ResendQueueConfig,
tasks: JoinSet<()>,
}

impl ActorRuntime {
/// Create a new Actor runtime on top of an (already started) SwbusEdgeRuntime
pub fn new(swbus_edge: Arc<SwbusEdgeRuntime>, resend_config: ResendQueueConfig) -> Self {
ActorRuntime {
swbus_edge,
resend_config,
tasks: JoinSet::new(),
}
}

/// Spawn an actor that listens for messages on the given service_path
pub async fn spawn<A: Actor>(&mut self, service_path: ServicePath, actor: A) {
let swbus_client = Arc::new(SimpleSwbusEdgeClient::new(self.swbus_edge.clone(), service_path).await);
let (inbox_tx, inbox_rx) = channel(1024);
let (outbox_tx, outbox_rx) = channel(1024);
let message_bridge = MessageBridge::new(self.resend_config, swbus_client.clone(), inbox_tx, outbox_rx);
let outbox = Outbox::new(outbox_tx, swbus_client);
self.tasks.spawn(message_bridge.run());
self.tasks.spawn(actor_main(actor, inbox_rx, outbox));
}

/// Block on all actors
pub async fn join(self) {
self.tasks.join_all().await;
}
}

/// Main loop for an actor task
async fn actor_main(mut actor: impl Actor, mut inbox_rx: Receiver<IncomingMessage>, outbox: Outbox) {
actor.init(outbox.clone()).await;

// If inbox.recv() returns None, the MessageBridge died
while let Some(msg) = inbox_rx.recv().await {
actor.handle_message(msg, outbox.clone()).await;
}
}

/// A bridge between Swbus and an actor, providing middleware (currently just the resend queue).
pub(crate) struct MessageBridge {
resend_queue: ResendQueue,

swbus_client: Arc<SimpleSwbusEdgeClient>,

/// Sender for MessageBridge to send incoming messages or message failure signals to its actor.
/// The receiver exists in actor_main.
inbox_tx: Sender<IncomingMessage>,

/// Receiver for MessageBridge to receive outgoing messages actor.
/// The sender end exists in Outbox.
outbox_rx: Receiver<SwbusMessage>,
}

impl MessageBridge {
fn new(
resend_queue_config: ResendQueueConfig,
swbus_client: Arc<SimpleSwbusEdgeClient>,
inbox_tx: Sender<IncomingMessage>,
outbox_rx: Receiver<SwbusMessage>,
) -> Self {
Self {
resend_queue: ResendQueue::new(resend_queue_config),
swbus_client,
inbox_tx,
outbox_rx,
}
}

/// Message bridge main loop
async fn run(mut self) {
loop {
tokio::select! {
maybe_msg = self.swbus_client.recv() => {
// if maybe_msg is None, swbus has died
let Some(msg) = maybe_msg else { break };
self.handle_incoming_message(msg).await;
}

maybe_msg = self.outbox_rx.recv() => {
// If maybe_msg is None, the actor has died (its Outbox was dropped)
let Some(msg) = maybe_msg else { break };
self.handle_outgoing_message(msg).await;
}

() = self.resend_queue.wait() => {
self.resend_pending_messages().await;
}
}
}
}

async fn handle_incoming_message(&mut self, msg: IncomingMessage) {
if let MessageBody::Response(RequestResponse { request_id, .. }) = &msg.body {
self.resend_queue.message_acknowledged(*request_id);
}
self.inbox_tx.send(msg).await.unwrap();
}

async fn handle_outgoing_message(&mut self, msg: SwbusMessage) {
self.resend_queue.enqueue(msg.clone());
self.swbus_client.send_raw(msg).await.unwrap();
}

async fn resend_pending_messages(&mut self) {
use ResendMessage::*;

for resend in self.resend_queue.iter_resend() {
match resend {
Resend(swbus_msg) => self.swbus_client.send_raw((*swbus_msg).clone()).await.unwrap(),
TooManyTries { id, destination } => {
eprintln!("Message {id} to {destination} was dropped");
}
}
}
}
}
Loading