Skip to content

Commit

Permalink
update order book updater to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Škoda committed May 2, 2024
1 parent c184e53 commit c81b336
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 38 deletions.
59 changes: 40 additions & 19 deletions lakeapi/orderbook.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING, Optional, Tuple
from numba import float64, njit
from numba.typed import Dict
from numba.typed import Dict, List

if TYPE_CHECKING:
import pandas as pd
Expand All @@ -13,39 +13,60 @@ def __init__(self, df: 'pd.DataFrame'):
self.ask = Dict.empty(key_type = float64, value_type = float64)
self.current_index = 0
self.received_timestamp = None
self.np_arr = df[['bids', 'asks']].to_numpy()
self.received_times = df['received_time'].to_numpy()
self.sequence_number = None
self.np_arr = df[['side_is_bid', 'price', 'size']].values.astype('float64')
self.int_arr = df[['received_time', 'sequence_number']].values.astype('int64')
self._bests_cache = List()
self._bests_cache.append(0.)
self._bests_cache.append(0.)

@staticmethod
@njit(cache = True)
def _update_more(side_is_bid, prices, sizes, received_time, sequence_number, current_index, bid_book, ask_book, bests_cache):
    '''Apply every consecutive row that shares the received_time of the row at current_index.

    side_is_bid, prices, sizes, received_time and sequence_number are parallel
    per-row arrays. A truthy side_is_bid routes the row to bid_book, otherwise
    to ask_book. A size of 0 removes the price level; any other size
    inserts or overwrites it.

    bests_cache is a two-item mutable sequence [best_bid, best_ask] in which
    0. means "unknown, recompute from the book". This function only
    invalidates or tightens a known cached value; it never guesses a best
    price while the cache is unknown.

    The caller must pass a current_index that is inside the arrays.

    Returns a tuple (next_index, sequence_number, received_time) where the
    last two are taken from the last row that was applied.
    '''
    row_count = prices.shape[0]
    starting_received_time = received_time[current_index]
    # The bounds test comes first so the arrays are never read past their end.
    while current_index < row_count and received_time[current_index] == starting_received_time:
        price = prices[current_index]
        size = sizes[current_index]
        if side_is_bid[current_index]:
            if size == 0:
                if price in bid_book:
                    del bid_book[price]
                # The cached best level is gone; force a recompute later.
                if bests_cache[0] == price:
                    bests_cache[0] = 0.
            else:
                bid_book[price] = size
                # Only raise a known best bid. When the cache is unknown (0.)
                # a higher level may still be resting in the book, so caching
                # this price could under-report the best bid.
                if bests_cache[0] != 0. and price > bests_cache[0]:
                    bests_cache[0] = price
        else:
            if size == 0:
                if price in ask_book:
                    del ask_book[price]
                if bests_cache[1] == price:
                    bests_cache[1] = 0.
            else:
                ask_book[price] = size
                # Mirror of the bid side: only lower a known best ask.
                if bests_cache[1] != 0. and price < bests_cache[1]:
                    bests_cache[1] = price
        current_index += 1
    return current_index, sequence_number[current_index-1], received_time[current_index-1]

def process_next_update(self, starting_row: Optional[int] = None) -> int:
    '''Apply the next group of rows that share one received_time to the book.

    Each row of the source frame is a single price-level change: np_arr holds
    the side_is_bid, price and size columns, and int_arr holds the
    received_time and sequence_number columns.

    starting_row, when given, repositions the updater on that row before
    anything else happens, so an exhausted updater can be rewound.

    Returns the index of the first row that has not been applied yet, or 0
    when there is no row left to process at the current position.
    '''
    # Reposition first: the exhaustion check below must see the requested row,
    # otherwise a finished updater could never be rewound and an out-of-range
    # starting_row would reach the compiled kernel unchecked.
    if starting_row is not None:
        self.current_index = starting_row

    if self.current_index >= self.np_arr.shape[0]:
        return 0

    self.current_index, self.sequence_number, self.received_timestamp = \
        self._update_more(*self.np_arr.T, *self.int_arr.T, self.current_index, self.bid, self.ask, self._bests_cache)

    return self.current_index

def get_bests(self) -> Tuple[float, float]:
    '''Return the (best bid, best ask) prices of the current book.

    The two values are served from _bests_cache. A falsy entry (0.) means the
    cached value is unknown, in which case it is recomputed as the highest key
    of the bid book or the lowest key of the ask book and stored back.

    A side with no price levels yields 0. instead of raising ValueError.
    '''
    cache = self._bests_cache
    if not cache[0]:
        # default=0. keeps the "unknown" sentinel when the bid side is empty.
        cache[0] = max(self.bid, default=0.)
    if not cache[1]:
        cache[1] = min(self.ask, default=0.)
    return tuple(cache)
83 changes: 64 additions & 19 deletions tests/test_orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,49 +7,94 @@
@pytest.fixture
def order_book_updater(example_data):
    '''Build an OrderBookUpdater from the row-per-level example data.

    Every column of example_data is a plain scalar, so the frame is passed on
    unchanged; no per-cell array conversion is needed.
    '''
    df = pd.DataFrame(example_data)
    return OrderBookUpdater(df)

@pytest.fixture
def example_data():
    '''Return eight single-level updates grouped into three received_time batches.

    Batch 1 seeds two bids and two asks, batch 2 resizes bid 1 and ask 3, and
    batch 3 removes bid 2 and ask 4 with zero-size rows.
    '''
    columns = ('received_time', 'sequence_number', 'side_is_bid', 'price', 'size')
    rows = [
        (1, 1, True, 1, 10),
        (1, 1, True, 2, 20),
        (1, 1, False, 3, 10),
        (1, 1, False, 4, 20),
        (2, 2, True, 1, 5),
        (2, 2, False, 3, 5),
        (3, 3, True, 2, 0),
        (3, 3, False, 4, 0),
    ]
    return [dict(zip(columns, row)) for row in rows]

def test_process_next_update(order_book_updater, example_data):
    '''Step through the three example batches and check the book after each.'''
    updater = order_book_updater
    first_row, last_row = example_data[0], example_data[-1]

    # Batch 1: both sides are seeded with two levels each.
    updater.process_next_update()
    assert updater.bid == {1: 10, 2:20}
    assert updater.ask == {3: 10, 4:20}
    assert updater.received_timestamp == first_row['received_time']
    assert updater.sequence_number == first_row['sequence_number']

    # Batch 2: bid level 1 is resized, bid level 2 is untouched.
    updater.process_next_update()
    assert updater.bid[1] == 5
    assert updater.bid[2] == 20

    # Batch 3: the zero-size rows remove bid 2 and ask 4.
    updater.process_next_update()
    assert updater.ask == {3: 5}
    assert updater.received_timestamp == last_row['received_time']
    assert updater.sequence_number == last_row['sequence_number']


def test_get_bests(order_book_updater):
    '''After the first batch the best bid is 2 and the best ask is 3.'''
    order_book_updater.process_next_update()
    bests = order_book_updater.get_bests()
    assert bests == (2, 3)

@pytest.mark.benchmark(group='process_next_update')
def test_process_next_update_benchmark(order_book_updater, benchmark):
    '''Benchmark replaying the first batch; every call restarts from row 0.'''
    pedantic_options = dict(args = (0,), warmup_rounds=100, iterations=1000, rounds=10)
    benchmark.pedantic(order_book_updater.process_next_update, **pedantic_options)

0 comments on commit c81b336

Please sign in to comment.