Skip to content

Commit

Permalink
addressing early comments, fixing deadlocks, fixing drain logic
Browse files Browse the repository at this point in the history
Signed-off-by: Ian Rudie <[email protected]>
  • Loading branch information
ilrudie committed Jan 18, 2024
1 parent 6aea14a commit e51ff93
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 25 deletions.
52 changes: 28 additions & 24 deletions src/proxy/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ impl ConnectionDrain {
let (tx, rx) = drain::channel();
ConnectionDrain { tx, rx }
}

/// drain drops the internal reference to rx and then signals drain on the tx
// always inline, this is for convenience so that we don't forget do drop the rx but there's really no reason it needs to grow the stack
#[inline(always)]
async fn drain(self) {
drop(self.rx); // very important, drain cannont complete if there are outstand rx
self.tx.drain().await;
}
}

#[derive(Clone)]
Expand All @@ -44,32 +52,28 @@ impl ConnectionManager {
}

pub async fn track(self, c: &Connection) -> drain::Watch {
match self.drains.read().await.get(c) {
Some(cd) => cd.rx.clone(),
None => {
let cd = ConnectionDrain::new();
let rx = cd.rx.clone();
//TODO: this was racy, another insert may happen between dropping the read lock and attaining this write lock
// try_insert is the best solution once it's no longer a nightly-only experimental API
let mut drains = self.drains.write().await;
if let Some(w) = drains.get(c) {
return w.rx.clone();
}
drains.insert(c.clone(), cd);
rx
}
// consider removing this whole if let since if it's None we need to perform another get inside the write lock to prevnt racy inserts
if let Some(cd) = self.drains.read().await.get(c) {
return cd.rx.clone();
}
let cd = ConnectionDrain::new();
let rx = cd.rx.clone();
//TODO: this was racy, another insert may happen between dropping the read lock and attaining this write lock
// try_insert is the best solution once it's no longer a nightly-only experimental API
let mut drains = self.drains.write().await;
if let Some(w) = drains.get(c) {
return w.rx.clone();
}
drains.insert(c.clone(), cd);
rx
}

pub async fn drain(&self, c: &Connection) {
match self.drains.clone().write().await.remove(c) {
Some(cd) => {
cd.tx.drain().await;
}
None => {
// this is bad, possibly drain called twice
error!("requested drain on a Connection which wasn't initialized");
}
if let Some(cd) = self.drains.clone().write().await.remove(c) {
cd.drain().await;
} else {
// this is bad, possibly drain called twice
error!("requested drain on a Connection which wasn't initialized");
}
}

Expand All @@ -80,8 +84,8 @@ impl ConnectionManager {

pub async fn drain_all(self) {
let mut drains = self.drains.write_owned().await;
for (_c, cd) in drains.drain() {
cd.tx.drain().await;
for (_conn, cd) in drains.drain() {
cd.drain().await;
}
}
}
4 changes: 3 additions & 1 deletion src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,20 @@ impl Inbound {
loop {
tokio::select! {
_ = stop_rx.changed() => {
return
break;
}
_ = policies_changed.changed() => {
let connections = connection_manager.connections().await;
for conn in connections {
if !state.assert_rbac(&conn).await {
connection_manager.drain(&conn).await;
info!("connection {} drained because it's no longer allowed after a policy update", conn);
}
}
}
}
}
info!("live connection policy check loop exited");
});

while let Some(socket) = stream.next().await {
Expand Down

0 comments on commit e51ff93

Please sign in to comment.