Skip to content

Commit

Permalink
feat: add method to create RecordBatch in SysEventStore (#25610)
Browse files Browse the repository at this point in the history
- This commit allows a `RecordBatch` to be created directly from the event
  store. This means we can avoid cloning events and avoid creating an
  intermediate vec. To achieve that, a new method `as_record_batch` has
  been added, with a trait bound `ToRecordBatch` that events are expected
  to implement.
- Minor tidy-ups (renamed methods) and added a test

closes: #25609
  • Loading branch information
praveen-influx authored Dec 3, 2024
1 parent 43755c2 commit f23aa0d
Showing 1 changed file with 165 additions and 24 deletions.
189 changes: 165 additions & 24 deletions influxdb3_sys_events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,33 @@ use std::{
sync::Arc,
};

use arrow::{datatypes::Schema, error::ArrowError};
use arrow_array::RecordBatch;
use dashmap::DashMap;
use iox_time::TimeProvider;

const MAX_CAPACITY: usize = 1000;

/// Converts events held in a [`RingBuffer`] into an Arrow [`RecordBatch`].
///
/// This trait is not dyn compatible (its methods take no `self` receiver).
pub trait ToRecordBatch<E> {
    /// Returns the Arrow [`Schema`] describing the columns for event type `E`.
    fn schema() -> Schema;
    /// Takes a reference to the `RingBuffer` and creates a `RecordBatch` for
    /// the events in the buffer. Implementations are expected to return
    /// `None` when `items` is `None` (no buffer exists for this event type).
    fn to_record_batch(
        items: Option<&RingBuffer<Event<E>>>,
    ) -> Option<Result<RecordBatch, ArrowError>>;
}

/// This store captures the events for different types of instrumentation.
/// It is backed by a ring buffer per event type. Every new event type
/// can be added by calling [`SysEventStore::record`]. And in order to find
/// all the events per event type the [`SysEventStore::as_vec`] method can
/// be used. This returns a `Vec<Event<E>>` which internally clones to get
/// values out of the `Ref` guard. There is a convenience method,
/// [`SysEventStore::as_record_batch`], for getting a record batch directly,
/// avoiding the clones.
///
/// Every time a new event is introduced, the system table has to be set up
/// following the same pattern as in `influxdb3_server::system_tables`
#[derive(Debug)]
Expand All @@ -30,7 +48,8 @@ impl SysEventStore {
}
}

pub fn add<E>(&self, val: E)
/// records an event by adding it to this event store
pub fn record<E>(&self, val: E)
where
E: 'static + Debug + Sync + Send,
{
Expand All @@ -43,45 +62,65 @@ impl SysEventStore {
.entry(TypeId::of::<RingBuffer<Event<E>>>())
.or_insert_with(|| Box::new(RingBuffer::<Event<E>>::new(MAX_CAPACITY)));

// unwrap here is fine, we just used the same type above for
// get or insert
buf.downcast_mut::<RingBuffer<Event<E>>>()
.unwrap()
.push(wrapped);
}

pub fn query<E>(&self) -> Vec<Event<E>>
/// Creates an intermediate `Vec` by cloning events. To
/// create a record batch instead use [`Self::as_record_batch`]
pub fn as_vec<E>(&self) -> Vec<Event<E>>
where
E: 'static + Clone + Debug + Sync + Send,
{
let mut vec = vec![];
if let Some(buf) = self.events.get(&TypeId::of::<RingBuffer<Event<E>>>()) {
let iter = buf
.downcast_ref::<RingBuffer<Event<E>>>()
.unwrap()
.in_order();
for i in iter {
vec.push(i.clone());
}
};
vec
self.events
.get(&TypeId::of::<RingBuffer<Event<E>>>())
.map(|buf| {
// unwrap here is fine, we just used the same type above to
// get
buf.downcast_ref::<RingBuffer<Event<E>>>()
.unwrap()
.in_order()
.cloned()
.collect()
})
.unwrap_or_default()
}

/// Creates record batch for given event type `E`, this avoids
/// any unnecessary allocation but events need to implement
/// [`ToRecordBatch`] trait
pub fn as_record_batch<E>(&self) -> Option<Result<RecordBatch, ArrowError>>
where
E: 'static + Clone + Debug + Sync + Send + ToRecordBatch<E>,
{
let map_ref = self.events.get(&TypeId::of::<RingBuffer<Event<E>>>());
let buf_ref = map_ref
.as_ref()
// unwrap here is fine, we just used the same type above to get
.map(|buf| buf.downcast_ref::<RingBuffer<Event<E>>>().unwrap());
E::to_record_batch(buf_ref)
}
}

/// Fixed-capacity buffer of the most recent items. `max` is the capacity
/// and `write_index` tracks the wrap-around write position once the
/// buffer is full.
pub struct RingBuffer<T> {
    buf: Vec<T>,
    max: usize,
    write_index: usize,
}

impl<T> RingBuffer<T> {
pub fn new(capacity: usize) -> Self {
fn new(capacity: usize) -> Self {
Self {
buf: Vec::with_capacity(capacity),
max: capacity,
write_index: 0,
}
}

pub fn push(&mut self, val: T) {
fn push(&mut self, val: T) {
if !self.reached_max() {
self.buf.push(val);
} else {
Expand Down Expand Up @@ -118,10 +157,16 @@ impl<D> Event<D> {
mod tests {
use std::sync::Arc;

use arrow::{
array::{StringViewBuilder, StructBuilder, UInt64Builder},
datatypes::{DataType, Field, Fields, Schema},
error::ArrowError,
};
use arrow_array::{ArrayRef, RecordBatch};
use iox_time::{MockProvider, Time};
use observability_deps::tracing::debug;

use crate::{RingBuffer, SysEventStore};
use crate::{Event, RingBuffer, SysEventStore, ToRecordBatch, MAX_CAPACITY};

#[allow(dead_code)]
#[derive(Default, Clone, Debug)]
Expand All @@ -132,6 +177,57 @@ mod tests {
pub random_name: String,
}

    impl ToRecordBatch<SampleEvent1> for SampleEvent1 {
        fn to_record_batch(
            items: Option<&RingBuffer<Event<SampleEvent1>>>,
        ) -> Option<Result<RecordBatch, ArrowError>> {
            items.map(|buf| {
                let iter = buf.in_order();
                // event_time column; the timestamp is hard-coded because
                // this is test scaffolding only.
                let mut event_time_arr = StringViewBuilder::with_capacity(MAX_CAPACITY);
                // event_data column: a struct of (time_taken, total_fetched).
                let mut struct_builder = StructBuilder::from_fields(
                    vec![
                        Field::new("time_taken", DataType::UInt64, false),
                        Field::new("total_fetched", DataType::UInt64, false),
                    ],
                    MAX_CAPACITY,
                );
                for event in iter {
                    event_time_arr.append_value("2024-12-01T23:59:59.000Z");
                    // Field builders are fetched by index, matching the order
                    // of the fields passed to `from_fields` above.
                    let time_taken_builder =
                        struct_builder.field_builder::<UInt64Builder>(0).unwrap();
                    time_taken_builder.append_value(event.data.time_taken);

                    let num_files_fetched_builder =
                        struct_builder.field_builder::<UInt64Builder>(1).unwrap();
                    num_files_fetched_builder.append_value(event.data.total_fetched);

                    // Mark the struct row itself as non-null.
                    struct_builder.append(true);
                }

                let columns: Vec<ArrayRef> = vec![
                    Arc::new(event_time_arr.finish()),
                    Arc::new(struct_builder.finish()),
                ];
                RecordBatch::try_new(Arc::new(Self::schema()), columns)
            })
        }

        fn schema() -> Schema {
            // Must line up with the columns built in `to_record_batch`:
            // event_time (Utf8View) followed by event_data (Struct).
            let columns = vec![
                Field::new("event_time", DataType::Utf8View, false),
                Field::new(
                    "event_data",
                    DataType::Struct(Fields::from(vec![
                        Field::new("time_taken", DataType::UInt64, false),
                        Field::new("total_fetched", DataType::UInt64, false),
                    ])),
                    false,
                ),
            ];
            Schema::new(columns)
        }
    }

#[allow(dead_code)]
#[derive(Default, Clone, Debug)]
struct SampleEvent2 {
Expand Down Expand Up @@ -208,18 +304,63 @@ mod tests {
let time_provider = MockProvider::new(Time::from_timestamp_nanos(100));

let event_store = SysEventStore::new(Arc::new(time_provider));
event_store.add(event_data);
event_store.record(event_data);

event_store.add(event_data2);
event_store.add(event_data3);
event_store.record(event_data2);
event_store.record(event_data3);
assert_eq!(2, event_store.events.len());

let all_events = event_store.query::<SampleEvent1>();
let all_events = event_store.as_vec::<SampleEvent1>();
assert_eq!(2, all_events.len());
debug!(all_events = ?all_events, "all events in sys events for type SampleEvent1");

let all_events = event_store.query::<SampleEvent2>();
let all_events = event_store.as_vec::<SampleEvent2>();
assert_eq!(1, all_events.len());
debug!(all_events = ?all_events, "all events in sys events for type SampleEvent2");
}

#[test_log::test(test)]
fn test_event_store_2() {
let event_data = SampleEvent1 {
start_time: 0,
time_taken: 10,
total_fetched: 10,
random_name: "foo".to_owned(),
};

let event_data2 = SampleEvent2 {
start_time: 0,
time_taken: 10,
generation_id: 100,
};

let event_data3 = SampleEvent1 {
start_time: 0,
time_taken: 10,
total_fetched: 10,
random_name: "boo".to_owned(),
};

let time_provider = MockProvider::new(Time::from_timestamp_nanos(100));

let event_store = SysEventStore::new(Arc::new(time_provider));
event_store.record(event_data);

event_store.record(event_data2);
event_store.record(event_data3);
assert_eq!(2, event_store.events.len());

let all_events = event_store.as_record_batch::<SampleEvent1>();
assert_eq!(
2,
all_events
.as_ref()
.unwrap()
.as_ref()
.unwrap()
.columns()
.len()
);
debug!(all_events = ?all_events, "all SampleEvent1 events as record batch");
}
}

0 comments on commit f23aa0d

Please sign in to comment.