Read event data in parallel to backtest #124

Draft · wants to merge 3 commits into master
1 change: 1 addition & 0 deletions hftbacktest/Cargo.toml
@@ -49,6 +49,7 @@ hmac = { version = "0.13.0-pre.3", optional = true }
rand = { version = "0.8.5", optional = true }
uuid = { version = "1.8.0", features = ["v4"], optional = true }
nom = { version = "7.1.3", optional = true }
bus = { version = "2.4" }
@bohblue2 (Contributor) commented on Aug 17, 2024:
NOTE: `bus` sometimes busy-waits in the current implementation, which may cause increased CPU usage; see jonhoo/bus#23. From that issue:

> There is at least one case where the readers race with the writer and may not successfully wake it up, so the writer has to park with a timeout. I would love to get rid of this, but haven't had a chance to dig into it, and no longer use this library actively myself. If you want to take a look, I'd be happy to help out!

It probably won't cause any problems here (the race occurs in SPMC use, and we use the bus as an SPSC structure), but I think I'll put up some docs anyway.
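If the busy-wait ever did become a problem, the standard library's bounded channel gives similar single-producer/single-consumer semantics with a blocking (parked) receiver. A minimal std-only sketch of that alternative, not part of this PR:

```rust
use std::{sync::mpsc::sync_channel, thread};

fn main() {
    // Bounded capacity provides backpressure much like `Bus::new(len)`,
    // but the receiver parks instead of spinning.
    let (tx, rx) = sync_channel::<u64>(512);

    let producer = thread::spawn(move || {
        for ts in 0..1_000u64 {
            tx.send(ts).unwrap();
        }
        // Dropping `tx` closes the channel, signalling end-of-data.
    });

    let mut count = 0u64;
    while let Ok(_ts) = rx.recv() {
        count += 1;
    }
    producer.join().unwrap();
    assert_eq!(count, 1_000);
}
```

The trade-off is that `sync_channel` is MPSC rather than broadcast, so it only substitutes for `bus` in the SPSC usage described above.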

hftbacktest-derive = { path = "../hftbacktest-derive", optional = true, version = "0.1.0" }

[dev-dependencies]
120 changes: 120 additions & 0 deletions hftbacktest/src/backtest/data/bus.rs
@@ -0,0 +1,120 @@
use std::{fs::File, io, io::ErrorKind, iter::Peekable, num::NonZeroUsize};

use bus::{Bus, BusIntoIter, BusReader};
use tracing::{error, info, info_span};
use zip::ZipArchive;

use crate::{
backtest::{
data::{npy::NpyReader, read_npy_file, read_npz_file, Data, NpyDTyped},
BacktestError,
},
types::Event,
};

#[derive(Copy, Clone)]
pub enum EventBusMessage<EventT: Clone> {
Item(EventT),
EndOfData,
}

pub struct EventBusReader<EventT: Clone + Send + Sync> {
reader: Peekable<BusIntoIter<EventBusMessage<EventT>>>,
}

impl<EventT: Clone + Send + Sync> EventBusReader<EventT> {
pub fn new(reader: BusReader<EventBusMessage<EventT>>) -> Self {
Self {
reader: reader.into_iter().peekable(),
}
}

pub fn peek(&mut self) -> Option<&EventT> {
self.reader.peek().and_then(|ev| match ev {
EventBusMessage::Item(item) => Some(item),
EventBusMessage::EndOfData => None,
})
}

pub fn next(&mut self) -> Option<EventT> {
self.reader.next().and_then(|ev| match ev {
EventBusMessage::Item(item) => Some(item),
EventBusMessage::EndOfData => None,
})
}
}
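The `peek`/`next` pair above maps the `EndOfData` sentinel to `None`, so the stream simply stops yielding at the end marker. That behaviour can be checked in isolation with a plain peekable iterator (the `Message` enum and values here are illustrative, not part of the PR):

```rust
enum Message {
    Item(i64),
    EndOfData,
}

fn main() {
    let stream = vec![Message::Item(1), Message::Item(2), Message::EndOfData];
    let mut it = stream.into_iter().peekable();

    // Peeking maps the sentinel to None without consuming anything.
    let peeked = it.peek().and_then(|m| match m {
        Message::Item(v) => Some(*v),
        Message::EndOfData => None,
    });
    assert_eq!(peeked, Some(1));

    // Iteration ends when the pattern stops matching at EndOfData.
    let mut seen = Vec::new();
    while let Some(Message::Item(v)) = it.next() {
        seen.push(v);
    }
    assert_eq!(seen, vec![1, 2]);
}
```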

pub trait TimestampedEventQueue<EventT> {
fn next_event(&mut self) -> Option<EventT>;

fn peek_event(&mut self) -> Option<&EventT>;

fn event_time(value: &EventT) -> i64;
}

pub trait EventConsumer<EventT> {
fn is_event_relevant(event: &EventT) -> bool;

fn process_event(&mut self, event: EventT) -> Result<(), BacktestError>;
}

fn load_data<EventT: NpyDTyped + Clone + Send>(
filepath: String,
) -> Result<Data<EventT>, BacktestError> {
let data = if filepath.ends_with(".npy") {
read_npy_file(&filepath)?
} else if filepath.ends_with(".npz") {
read_npz_file(&filepath, "data")?
} else {
return Err(BacktestError::DataError(io::Error::new(
ErrorKind::InvalidData,
"unsupported data type",
)));
};

Ok(data)
}

#[tracing::instrument(skip(bus))]
pub fn replay_event_file<EventT: NpyDTyped + Clone + Send + 'static>(
path: String,
bus: &mut Bus<EventBusMessage<EventT>>,
) -> std::io::Result<()> {
if !path.ends_with(".npz") {
todo!("Only .npz is supported in this branch")
}

let mut archive = ZipArchive::new(File::open(path)?)?;
let mut reader = NpyReader::<_, EventT>::new(
archive.by_name("data.npy")?,
NonZeroUsize::new(512).unwrap(),
)?;

loop {
let read = reader.read(|event| {
bus.broadcast(EventBusMessage::Item(event.clone()));
})?;

// EOF
if read == 0 {
break;
}
}

Ok(())
}

#[tracing::instrument(skip_all)]
pub fn replay_events_to_bus<EventT: NpyDTyped + Clone + Send + 'static>(
mut bus: Bus<EventBusMessage<EventT>>,
mut sources: Vec<String>,
) {
for source in sources.drain(..) {
let source_load_span = info_span!("load_data", source = &source);
let _source_load_span = source_load_span.entered();

replay_event_file(source, &mut bus).unwrap();
}

bus.broadcast(EventBusMessage::EndOfData);
}
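`replay_events_to_bus` drains every source in order and only then broadcasts a single `EndOfData`, so consumers see one uninterrupted event stream across file boundaries. The shape of that contract, modeled with a std channel (names and values are illustrative):

```rust
use std::sync::mpsc::{channel, Sender};

enum Message {
    Item(i64),
    EndOfData,
}

// Mirrors replay_events_to_bus: drain all sources, then send the sentinel once.
fn replay_all(sources: Vec<Vec<i64>>, tx: Sender<Message>) {
    for source in sources {
        for ev in source {
            tx.send(Message::Item(ev)).unwrap();
        }
    }
    tx.send(Message::EndOfData).unwrap();
}

fn main() {
    let (tx, rx) = channel();
    replay_all(vec![vec![1, 2], vec![3]], tx);

    let mut items = Vec::new();
    loop {
        match rx.recv().unwrap() {
            Message::Item(v) => items.push(v),
            Message::EndOfData => break,
        }
    }
    // Events from both sources arrive as one contiguous stream.
    assert_eq!(items, vec![1, 2, 3]);
}
```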
4 changes: 4 additions & 0 deletions hftbacktest/src/backtest/data/mod.rs
@@ -1,3 +1,4 @@
mod bus;
mod npy;
mod reader;

@@ -10,6 +11,7 @@ use std::{
slice::SliceIndex,
};

pub use bus::{replay_events_to_bus, EventBusMessage, EventBusReader, EventConsumer, TimestampedEventQueue};
pub use npy::{read_npy_file, read_npz_file, write_npy, Field, NpyDTyped, NpyHeader};
pub use reader::{Cache, DataSource, Reader};

@@ -107,6 +109,8 @@
}
}

unsafe impl Send for DataPtr {}

#[derive(Debug)]
pub struct DataPtr {
ptr: *mut [u8],
141 changes: 141 additions & 0 deletions hftbacktest/src/backtest/data/npy/mod.rs
@@ -1,6 +1,9 @@
use std::{
alloc::{alloc, dealloc, Layout},
fs::File,
io::{Error, ErrorKind, Read, Write},
marker::PhantomData,
num::NonZeroUsize,
};

use crate::backtest::data::{npy::parser::Value, Data, DataPtr, POD};
@@ -162,6 +165,144 @@ fn check_field_consistency(
Ok(discrepancies)
}

pub struct NpyReader<R: Read, T: NpyDTyped> {
reader: R,

/// Input buffer aligned to [T].
buffer: *mut u8,

/// Current buffer position in bytes.
buffer_pos: usize,

/// Number of bytes available in the buffer for reading.
buffer_filled: usize,

/// Maximum number of bytes the buffer of this reader can hold.
buffer_capacity: usize,

phantom_data: PhantomData<T>,

layout: Layout,
}

impl<R: Read, T: NpyDTyped> Drop for NpyReader<R, T> {
fn drop(&mut self) {
unsafe {
dealloc(
self.buffer,
self.layout,
)
}
}
}

impl<R: Read, T: NpyDTyped> NpyReader<R, T> {
pub fn new(mut reader: R, buffer_size: NonZeroUsize) -> std::io::Result<Self> {
let header = read_npy_header::<R, T>(&mut reader)?;

if T::descr() != header.descr {
match check_field_consistency(&T::descr(), &header.descr) {
Ok(diff) => {
println!("Warning: Field name mismatch - {:?}", diff);
}
Err(err) => {
return Err(Error::new(ErrorKind::InvalidData, err));
}
}
}

let layout = Layout::array::<T>(buffer_size.get()).map_err(|_| Error::other("Buffer size is too large"))?;
let buffer_capacity = buffer_size.get() * size_of::<T>();
let buffer = unsafe { alloc(layout) };

if buffer.is_null() {
return Err(std::io::Error::new(ErrorKind::OutOfMemory, "unable to allocate buffer"))
}

Ok(Self {
buffer,
buffer_pos: 0,
buffer_filled: 0,
buffer_capacity,
layout,
reader,
phantom_data: Default::default(),
})
}

pub fn read(&mut self, mut collector: impl FnMut(&T)) -> std::io::Result<usize> {
if self.buffer_pos == self.buffer_capacity {
self.buffer_pos = 0;
self.buffer_filled = 0;
}

let io_buf = unsafe { std::slice::from_raw_parts_mut(self.buffer, self.buffer_capacity) };
let io_buf_cursor = &mut io_buf[self.buffer_pos..];

let io_buf_unconsumed = self.buffer_filled - self.buffer_pos;
let bytes_read = self.reader.read(&mut io_buf_cursor[io_buf_unconsumed..])?;
let items_read = (io_buf_unconsumed + bytes_read) / size_of::<T>();
let io_buf_consumed = items_read * size_of::<T>();

let item_buf: &[T] = unsafe {
std::slice::from_raw_parts(
self.buffer.offset(self.buffer_pos as isize).cast(),
items_read,
)
};

for item in item_buf {
collector(item);
}

self.buffer_filled += bytes_read;
self.buffer_pos += io_buf_consumed;

Ok(items_read)
}
}

pub fn read_npy_header<R: Read, D: NpyDTyped>(reader: &mut R) -> std::io::Result<NpyHeader> {
let mut buf = Vec::with_capacity(10);
let mut magic = reader.take(10);
magic.read_to_end(&mut buf)?;

if buf[0..6].to_vec() != b"\x93NUMPY" {
return Err(Error::new(
ErrorKind::InvalidData,
"must start with \\x93NUMPY",
));
}
if buf[6..8].to_vec() != b"\x01\x00" {
return Err(Error::new(
ErrorKind::InvalidData,
"support only version 1.0",
));
}

let header_len = u16::from_le_bytes(buf[8..10].try_into().unwrap()) as usize;
let reader = magic.into_inner();

reader.take(header_len as u64).read_to_end(&mut buf)?;

let header = String::from_utf8(buf[10..(10 + header_len)].to_vec())
.map_err(|err| Error::new(ErrorKind::InvalidData, err.to_string()))?;
let header = NpyHeader::from_header(&header)?;

if header.fortran_order {
return Err(Error::new(
ErrorKind::InvalidData,
"fortran order is unsupported",
));
}

if header.shape.len() != 1 {
return Err(Error::new(ErrorKind::InvalidData, "only 1-d is supported"));
}

Ok(header)
}
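`read_npy_header` relies on the NPY v1.0 layout: a fixed 10-byte preamble (6-byte magic, 2-byte version, little-endian `u16` header length) followed by the ASCII header dict. A minimal std-only sketch of just that preamble parse, separate from the PR's implementation:

```rust
use std::io::{Cursor, Read};

// Parse the fixed 10-byte NPY v1.0 preamble and return the header length.
fn parse_preamble<R: Read>(r: &mut R) -> std::io::Result<usize> {
    let mut buf = [0u8; 10];
    r.read_exact(&mut buf)?;
    assert_eq!(&buf[0..6], b"\x93NUMPY"); // magic
    assert_eq!(&buf[6..8], b"\x01\x00"); // version 1.0
    Ok(u16::from_le_bytes([buf[8], buf[9]]) as usize)
}

fn main() -> std::io::Result<()> {
    // A hand-built preamble: magic + v1.0 + little-endian header length 118.
    let mut bytes = b"\x93NUMPY\x01\x00".to_vec();
    bytes.extend_from_slice(&118u16.to_le_bytes());
    let mut cursor = Cursor::new(bytes);
    assert_eq!(parse_preamble(&mut cursor)?, 118);
    Ok(())
}
```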

pub fn read_npy<R: Read, D: NpyDTyped + Clone>(
reader: &mut R,
size: usize,
Empty file.
3 changes: 2 additions & 1 deletion hftbacktest/src/backtest/data/reader.rs
@@ -3,6 +3,7 @@ use std::{
collections::HashMap,
io::{Error as IoError, ErrorKind},
rc::Rc,
sync::Arc,
};

use uuid::Uuid;
@@ -60,7 +61,7 @@
/// Provides a data cache that allows both the local processor and exchange processor to access the
/// same or different data based on their timestamps without the need for reloading.
#[derive(Clone, Debug)]
pub struct Cache<D>(Rc<RefCell<HashMap<String, CachedData<D>>>>)
pub struct Cache<D>(Arc<RefCell<HashMap<String, CachedData<D>>>>)
@bohblue2 (Contributor, Author) commented:
Leftover from the initial prototype; needs to be reverted.

where
D: POD + Clone;
