From 27f2a6789f9d6b260a6c1d60a537fa73834c842f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 13 Mar 2024 00:44:42 +0100 Subject: [PATCH 1/5] Factor out shared reflector store internals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/reflector/store.rs | 41 ++++++++++++++++++----------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 8fc51b638..14b53063c 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -9,7 +9,6 @@ use parking_lot::RwLock; use std::{fmt::Debug, hash::Hash, sync::Arc}; use thiserror::Error; -type Cache = Arc, Arc>>>; /// A writable Store handle /// @@ -20,10 +19,9 @@ pub struct Writer where K::DynamicType: Eq + Hash, { - store: Cache, + store: Arc>, dyntype: K::DynamicType, ready_tx: Option>, - ready_rx: Arc>, } impl Writer @@ -37,10 +35,12 @@ where pub fn new(dyntype: K::DynamicType) -> Self { let (ready_tx, ready_rx) = DelayedInit::new(); Writer { - store: Default::default(), + store: Arc::new(Inner { + cache: Default::default(), + ready_rx, + }), dyntype, ready_tx: Some(ready_tx), - ready_rx: Arc::new(ready_rx), } } @@ -52,7 +52,6 @@ where pub fn as_reader(&self) -> Store { Store { store: self.store.clone(), - ready_rx: self.ready_rx.clone(), } } @@ -62,18 +61,18 @@ where watcher::Event::Applied(obj) => { let key = obj.to_object_ref(self.dyntype.clone()); let obj = Arc::new(obj.clone()); - self.store.write().insert(key, obj); + self.store.cache.write().insert(key, obj); } watcher::Event::Deleted(obj) => { let key = obj.to_object_ref(self.dyntype.clone()); - self.store.write().remove(&key); + self.store.cache.write().remove(&key); } watcher::Event::Restarted(new_objs) => { let new_objs = new_objs .iter() .map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone()))) .collect::>(); - *self.store.write() = new_objs; + *self.store.cache.write() = new_objs; } } @@ -105,8 +104,7 @@ pub struct Store where K::DynamicType: Hash + Eq, { - store: Cache, - ready_rx: Arc>, + store: Arc>, } #[derive(Debug, Error)] @@ -125,7 +123,7 @@ where /// # Errors /// Returns an error if the [`Writer`] was dropped before any value was written. pub async fn wait_until_ready(&self) -> Result<(), WriterDropped> { - self.ready_rx.get().await.map_err(WriterDropped) + self.store.ready_rx.get().await.map_err(WriterDropped) } /// Retrieve a `clone()` of the entry referred to by `key`, if it is in the cache. @@ -139,7 +137,7 @@ where /// reasonable `error_policy`. #[must_use] pub fn get(&self, key: &ObjectRef) -> Option> { - let store = self.store.read(); + let store = self.store.cache.read(); store .get(key) // Try to erase the namespace and try again, in case the object is cluster-scoped @@ -157,7 +155,7 @@ where /// Return a full snapshot of the current values #[must_use] pub fn state(&self) -> Vec> { - let s = self.store.read(); + let s = self.store.cache.read(); s.values().cloned().collect() } @@ -168,6 +166,7 @@ where P: Fn(&K) -> bool, { self.store + .cache .read() .iter() .map(|(_, k)| k) @@ -178,13 +177,13 @@ where /// Return the number of elements in the store #[must_use] pub fn len(&self) -> usize { - self.store.read().len() + self.store.cache.read().len() } /// Return whether the store is empty #[must_use] pub fn is_empty(&self) -> bool { - self.store.read().is_empty() + self.store.cache.read().is_empty() } } @@ -203,6 +202,16 @@ where (r, w) } +/// Shared data between all facets (the [`Writer`] and all [readers](`Store`) of a store) +/// +/// This should never be exposed outside of this module. +#[derive(Derivative)] +#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"))] +struct Inner { + cache: RwLock, Arc>>, + ready_rx: DelayedInit<()>, +} + #[cfg(test)] mod tests { use super::{store, Writer}; From 4cda04ceb8decdadae225461942db012e1f085f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 13 Mar 2024 02:28:11 +0100 Subject: [PATCH 2/5] Implement backpressuring broadcast channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/utils/broadcast.rs | 153 ++++++++++++++++++++++++++++ kube-runtime/src/utils/mod.rs | 1 + 2 files changed, 154 insertions(+) create mode 100644 kube-runtime/src/utils/broadcast.rs diff --git a/kube-runtime/src/utils/broadcast.rs b/kube-runtime/src/utils/broadcast.rs new file mode 100644 index 000000000..b5b7dd237 --- /dev/null +++ b/kube-runtime/src/utils/broadcast.rs @@ -0,0 +1,153 @@ +use ahash::HashMap; +use futures::{channel::mpsc, stream, SinkExt, Stream, StreamExt}; + +#[derive(Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +struct SubscriberId(u64); + +/// A channel that broadcasts values to multiple subscribers. +/// +/// Compared to [`tokio::sync::broadcast`], this backpressures the sender when the buffer is full, +/// rather than dropping values. +pub(crate) struct Broadcaster { + subscriber_txes: HashMap>, + next_id: SubscriberId, + buffer_size: usize, +} + +impl Broadcaster +where + T: Clone + Send + Sync, +{ + pub fn new(buffer_size: usize) -> Self { + Self { + subscriber_txes: HashMap::default(), + next_id: SubscriberId(0), + buffer_size, + } + } + + /// Sends `value` to all subscribers. + /// + /// The future may wait if any subscriber does not have room to buffer the value. + /// + /// Closed subscribers will be unsubscribed. + pub async fn send(&mut self, value: T) { + let closed_subscribers = stream::iter( + // TODO: This should be able to be a &mut, but that causes a weird lifetime error that somehow gets linked to the call site + self.subscriber_txes.clone(), + ) + .flat_map_unordered(None, |(sub_id, mut tx)| { + let value = value.clone(); + stream::once(async move { + match tx.send(value).await { + // Subscriber is still open + Ok(()) => None, + // Subscriber is closed, schedule for unsubscribing + Err(_) => Some(sub_id), + } + }) + .boxed() + }) + .filter_map(|x: Option| async move { x }) + .collect::>() + .await; + for closed_sub in closed_subscribers { + self.subscriber_txes.remove(&closed_sub); + } + } + + pub fn subscribe(&mut self) -> impl Stream + Send + Sync { + // Currently we allocate a buffer per subscriber, but it is configured over the whole stream + // in order to give room to move to a shared buffer implementation later on. + let (tx, rx) = mpsc::channel(self.buffer_size); + let id = self.next_id; + self.next_id = SubscriberId(id.0 + 1); + self.subscriber_txes.insert(id, tx); + rx + } +} + +#[cfg(test)] +mod tests { + use std::{pin::pin, time::Duration}; + + use futures::{future, poll, FutureExt, StreamExt}; + use tokio::time::timeout; + + use super::Broadcaster; + + #[tokio::test] + async fn test_regular_usage() { + let mut broadcaster = Broadcaster::::new(1); + let sent = (0..20).collect::>(); + let subscribers = (0..10) + .map(|_| { + broadcaster + .subscribe() + .collect::>() + .map(|received| assert_eq!(sent, received)) + .boxed() + }) + .collect::>(); + let producer = async { + for i in &sent { + broadcaster.send(*i).await; + } + drop(broadcaster); + } + .boxed(); + timeout( + Duration::from_secs(1), + future::join_all(subscribers.into_iter().chain([producer])), + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_backpressure() { + timeout(Duration::from_secs(1), async { + let mut broadcaster = Broadcaster::::new(1); + let mut sub_1 = broadcaster.subscribe(); + let mut sub_2 = broadcaster.subscribe(); + + broadcaster.send(1).await; + assert_eq!(Some(1), sub_1.next().await); + // No read on sub_2 + + let mut send_2 = pin!(broadcaster.send(2)); + // send_2 will be pending due to 1 not having been read yet from sub_2 (so sub_2's buffer is full) + assert!(poll!(send_2.as_mut()).is_pending()); + // sub_1 has buffer space, so it will see 2 immediately + assert_eq!(Some(2), sub_1.next().await); + // read 1 from sub_2 to make room + assert_eq!(Some(1), sub_2.next().await); + // send_2 should now be able to complete since sub_2 has room + send_2.await; + assert_eq!(Some(2), sub_2.next().await); + }) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_unsubscribe() { + let mut broadcaster = Broadcaster::::new(1); + let sub_1 = broadcaster.subscribe(); + let sub_2 = broadcaster.subscribe(); + + let subscriber = sub_1.collect::>(); + let producer = async { + for i in 0..10 { + broadcaster.send(i).await + } + drop(broadcaster); + }; + + // unsubscribe sub_2 + drop(sub_2); + timeout(Duration::from_secs(1), future::join(subscriber, producer)) + .await + .unwrap(); + } +} diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index bdf85b227..3a025eadd 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -1,6 +1,7 @@ //! Helpers for manipulating built-in streams mod backoff_reset_timer; +pub(crate) mod broadcast; pub(crate) mod delayed_init; mod event_flatten; mod event_modify; From d9563e7ac31a9e1fee535367ca7212509974b6f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 13 Mar 2024 03:45:01 +0100 Subject: [PATCH 3/5] Add broadcast event support to reflector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/reflector/mod.rs | 18 +++- kube-runtime/src/reflector/object_ref.rs | 2 +- kube-runtime/src/reflector/store.rs | 93 ++++++++++++++------ kube-runtime/src/utils/broadcast.rs | 1 + kube-runtime/src/utils/mod.rs | 2 - kube-runtime/src/utils/reflect.rs | 104 ----------------------- kube-runtime/src/utils/watch_ext.rs | 7 +- 7 files changed, 87 insertions(+), 140 deletions(-) delete mode 100644 kube-runtime/src/utils/reflect.rs diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 04403202d..ea264ba9e 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -5,8 +5,8 @@ pub mod store; pub use self::object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef}; use crate::watcher; -use futures::{Stream, TryStreamExt}; -use std::hash::Hash; +use futures::{lock::Mutex, Stream, StreamExt, TryStreamExt}; +use std::{hash::Hash, sync::Arc}; pub use store::{store, Store}; /// Cache objects from a [`watcher()`] stream into a local [`Store`] @@ -88,13 +88,23 @@ pub use store::{store, Store}; /// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store. /// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on. /// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`. -pub fn reflector(mut writer: store::Writer, stream: W) -> impl Stream +pub fn reflector(writer: store::Writer, stream: W) -> impl Stream where K: Lookup + Clone, K::DynamicType: Eq + Hash + Clone, W: Stream>>, { - stream.inspect_ok(move |event| writer.apply_watcher_event(event)) + // TODO: Writer should be able to be owned by the closure, but I can't get the lifetimes to line up + // also not sure if we can get rid of the mutex... + let writer = Arc::new(Mutex::new(writer)); + stream.and_then(move |event| { + let writer = writer.clone(); + async move { + let mut writer = writer.lock().await; + writer.apply_watcher_event(&event).await; + Ok(event) + } + }) } #[cfg(test)] diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index e89094fd4..c5a97a711 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -16,7 +16,7 @@ use std::{ pub trait Lookup { /// Type information for types that do not know their resource information at compile time. /// This is equivalent to [`Resource::DynamicType`]. - type DynamicType; + type DynamicType: Send + Sync; /// The [kind](Resource::kind) for this object. fn kind(dyntype: &Self::DynamicType) -> Cow<'_, str>; diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 14b53063c..b6e498ff2 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -1,10 +1,14 @@ use super::{Lookup, ObjectRef}; use crate::{ - utils::delayed_init::{self, DelayedInit}, + utils::{ + broadcast::Broadcaster, + delayed_init::{self, DelayedInit}, + }, watcher, }; -use ahash::AHashMap; +use ahash::{AHashMap, HashSet}; use derivative::Derivative; +use futures::Stream; use parking_lot::RwLock; use std::{fmt::Debug, hash::Hash, sync::Arc}; use thiserror::Error; @@ -22,6 +26,7 @@ where store: Arc>, dyntype: K::DynamicType, ready_tx: Option>, + touched_objects_broadcaster: Broadcaster>>, } impl Writer @@ -41,6 +46,7 @@ where }), dyntype, ready_tx: Some(ready_tx), + touched_objects_broadcaster: Broadcaster::new(1), } } @@ -55,24 +61,51 @@ where } } + /// Returns a [`Stream`] of objects that have been touched in any way (created/modified/deleted). + /// + /// Note that delays in handling objects may affect all subscribers, as well as the backing reflector (or other source). + pub fn subscribe_touched_objects(&mut self) -> impl Stream>> { + self.touched_objects_broadcaster.subscribe() + } + /// Applies a single watcher event to the store - pub fn apply_watcher_event(&mut self, event: &watcher::Event) { + pub async fn apply_watcher_event(&mut self, event: &watcher::Event) { match event { watcher::Event::Applied(obj) => { - let key = obj.to_object_ref(self.dyntype.clone()); + let key = Arc::new(obj.to_object_ref(self.dyntype.clone())); let obj = Arc::new(obj.clone()); - self.store.cache.write().insert(key, obj); + self.store.cache.write().insert(key.clone(), obj); + self.touched_objects_broadcaster.send(key).await; } watcher::Event::Deleted(obj) => { let key = obj.to_object_ref(self.dyntype.clone()); - self.store.cache.write().remove(&key); + let removed_obj = self.store.cache.write().remove_entry(&key); + if let Some((key, _)) = removed_obj { + self.touched_objects_broadcaster.send(key).await; + } } watcher::Event::Restarted(new_objs) => { - let new_objs = new_objs + let mut new_objs = new_objs .iter() - .map(|obj| (obj.to_object_ref(self.dyntype.clone()), Arc::new(obj.clone()))) + .map(|obj| { + ( + Arc::new(obj.to_object_ref(self.dyntype.clone())), + Arc::new(obj.clone()), + ) + }) .collect::>(); - *self.store.cache.write() = new_objs; + let mutated_obj_refs = { + let mut cache = self.store.cache.write(); + std::mem::swap(&mut *cache, &mut new_objs); + let old_objs = new_objs; + old_objs + .into_keys() + .chain(cache.keys().cloned()) + .collect::>() + }; + for key in mutated_obj_refs { + self.touched_objects_broadcaster.send(key).await; + } } } @@ -208,7 +241,7 @@ where #[derive(Derivative)] #[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"))] struct Inner { - cache: RwLock, Arc>>, + cache: RwLock>, Arc>>, ready_rx: DelayedInit<()>, } @@ -219,8 +252,8 @@ mod tests { use k8s_openapi::api::core::v1::ConfigMap; use kube_client::api::ObjectMeta; - #[test] - fn should_allow_getting_namespaced_object_by_namespaced_ref() { + #[tokio::test] + async fn should_allow_getting_namespaced_object_by_namespaced_ref() { let cm = ConfigMap { metadata: ObjectMeta { name: Some("obj".to_string()), @@ -230,13 +263,15 @@ mod tests { ..ConfigMap::default() }; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone())); + store_w + .apply_watcher_event(&watcher::Event::Applied(cm.clone())) + .await; let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); } - #[test] - fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() { + #[tokio::test] + async fn should_not_allow_getting_namespaced_object_by_clusterscoped_ref() { let cm = ConfigMap { metadata: ObjectMeta { name: Some("obj".to_string()), @@ -248,13 +283,13 @@ mod tests { let mut cluster_cm = cm.clone(); cluster_cm.metadata.namespace = None; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm)); + store_w.apply_watcher_event(&watcher::Event::Applied(cm)).await; let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None); } - #[test] - fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() { + #[tokio::test] + async fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() { let cm = ConfigMap { metadata: ObjectMeta { name: Some("obj".to_string()), @@ -264,12 +299,14 @@ mod tests { ..ConfigMap::default() }; let (store, mut writer) = store(); - writer.apply_watcher_event(&watcher::Event::Applied(cm.clone())); + writer + .apply_watcher_event(&watcher::Event::Applied(cm.clone())) + .await; assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); } - #[test] - fn should_allow_getting_clusterscoped_object_by_namespaced_ref() { + #[tokio::test] + async fn should_allow_getting_clusterscoped_object_by_namespaced_ref() { let cm = ConfigMap { metadata: ObjectMeta { name: Some("obj".to_string()), @@ -282,13 +319,15 @@ mod tests { let mut nsed_cm = cm.clone(); nsed_cm.metadata.namespace = Some("ns".to_string()); let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone())); + store_w + .apply_watcher_event(&watcher::Event::Applied(cm.clone())) + .await; let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm)); } - #[test] - fn find_element_in_store() { + #[tokio::test] + async fn find_element_in_store() { let cm = ConfigMap { metadata: ObjectMeta { name: Some("obj".to_string()), @@ -301,14 +340,16 @@ mod tests { let (reader, mut writer) = store::(); assert!(reader.is_empty()); - writer.apply_watcher_event(&watcher::Event::Applied(cm)); + writer.apply_watcher_event(&watcher::Event::Applied(cm)).await; assert_eq!(reader.len(), 1); assert!(reader.find(|k| k.metadata.generation == Some(1234)).is_none()); target_cm.metadata.name = Some("obj1".to_string()); target_cm.metadata.generation = Some(1234); - writer.apply_watcher_event(&watcher::Event::Applied(target_cm.clone())); + writer + .apply_watcher_event(&watcher::Event::Applied(target_cm.clone())) + .await; assert!(!reader.is_empty()); assert_eq!(reader.len(), 2); let found = reader.find(|k| k.metadata.generation == Some(1234)); diff --git a/kube-runtime/src/utils/broadcast.rs b/kube-runtime/src/utils/broadcast.rs index b5b7dd237..1d6c69839 100644 --- a/kube-runtime/src/utils/broadcast.rs +++ b/kube-runtime/src/utils/broadcast.rs @@ -8,6 +8,7 @@ struct SubscriberId(u64); /// /// Compared to [`tokio::sync::broadcast`], this backpressures the sender when the buffer is full, /// rather than dropping values. +#[derive(Debug)] pub(crate) struct Broadcaster { subscriber_txes: HashMap>, next_id: SubscriberId, diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 3a025eadd..83628f982 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -6,7 +6,6 @@ pub(crate) mod delayed_init; mod event_flatten; mod event_modify; #[cfg(feature = "unstable-runtime-predicates")] mod predicate; -mod reflect; mod stream_backoff; #[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe; mod watch_ext; @@ -16,7 +15,6 @@ pub use event_flatten::EventFlatten; pub use event_modify::EventModify; #[cfg(feature = "unstable-runtime-predicates")] pub use predicate::{predicates, Predicate, PredicateFilter}; -pub use reflect::Reflect; pub use stream_backoff::StreamBackoff; #[cfg(feature = "unstable-runtime-subscribe")] pub use stream_subscribe::StreamSubscribe; diff --git a/kube-runtime/src/utils/reflect.rs b/kube-runtime/src/utils/reflect.rs deleted file mode 100644 index 43fa65c2a..000000000 --- a/kube-runtime/src/utils/reflect.rs +++ /dev/null @@ -1,104 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; - -use futures::{Stream, TryStream}; -use pin_project::pin_project; - -use crate::{ - reflector::store::Writer, - watcher::{Error, Event}, -}; -use kube_client::Resource; - -/// Stream returned by the [`reflect`](super::WatchStreamExt::reflect) method -#[pin_project] -pub struct Reflect -where - K: Resource + Clone + 'static, - K::DynamicType: Eq + std::hash::Hash + Clone, -{ - #[pin] - stream: St, - writer: Writer, -} - -impl Reflect -where - St: TryStream>, - K: Resource + Clone, - K::DynamicType: Eq + std::hash::Hash + Clone, -{ - pub(super) fn new(stream: St, writer: Writer) -> Reflect { - Self { stream, writer } - } -} - -impl Stream for Reflect -where - K: Resource + Clone, - K::DynamicType: Eq + std::hash::Hash + Clone, - St: Stream, Error>>, -{ - type Item = Result, Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut me = self.project(); - me.stream.as_mut().poll_next(cx).map_ok(move |event| { - me.writer.apply_watcher_event(&event); - event - }) - } -} - -#[cfg(test)] -pub(crate) mod test { - use std::{task::Poll, vec}; - - use super::{Error, Event, Reflect}; - use crate::reflector; - use futures::{pin_mut, poll, stream, StreamExt}; - use k8s_openapi::api::core::v1::Pod; - - fn testpod(name: &str) -> Pod { - let mut pod = Pod::default(); - pod.metadata.name = Some(name.to_string()); - pod - } - - #[tokio::test] - async fn reflect_passes_events_through() { - let foo = testpod("foo"); - let bar = testpod("bar"); - let st = stream::iter([ - Ok(Event::Applied(foo.clone())), - Err(Error::TooManyObjects), - Ok(Event::Restarted(vec![foo, bar])), - ]); - let (reader, writer) = reflector::store(); - - let reflect = Reflect::new(st, writer); - pin_mut!(reflect); - assert_eq!(reader.len(), 0); - - assert!(matches!( - poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::Applied(_)))) - )); - assert_eq!(reader.len(), 1); - - assert!(matches!( - poll!(reflect.next()), - Poll::Ready(Some(Err(Error::TooManyObjects))) - )); - assert_eq!(reader.len(), 1); - - let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_)))))); - assert_eq!(reader.len(), 2); - - assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); - assert_eq!(reader.len(), 2); - } -} diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 6f9994586..6a375f3f1 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -3,12 +3,13 @@ use crate::utils::predicate::{Predicate, PredicateFilter}; #[cfg(feature = "unstable-runtime-subscribe")] use crate::utils::stream_subscribe::StreamSubscribe; use crate::{ + reflector, utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff}, watcher, }; use kube_client::Resource; -use crate::{reflector::store::Writer, utils::Reflect}; +use crate::reflector::store::Writer; use crate::watcher::DefaultBackoff; use backoff::backoff::Backoff; @@ -239,13 +240,13 @@ pub trait WatchStreamExt: Stream { /// ``` /// /// [`Store`]: crate::reflector::Store - fn reflect(self, writer: Writer) -> Reflect + fn reflect(self, writer: Writer) -> impl Stream where Self: Stream>> + Sized, K: Resource + Clone + 'static, K::DynamicType: Eq + std::hash::Hash + Clone, { - Reflect::new(self, writer) + reflector(writer, self) } } From acae77752d266e660660053fd034d1f1e317bc9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 13 Mar 2024 04:05:29 +0100 Subject: [PATCH 4/5] Simplify reflector() with async-stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/Cargo.toml | 1 + kube-runtime/src/reflector/mod.rs | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 907de19dc..9d6f65085 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -44,6 +44,7 @@ thiserror = "1.0.29" backoff = "0.4.0" async-trait = "0.1.64" hashbrown = "0.14.0" +async-stream = "0.3.5" [dependencies.k8s-openapi] version = "0.21.0" diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index ea264ba9e..36d049c52 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -5,8 +5,9 @@ pub mod store; pub use self::object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef}; use crate::watcher; -use futures::{lock::Mutex, Stream, StreamExt, TryStreamExt}; -use std::{hash::Hash, sync::Arc}; +use async_stream::stream; +use futures::{Stream, StreamExt}; +use std::hash::Hash; pub use store::{store, Store}; /// Cache objects from a [`watcher()`] stream into a local [`Store`] @@ -88,23 +89,22 @@ pub use store::{store, Store}; /// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store. /// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on. /// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`. -pub fn reflector(writer: store::Writer, stream: W) -> impl Stream +pub fn reflector(mut writer: store::Writer, stream: W) -> impl Stream where K: Lookup + Clone, K::DynamicType: Eq + Hash + Clone, W: Stream>>, { - // TODO: Writer should be able to be owned by the closure, but I can't get the lifetimes to line up - // also not sure if we can get rid of the mutex... - let writer = Arc::new(Mutex::new(writer)); - stream.and_then(move |event| { - let writer = writer.clone(); - async move { - let mut writer = writer.lock().await; - writer.apply_watcher_event(&event).await; - Ok(event) + // TODO: why does pin! not work? not sure... + let mut stream = Box::pin(stream); + stream! { + while let Some(event) = stream.next().await { + if let Ok(event) = &event { + writer.apply_watcher_event(event).await; + } + yield event; } - }) + } } #[cfg(test)] From 4525d678cda0b114e32b2e018b08dad6e54d81e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Natalie=20Klestrup=20R=C3=B6ijezon?= Date: Wed, 13 Mar 2024 04:08:55 +0100 Subject: [PATCH 5/5] Remove the Send+Sync requirement from Broadcaster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Natalie Klestrup Röijezon --- kube-runtime/src/reflector/object_ref.rs | 2 +- kube-runtime/src/utils/broadcast.rs | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/kube-runtime/src/reflector/object_ref.rs b/kube-runtime/src/reflector/object_ref.rs index c5a97a711..e89094fd4 100644 --- a/kube-runtime/src/reflector/object_ref.rs +++ b/kube-runtime/src/reflector/object_ref.rs @@ -16,7 +16,7 @@ use std::{ pub trait Lookup { /// Type information for types that do not know their resource information at compile time. /// This is equivalent to [`Resource::DynamicType`]. - type DynamicType: Send + Sync; + type DynamicType; /// The [kind](Resource::kind) for this object. fn kind(dyntype: &Self::DynamicType) -> Cow<'_, str>; diff --git a/kube-runtime/src/utils/broadcast.rs b/kube-runtime/src/utils/broadcast.rs index 1d6c69839..ed0a520d5 100644 --- a/kube-runtime/src/utils/broadcast.rs +++ b/kube-runtime/src/utils/broadcast.rs @@ -17,7 +17,7 @@ pub(crate) struct Broadcaster { impl Broadcaster where - T: Clone + Send + Sync, + T: Clone, { pub fn new(buffer_size: usize) -> Self { Self { @@ -39,15 +39,14 @@ where ) .flat_map_unordered(None, |(sub_id, mut tx)| { let value = value.clone(); - stream::once(async move { + Box::pin(stream::once(async move { match tx.send(value).await { // Subscriber is still open Ok(()) => None, // Subscriber is closed, schedule for unsubscribing Err(_) => Some(sub_id), } - }) - .boxed() + })) }) .filter_map(|x: Option| async move { x }) .collect::>() @@ -57,7 +56,7 @@ where } } - pub fn subscribe(&mut self) -> impl Stream + Send + Sync { + pub fn subscribe(&mut self) -> impl Stream { // Currently we allocate a buffer per subscriber, but it is configured over the whole stream // in order to give room to move to a shared buffer implementation later on. let (tx, rx) = mpsc::channel(self.buffer_size);