From 547e84216b95b0ae0ecef5cf025b339fdf30994b Mon Sep 17 00:00:00 2001 From: Faysal Aberkane Date: Wed, 2 Oct 2024 13:06:13 +0200 Subject: [PATCH] Add mode option to ParquetDataCatalog.write_data --- examples/backtest/databento_option_greeks.py | 7 +- .../adapters/databento/data_utils.py | 91 +++++++++++++------ .../persistence/catalog/parquet.py | 48 ++++++++-- nautilus_trader/risk/greeks.py | 89 ++++++++++++------ tests/unit_tests/persistence/test_catalog.py | 20 ++++ 5 files changed, 185 insertions(+), 70 deletions(-) diff --git a/examples/backtest/databento_option_greeks.py b/examples/backtest/databento_option_greeks.py index 300c34683c6..d21e8256de9 100644 --- a/examples/backtest/databento_option_greeks.py +++ b/examples/backtest/databento_option_greeks.py @@ -5,6 +5,7 @@ # Note: Use the python extension jupytext to be able to open this python file in jupyter as a notebook # %% +# from nautilus_trader.adapters.databento.data_utils import init_databento_client import nautilus_trader.adapters.databento.data_utils as db_data_utils from nautilus_trader import PACKAGE_ROOT from nautilus_trader.adapters.databento.data_utils import data_path @@ -175,8 +176,8 @@ def user_log(self, msg): # %% # BacktestEngineConfig -load_greeks = False -stream_data = False +# for saving and loading custom data greeks, use False, True then True, False below +load_greeks, stream_data = False, False actors = [ ImportableActorConfig( @@ -273,7 +274,7 @@ def user_log(self, msg): engine=engine_config, data=data, venues=venues, - chunk_size=10_000, # use None when using load_greeks ? 
+ chunk_size=None, # use None when loading custom data ), ] diff --git a/nautilus_trader/adapters/databento/data_utils.py b/nautilus_trader/adapters/databento/data_utils.py index ce7133bb996..ab2bb951029 100644 --- a/nautilus_trader/adapters/databento/data_utils.py +++ b/nautilus_trader/adapters/databento/data_utils.py @@ -64,16 +64,25 @@ def databento_cost(symbols, start_time, end_time, schema, dataset="GLBX.MDP3", * Calculate the cost of retrieving data from the Databento API for the given parameters. - Args: - symbols (list[str]): The symbols to retrieve data for. - start_time (str): The start time of the data in ISO 8601 format. - end_time (str): The end time of the data in ISO 8601 format. - schema (str): The data schema to retrieve. - dataset (str, optional): The Databento dataset to use, defaults to "GLBX.MDP3". - **kwargs: Additional keyword arguments to pass to the Databento API. - - Returns: - float: The estimated cost of retrieving the data. + Parameters + ---------- + symbols : list of str + The symbols to retrieve data for. + start_time : str + The start time of the data in ISO 8601 format. + end_time : str + The end time of the data in ISO 8601 format. + schema : str + The data schema to retrieve. + dataset : str, optional + The Databento dataset to use, defaults to "GLBX.MDP3". + **kwargs + Additional keyword arguments to pass to the Databento API. + + Returns + ------- + float + The estimated cost of retrieving the data. """ definition_start_date, definition_end_date = databento_definition_dates(start_time) @@ -98,29 +107,46 @@ def databento_data( dataset="GLBX.MDP3", to_catalog=True, base_path=None, + write_data_mode="overwrite", **kwargs, ): """ Download and save Databento data and definition files, and optionally save the data to a catalog. - Args: - symbols (list[str]): The symbols to retrieve data for. - start_time (str): The start time of the data in ISO 8601 format. - end_time (str): The end time of the data in ISO 8601 format. 
- schema (str): The data schema to retrieve, either "definition" or another valid schema. - file_prefix (str): The prefix to use for the downloaded data files. - *folders (str): Additional folders to create in the data path. - dataset (str, optional): The Databento dataset to use, defaults to "GLBX.MDP3". - to_catalog (bool, optional): Whether to save the data to a catalog, defaults to True. - base_path (str, optional): The base path to use for the data folder, defaults to None. - **kwargs: Additional keyword arguments to pass to the Databento API. - - Returns: - dict: A dictionary containing the downloaded data and metadata. - - Note: - If schema is equal to 'definition' then no data is downloaded or saved to the catalog. + Parameters + ---------- + symbols : list of str + The symbols to retrieve data for. + start_time : str + The start time of the data in ISO 8601 format. + end_time : str + The end time of the data in ISO 8601 format. + schema : str + The data schema to retrieve, either "definition" or another valid schema. + file_prefix : str + The prefix to use for the downloaded data files. + *folders : str + Additional folders to create in the data path. + dataset : str, optional + The Databento dataset to use, defaults to "GLBX.MDP3". + to_catalog : bool, optional + Whether to save the data to a catalog, defaults to True. + base_path : str, optional + The base path to use for the data folder, defaults to None. + write_data_mode : str, optional + Whether to "append", "prepend" or "overwrite" data to an existing catalog, defaults to "overwrite". + **kwargs + Additional keyword arguments to pass to the Databento API. + + Returns + ------- + dict + A dictionary containing the downloaded data and metadata. + + Notes + ----- + If schema is equal to 'definition' then no data is downloaded or saved to the catalog. 
""" used_path = create_data_folder(*folders, "databento", base_path=base_path) @@ -185,13 +211,20 @@ def databento_data( data_file, *folders, base_path=base_path, + write_data_mode=write_data_mode, ) result.update(catalog_data) return result -def save_data_to_catalog(definition_file, data_file, *folders, base_path=None): +def save_data_to_catalog( + definition_file, + data_file, + *folders, + base_path=None, + write_data_mode="overwrite", +): catalog = load_catalog(*folders, base_path=base_path) loader = DatabentoDataLoader() @@ -199,7 +232,7 @@ def save_data_to_catalog(definition_file, data_file, *folders, base_path=None): nautilus_data = loader.from_dbn_file(data_file, as_legacy_cython=False) catalog.write_data(nautilus_definition) - catalog.write_data(nautilus_data) + catalog.write_data(nautilus_data, mode=write_data_mode) return { "catalog": catalog, diff --git a/nautilus_trader/persistence/catalog/parquet.py b/nautilus_trader/persistence/catalog/parquet.py index 60d4fd92a61..1937eb2311f 100644 --- a/nautilus_trader/persistence/catalog/parquet.py +++ b/nautilus_trader/persistence/catalog/parquet.py @@ -236,6 +236,7 @@ def write_chunk( data_cls: type[Data], instrument_id: str | None = None, basename_template: str = "part-{i}", + mode: str = "overwrite", **kwargs: Any, ) -> None: if isinstance(data[0], CustomData): @@ -250,6 +251,7 @@ def write_chunk( path=path, fs=self.fs, basename_template=basename_template, + mode=mode, ) else: # Write parquet file @@ -261,8 +263,7 @@ def write_chunk( filesystem=self.fs, min_rows_per_group=self.min_rows_per_group, max_rows_per_group=self.max_rows_per_group, - **self.dataset_kwargs, - **kwargs, + **kw, ) def _fast_write( @@ -271,20 +272,43 @@ def _fast_write( path: str, fs: fsspec.AbstractFileSystem, basename_template: str, + mode: str = "overwrite", ) -> None: name = basename_template.format(i=0) fs.mkdirs(path, exist_ok=True) - pq.write_table( - table, - where=f"{path}/{name}.parquet", - filesystem=fs, - 
row_group_size=self.max_rows_per_group, - ) + parquet_file = f"{path}/{name}.parquet" + + # following solution from https://stackoverflow.com/a/70817689 + if mode != "overwrite" and Path(parquet_file).exists(): + existing_table = pq.read_table(source=parquet_file, pre_buffer=False, memory_map=True) + + with pq.ParquetWriter( + where=parquet_file, + schema=existing_table.schema, + filesystem=fs, + write_batch_size=self.max_rows_per_group, + ) as pq_writer: + table = table.cast(existing_table.schema) + + if mode == "append": + pq_writer.write_table(existing_table) + pq_writer.write_table(table) + elif mode == "prepend": + pq_writer.write_table(table) + pq_writer.write_table(existing_table) + else: + pq.write_table( + table, + where=parquet_file, + filesystem=fs, + row_group_size=self.max_rows_per_group, + ) def write_data( self, data: list[Data | Event] | list[NautilusRustDataType], basename_template: str = "part-{i}", + mode: str = "overwrite", **kwargs: Any, ) -> None: """ @@ -303,6 +327,13 @@ The token '{i}' will be replaced with an automatically incremented integer as files are partitioned. If not specified, it defaults to 'part-{i}' + the default extension '.parquet'. + mode : str, optional + The mode to use when writing data and when not using the "partitioning" option. + Can be one of the following: + - "append": Appends the data to the existing data. + - "prepend": Prepends the data to the existing data. + - "overwrite": Overwrites the existing data. + If not specified, it defaults to "overwrite". kwargs : Any Additional keyword arguments to be passed to the `write_chunk` method. 
@@ -352,6 +383,7 @@ def obj_to_type(obj: Data) -> type: data_cls=name_to_cls[cls_name], instrument_id=instrument_id, basename_template=basename_template, + mode=mode, **kwargs, ) diff --git a/nautilus_trader/risk/greeks.py b/nautilus_trader/risk/greeks.py index b5f3a496a36..fd023dd2393 100644 --- a/nautilus_trader/risk/greeks.py +++ b/nautilus_trader/risk/greeks.py @@ -38,12 +38,18 @@ class GreeksCalculatorConfig(ActorConfig, frozen=True): """ Configuration settings for the GreeksCalculator actor. - Attributes: - load_greeks (bool): Flag to determine whether to load pre-calculated Greeks. - underlying (str): The underlying asset symbol. - bar_spec (str): The bar specification for data subscription. - curve_name (str): The name of the interest rate curve. - interest_rate (float): The interest rate used for calculations. + Parameters + ---------- + load_greeks : bool, default False + Flag to determine whether to load pre-calculated Greeks. + underlying : str, default "ES" + The underlying asset symbol. + bar_spec : str, default "1-MINUTE-LAST" + The bar specification for data subscription. + curve_name : str, default "USD_ShortTerm" + The name of the interest rate curve. + interest_rate : float, default 0.05 + The interest rate used for calculations. """ @@ -61,19 +67,34 @@ class GreeksCalculator(Actor): This calculator works specifically for European options on futures with no dividends. It computes the Greeks for all options of a given underlying when a bar of the future is received. - Attributes: - load_greeks (bool): Flag to determine whether to load pre-calculated Greeks. - underlying (str): The underlying asset symbol. - bar_spec (str): The bar specification for data subscription. - curve_name (str): The name of the interest rate curve. - interest_rate (InterestRateData or InterestRateCurveData): The interest rate data used for calculations. - - Methods: - on_start(): Initializes data subscriptions when the actor starts. 
- on_data(data): Handles incoming data updates (GreeksData, InterestRateData or InterestRateCurveData). - on_bar(bar: Bar): Processes incoming bar data and triggers Greek calculations. - compute_greeks(instrument_id: InstrumentId, future_price: float, ts_event: int): - Computes Greeks for options based on the future price. + Parameters + ---------- + config : GreeksCalculatorConfig + The configuration settings for the GreeksCalculator. + + Attributes + ---------- + load_greeks : bool + Flag to determine whether to load pre-calculated Greeks. + underlying : str + The underlying asset symbol. + bar_spec : str + The bar specification for data subscription. + curve_name : str + The name of the interest rate curve. + interest_rate : float + The interest rate used for calculations. + + Methods + ------- + on_start() + Initializes data subscriptions when the actor starts. + on_data(data) + Handles incoming data updates (GreeksData, InterestRateData or InterestRateCurveData). + on_bar(bar: Bar) + Processes incoming bar data and triggers Greek calculations. + compute_greeks(instrument_id: InstrumentId, future_price: float, ts_event: int) + Computes Greeks for options based on the future price. """ @@ -192,9 +213,12 @@ class InterestRateProviderConfig(ActorConfig, frozen=True): """ Configuration for the InterestRateProvider actor. - Attributes: - interest_rates_file (str): Path to the file containing interest rate data. - curve_name (str): Name of the interest rate curve, defaulting to "USD_ShortTerm". + Parameters + ---------- + interest_rates_file : str + Path to the file containing interest rate data. + curve_name : str, default "USD_ShortTerm" + Name of the interest rate curve. Default is "USD_ShortTerm". """ @@ -210,14 +234,19 @@ class InterestRateProvider(Actor): updating the current interest rate, and publishing interest rate data on the message bus. - Attributes: - interest_rates_file (str): Path to the file containing interest rate data. 
- curve_name (str): Name of the interest rate curve. - interest_rates_df (pandas.DataFrame): DataFrame containing imported interest rate data. - - Methods: - on_start(): Initializes the interest rate data on actor start. - update_interest_rate(alert=None): Updates and publishes the current interest rate. + Parameters + ---------- + interest_rates_file : str + Path to the file containing interest rate data. + curve_name : str + Name of the interest rate curve. + + Methods + ------- + on_start() + Initializes the interest rate data on actor start. + update_interest_rate(alert=None) + Updates and publishes the current interest rate. """ diff --git a/tests/unit_tests/persistence/test_catalog.py b/tests/unit_tests/persistence/test_catalog.py index b16136f864a..a762393ad7c 100644 --- a/tests/unit_tests/persistence/test_catalog.py +++ b/tests/unit_tests/persistence/test_catalog.py @@ -266,6 +266,26 @@ def test_catalog_bars_querying_by_bar_type(catalog: ParquetDataCatalog) -> None: assert len(bars) == len(stub_bars) == 10 +def test_catalog_append_data(catalog: ParquetDataCatalog) -> None: + # Arrange + bar_type = TestDataStubs.bartype_adabtc_binance_1min_last() + instrument = TestInstrumentProvider.adabtc_binance() + stub_bars = TestDataStubs.binance_bars_from_csv( + "ADABTC-1m-2021-11-27.csv", + bar_type, + instrument, + ) + catalog.write_data(stub_bars) + + # Act + catalog.write_data(stub_bars, mode="append") + + # Assert + bars = catalog.bars(bar_types=[str(bar_type)]) + all_bars = catalog.bars() + assert len(bars) == len(all_bars) == 20 + + def test_catalog_bars_querying_by_instrument_id(catalog: ParquetDataCatalog) -> None: # Arrange bar_type = TestDataStubs.bartype_adabtc_binance_1min_last()