Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mode option to ParquetDataCatalog.write_data #1976

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/backtest/databento_option_greeks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Note: Use the python extension jupytext to be able to open this python file in jupyter as a notebook

# %%
# from nautilus_trader.adapters.databento.data_utils import init_databento_client
import nautilus_trader.adapters.databento.data_utils as db_data_utils
from nautilus_trader import PACKAGE_ROOT
from nautilus_trader.adapters.databento.data_utils import data_path
Expand Down Expand Up @@ -175,8 +176,8 @@ def user_log(self, msg):
# %%
# BacktestEngineConfig

load_greeks = False
stream_data = False
# To save and then load custom greeks data, run first with (False, True), then with (True, False) below
load_greeks, stream_data = False, False

actors = [
ImportableActorConfig(
Expand Down Expand Up @@ -273,7 +274,7 @@ def user_log(self, msg):
engine=engine_config,
data=data,
venues=venues,
chunk_size=10_000, # use None when using load_greeks ?
chunk_size=None, # use None when loading custom data
),
]

Expand Down
91 changes: 62 additions & 29 deletions nautilus_trader/adapters/databento/data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,25 @@ def databento_cost(symbols, start_time, end_time, schema, dataset="GLBX.MDP3", *
Calculate the cost of retrieving data from the Databento API for the given
parameters.

Args:
symbols (list[str]): The symbols to retrieve data for.
start_time (str): The start time of the data in ISO 8601 format.
end_time (str): The end time of the data in ISO 8601 format.
schema (str): The data schema to retrieve.
dataset (str, optional): The Databento dataset to use, defaults to "GLBX.MDP3".
**kwargs: Additional keyword arguments to pass to the Databento API.

Returns:
float: The estimated cost of retrieving the data.
Parameters
----------
symbols : list of str
The symbols to retrieve data for.
start_time : str
The start time of the data in ISO 8601 format.
end_time : str
The end time of the data in ISO 8601 format.
schema : str
The data schema to retrieve.
dataset : str, optional
The Databento dataset to use, defaults to "GLBX.MDP3".
**kwargs
Additional keyword arguments to pass to the Databento API.

Returns
-------
float
The estimated cost of retrieving the data.

"""
definition_start_date, definition_end_date = databento_definition_dates(start_time)
Expand All @@ -98,29 +107,46 @@ def databento_data(
dataset="GLBX.MDP3",
to_catalog=True,
base_path=None,
write_data_mode="overwrite",
**kwargs,
):
"""
Download and save Databento data and definition files, and optionally save the data
to a catalog.

Args:
symbols (list[str]): The symbols to retrieve data for.
start_time (str): The start time of the data in ISO 8601 format.
end_time (str): The end time of the data in ISO 8601 format.
schema (str): The data schema to retrieve, either "definition" or another valid schema.
file_prefix (str): The prefix to use for the downloaded data files.
*folders (str): Additional folders to create in the data path.
dataset (str, optional): The Databento dataset to use, defaults to "GLBX.MDP3".
to_catalog (bool, optional): Whether to save the data to a catalog, defaults to True.
base_path (str, optional): The base path to use for the data folder, defaults to None.
**kwargs: Additional keyword arguments to pass to the Databento API.

Returns:
dict: A dictionary containing the downloaded data and metadata.

Note:
If schema is equal to 'definition' then no data is downloaded or saved to the catalog.
Parameters
----------
symbols : list of str
The symbols to retrieve data for.
start_time : str
The start time of the data in ISO 8601 format.
end_time : str
The end time of the data in ISO 8601 format.
schema : str
The data schema to retrieve, either "definition" or another valid schema.
file_prefix : str
The prefix to use for the downloaded data files.
*folders : str
Additional folders to create in the data path.
dataset : str, optional
The Databento dataset to use, defaults to "GLBX.MDP3".
to_catalog : bool, optional
Whether to save the data to a catalog, defaults to True.
base_path : str, optional
The base path to use for the data folder, defaults to None.
write_data_mode : str, optional
Whether to "append", "prepend" or "overwrite" data to an existing catalog, defaults to "overwrite".
**kwargs
Additional keyword arguments to pass to the Databento API.

Returns
-------
dict
A dictionary containing the downloaded data and metadata.

Notes
-----
If schema is equal to 'definition' then no data is downloaded or saved to the catalog.

"""
used_path = create_data_folder(*folders, "databento", base_path=base_path)
Expand Down Expand Up @@ -185,21 +211,28 @@ def databento_data(
data_file,
*folders,
base_path=base_path,
write_data_mode=write_data_mode,
)
result.update(catalog_data)

return result


def save_data_to_catalog(definition_file, data_file, *folders, base_path=None):
def save_data_to_catalog(
definition_file,
data_file,
*folders,
base_path=None,
write_data_mode="overwrite",
):
catalog = load_catalog(*folders, base_path=base_path)

loader = DatabentoDataLoader()
nautilus_definition = loader.from_dbn_file(definition_file, as_legacy_cython=True)
nautilus_data = loader.from_dbn_file(data_file, as_legacy_cython=False)

catalog.write_data(nautilus_definition)
catalog.write_data(nautilus_data)
catalog.write_data(nautilus_data, mode=write_data_mode)

return {
"catalog": catalog,
Expand Down
48 changes: 40 additions & 8 deletions nautilus_trader/persistence/catalog/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def write_chunk(
data_cls: type[Data],
instrument_id: str | None = None,
basename_template: str = "part-{i}",
mode: str = "overwrite",
**kwargs: Any,
) -> None:
if isinstance(data[0], CustomData):
Expand All @@ -250,6 +251,7 @@ def write_chunk(
path=path,
fs=self.fs,
basename_template=basename_template,
mode=mode,
)
else:
# Write parquet file
Expand All @@ -261,8 +263,7 @@ def write_chunk(
filesystem=self.fs,
min_rows_per_group=self.min_rows_per_group,
max_rows_per_group=self.max_rows_per_group,
**self.dataset_kwargs,
**kwargs,
**kw,
)

def _fast_write(
Expand All @@ -271,20 +272,43 @@ def _fast_write(
path: str,
fs: fsspec.AbstractFileSystem,
basename_template: str,
mode: str = "overwrite",
) -> None:
name = basename_template.format(i=0)
fs.mkdirs(path, exist_ok=True)
pq.write_table(
table,
where=f"{path}/{name}.parquet",
filesystem=fs,
row_group_size=self.max_rows_per_group,
)
parquet_file = f"{path}/{name}.parquet"

# following solution from https://stackoverflow.com/a/70817689
if mode != "overwrite" and Path(parquet_file).exists():
existing_table = pq.read_table(source=parquet_file, pre_buffer=False, memory_map=True)

with pq.ParquetWriter(
where=parquet_file,
schema=existing_table.schema,
filesystem=fs,
write_batch_size=self.max_rows_per_group,
) as pq_writer:
table = table.cast(existing_table.schema)

if mode == "append":
pq_writer.write_table(existing_table)
pq_writer.write_table(table)
elif mode == "prepend":
pq_writer.write_table(table)
pq_writer.write_table(existing_table)
else:
pq.write_table(
table,
where=parquet_file,
filesystem=fs,
row_group_size=self.max_rows_per_group,
)

def write_data(
self,
data: list[Data | Event] | list[NautilusRustDataType],
basename_template: str = "part-{i}",
mode: str = "overwrite",
**kwargs: Any,
) -> None:
"""
Expand All @@ -303,6 +327,13 @@ def write_data(
The token '{i}' will be replaced with an automatically incremented
integer as files are partitioned.
If not specified, it defaults to 'part-{i}' + the default extension '.parquet'.
mode : str, optional
The mode to use when writing data and when not using the "partitioning" option.
Can be one of the following:
- "append": Appends the data to the existing data.
- "prepend": Prepends the data to the existing data.
- "overwrite": Overwrites the existing data.
If not specified, it defaults to "overwrite".
kwargs : Any
Additional keyword arguments to be passed to the `write_chunk` method.

Expand Down Expand Up @@ -352,6 +383,7 @@ def obj_to_type(obj: Data) -> type:
data_cls=name_to_cls[cls_name],
instrument_id=instrument_id,
basename_template=basename_template,
mode=mode,
**kwargs,
)

Expand Down
89 changes: 59 additions & 30 deletions nautilus_trader/risk/greeks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@ class GreeksCalculatorConfig(ActorConfig, frozen=True):
"""
Configuration settings for the GreeksCalculator actor.

Attributes:
load_greeks (bool): Flag to determine whether to load pre-calculated Greeks.
underlying (str): The underlying asset symbol.
bar_spec (str): The bar specification for data subscription.
curve_name (str): The name of the interest rate curve.
interest_rate (float): The interest rate used for calculations.
Parameters
----------
load_greeks : bool, default False
Flag to determine whether to load pre-calculated Greeks.
underlying : str, default "ES"
The underlying asset symbol.
bar_spec : str, default "1-MINUTE-LAST"
The bar specification for data subscription.
curve_name : str, default "USD_ShortTerm"
The name of the interest rate curve.
interest_rate : float, default 0.05
The interest rate used for calculations.

"""

Expand All @@ -61,19 +67,34 @@ class GreeksCalculator(Actor):
This calculator works specifically for European options on futures with no dividends.
It computes the Greeks for all options of a given underlying when a bar of the future is received.

Attributes:
load_greeks (bool): Flag to determine whether to load pre-calculated Greeks.
underlying (str): The underlying asset symbol.
bar_spec (str): The bar specification for data subscription.
curve_name (str): The name of the interest rate curve.
interest_rate (InterestRateData or InterestRateCurveData): The interest rate data used for calculations.

Methods:
on_start(): Initializes data subscriptions when the actor starts.
on_data(data): Handles incoming data updates (GreeksData, InterestRateData or InterestRateCurveData).
on_bar(bar: Bar): Processes incoming bar data and triggers Greek calculations.
compute_greeks(instrument_id: InstrumentId, future_price: float, ts_event: int):
Computes Greeks for options based on the future price.
Parameters
----------
config : GreeksCalculatorConfig
The configuration settings for the GreeksCalculator.

Attributes
----------
load_greeks : bool
Flag to determine whether to load pre-calculated Greeks.
underlying : str
The underlying asset symbol.
bar_spec : str
The bar specification for data subscription.
curve_name : str
The name of the interest rate curve.
interest_rate : float
The interest rate used for calculations.

Methods
-------
on_start()
Initializes data subscriptions when the actor starts.
on_data(data)
Handles incoming data updates (GreeksData, InterestRateData or InterestRateCurveData).
on_bar(bar: Bar)
Processes incoming bar data and triggers Greek calculations.
compute_greeks(instrument_id: InstrumentId, future_price: float, ts_event: int)
Computes Greeks for options based on the future price.

"""

Expand Down Expand Up @@ -192,9 +213,12 @@ class InterestRateProviderConfig(ActorConfig, frozen=True):
"""
Configuration for the InterestRateProvider actor.

Attributes:
interest_rates_file (str): Path to the file containing interest rate data.
curve_name (str): Name of the interest rate curve, defaulting to "USD_ShortTerm".
Parameters
----------
interest_rates_file : str
Path to the file containing interest rate data.
curve_name : str, default "USD_ShortTerm"
Name of the interest rate curve. Default is "USD_ShortTerm".

"""

Expand All @@ -210,14 +234,19 @@ class InterestRateProvider(Actor):
updating the current interest rate, and publishing interest rate data
on the message bus.

Attributes:
interest_rates_file (str): Path to the file containing interest rate data.
curve_name (str): Name of the interest rate curve.
interest_rates_df (pandas.DataFrame): DataFrame containing imported interest rate data.

Methods:
on_start(): Initializes the interest rate data on actor start.
update_interest_rate(alert=None): Updates and publishes the current interest rate.
Parameters
----------
interest_rates_file : str
Path to the file containing interest rate data.
curve_name : str
Name of the interest rate curve.

Methods
-------
on_start()
Initializes the interest rate data on actor start.
update_interest_rate(alert=None)
Updates and publishes the current interest rate.

"""

Expand Down
Loading
Loading