VER: Release 0.42.0 #71

Merged: 11 commits, Sep 24, 2024
14 changes: 7 additions & 7 deletions .github/workflows/release.yml
@@ -1,12 +1,12 @@
name: release

# on:
# workflow_run:
# workflows:
# - test
# branches: [main]
# types:
# - completed
on:
workflow_run:
workflows:
- test
branches: [main]
types:
- completed

jobs:
release:
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,20 @@
# Changelog

## 0.42.0 - 2024-09-23

#### Enhancements
- Added `mode` parameter to `DBNStore.to_csv` to control the file writing mode
- Added `mode` parameter to `DBNStore.to_json` to control the file writing mode
- Added `mode` parameter to `DBNStore.to_parquet` to control the file writing mode
- Added `compression` parameter to `DBNStore.to_file` which controls the output compression format
- Added new consolidated publisher values for `XNAS.BASIC` and `DBEQ.MAX`
- Changed `DBNStore` to be more tolerant of truncated DBN streams

#### Breaking changes
- Changed default write mode for `DBNStore.to_csv` to overwrite ("w")
- Changed default write mode for `DBNStore.to_json` to overwrite ("w")
- Changed default write mode for `DBNStore.to_parquet` to overwrite ("w")
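
A minimal usage sketch of the 0.42.0 changes listed above; the file paths and the `store` variable are illustrative, and the store is assumed to have been loaded with `DBNStore.from_file`:

import databento as db

store = db.DBNStore.from_file("example.trades.dbn.zst")  # illustrative input path

# New `mode` parameter: "w" overwrites (the new default), "x" refuses to replace an existing file.
store.to_csv("trades.csv", mode="w")
store.to_json("trades.json", mode="x")
store.to_parquet("trades.parquet", mode="w")

# New `compression` parameter on to_file: re-encode the DBN stream as zstd on write.
store.to_file("trades.dbn.zst", compression="zstd")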

## 0.41.0 - 2024-09-03

#### Enhancements
2 changes: 1 addition & 1 deletion README.md
@@ -5,7 +5,7 @@
[![pypi-version](https://img.shields.io/pypi/v/databento)](https://pypi.org/project/databento)
[![license](https://img.shields.io/github/license/databento/databento-python?color=blue)](./LICENSE)
[![code-style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Slack](https://img.shields.io/badge/join_Slack-community-darkblue.svg?logo=slack)](http://to.dbn.to/slack)
[![Slack](https://img.shields.io/badge/join_Slack-community-darkblue.svg?logo=slack)](https://to.dbn.to/slack)

The official Python client library for [Databento](https://databento.com).

112 changes: 82 additions & 30 deletions databento/common/dbnstore.py
@@ -4,9 +4,11 @@
import decimal
import itertools
import logging
import warnings
from collections.abc import Generator
from collections.abc import Iterator
from collections.abc import Mapping
from io import BufferedReader
from io import BytesIO
from os import PathLike
from pathlib import Path
@@ -46,6 +48,7 @@
from databento.common.constants import SCHEMA_STRUCT_MAP
from databento.common.constants import SCHEMA_STRUCT_MAP_V1
from databento.common.error import BentoError
from databento.common.error import BentoWarning
from databento.common.symbology import InstrumentMap
from databento.common.types import DBNRecord
from databento.common.types import Default
@@ -150,7 +153,7 @@ def __init__(self, source: PathLike[str] | str):
)

self._name = self._path.name
self.__buffer: IO[bytes] | None = None
self.__buffer: BufferedReader | None = None

@property
def name(self) -> str:
@@ -189,13 +192,13 @@ def path(self) -> Path:
return self._path

@property
def reader(self) -> IO[bytes]:
def reader(self) -> BufferedReader:
"""
Return a reader for this file.

Returns
-------
IO
BufferedReader

"""
if self.__buffer is None:
@@ -259,14 +262,14 @@ def nbytes(self) -> int:
return self.__buffer.getbuffer().nbytes

@property
def reader(self) -> IO[bytes]:
def reader(self) -> BytesIO:
"""
Return a reader for this buffer. The reader begins at the start of the
buffer.

Returns
-------
IO
BytesIO

"""
self.__buffer.seek(0)
@@ -391,8 +394,8 @@ def __iter__(self) -> Generator[DBNRecord, None, None]:
yield record
else:
if len(decoder.buffer()) > 0:
raise BentoError(
"DBN file is truncated or contains an incomplete record",
warnings.warn(
BentoWarning("DBN file is truncated or contains an incomplete record"),
)
break
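
Since a truncated stream now produces a BentoWarning instead of an exception, a caller that still wants hard failures can escalate the warning. A sketch, assuming `store` is an existing DBNStore:

import warnings

from databento.common.error import BentoWarning

with warnings.catch_warnings():
    # Turn the truncation warning emitted during iteration back into an exception.
    warnings.simplefilter("error", BentoWarning)
    records = list(store)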

@@ -516,21 +519,18 @@ def reader(self) -> IO[bytes]:

Returns
-------
BinaryIO
IO[bytes]

See Also
--------
DBNStore.raw

"""
if self.compression == Compression.ZSTD:
reader: IO[bytes] = zstandard.ZstdDecompressor().stream_reader(
return zstandard.ZstdDecompressor().stream_reader(
self._data_source.reader,
)
else:
reader = self._data_source.reader

return reader
return self._data_source.reader

@property
def schema(self) -> Schema | None:
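
The `reader` property above transparently decompresses zstd-encoded sources. A small sketch of the same zstandard pattern in isolation; the path is illustrative:

import zstandard

# Decompress a zstd-encoded DBN file the same way DBNStore.reader does:
# wrap the raw binary file object in a streaming decompressor and read from it.
with open("example.trades.dbn.zst", "rb") as raw:
    dbn_bytes = zstandard.ZstdDecompressor().stream_reader(raw).read()
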
@@ -792,6 +792,7 @@ def to_csv(
map_symbols: bool = True,
compression: Compression | str = Compression.NONE,
schema: Schema | str | None = None,
mode: Literal["w", "x"] = "w",
) -> None:
"""
Write the data to a file in CSV format.
@@ -816,6 +817,8 @@
schema : Schema or str, optional
The DBN schema for the csv.
This is only required when reading a DBN stream with mixed record types.
mode : str, default "w"
The file write mode to use, either "x" or "w".

Raises
------
@@ -825,14 +828,15 @@
"""
compression = validate_enum(compression, Compression, "compression")
schema = validate_maybe_enum(schema, Schema, "schema")
file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
if schema is None:
if self.schema is None:
raise ValueError("a schema must be specified for mixed DBN data")
schema = self.schema

with open(path, "xb") as output:
with open(file_path, f"{mode}b") as output:
self._transcode(
output=output,
output=output, # type: ignore [arg-type]
encoding=Encoding.CSV,
pretty_px=pretty_px,
pretty_ts=pretty_ts,
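
The `mode` handling above means overwriting is now opt-out rather than opt-in. A short sketch of both modes, assuming `store` is an existing DBNStore and trades.csv already exists:

store.to_csv("trades.csv", mode="w")      # overwrites the existing file (new default)
try:
    store.to_csv("trades.csv", mode="x")  # exclusive create: refuses to replace the file
except OSError as exc:                    # a FileExistsError is the expected concrete type
    print(f"not overwriting: {exc}")
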
@@ -961,6 +965,7 @@ def to_parquet(
pretty_ts: bool = True,
map_symbols: bool = True,
schema: Schema | str | None = None,
mode: Literal["w", "x"] = "w",
**kwargs: Any,
) -> None:
"""
@@ -983,6 +988,8 @@
schema : Schema or str, optional
The DBN schema for the parquet file.
This is only required when reading a DBN stream with mixed record types.
mode : str, default "w"
The file write mode to use, either "x" or "w".

Raises
------
@@ -994,6 +1001,7 @@
if price_type == "decimal":
raise ValueError("the 'decimal' price type is not currently supported")

file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
schema = validate_maybe_enum(schema, Schema, "schema")
if schema is None:
if self.schema is None:
@@ -1015,7 +1023,7 @@
# Initialize the writer using the first DataFrame
parquet_schema = pa.Schema.from_pandas(frame)
writer = pq.ParquetWriter(
where=path,
where=file_path,
schema=parquet_schema,
**kwargs,
)
@@ -1033,6 +1041,7 @@ def to_file(
self,
path: PathLike[str] | str,
mode: Literal["w", "x"] = "w",
compression: Compression | str | None = None,
) -> None:
"""
Write the data to a DBN file at the given path.
@@ -1043,6 +1052,8 @@
The file path to write to.
mode : str, default "w"
The file write mode to use, either "x" or "w".
compression : Compression or str, optional
The compression format to write. If `None`, uses the same compression as the underlying data.

Raises
------
@@ -1054,9 +1065,35 @@
If path is not writable.

"""
compression = validate_maybe_enum(compression, Compression, "compression")
file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
file_path.write_bytes(self._data_source.reader.read())
self._data_source = FileDataSource(file_path)

writer: IO[bytes] | zstandard.ZstdCompressionWriter
if compression is None or compression == self.compression:
# Handle trivial case
with open(file_path, mode=f"{mode}b") as writer:
reader = self._data_source.reader
while chunk := reader.read(2**16):
writer.write(chunk)
return

if compression == Compression.ZSTD:
writer = zstandard.ZstdCompressor(
write_checksum=True,
).stream_writer(
open(file_path, mode=f"{mode}b"),
closefd=True,
)
else:
writer = open(file_path, mode=f"{mode}b")

try:
reader = self.reader

while chunk := reader.read(2**16):
writer.write(chunk)
finally:
writer.close()
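
A sketch of re-encoding a store on disk with the new `compression` argument, assuming `store` currently wraps zstd-compressed DBN data and the output paths are illustrative:

# Same compression as the source: the bytes are copied through unchanged (the trivial case above).
store.to_file("copy.trades.dbn.zst", compression="zstd")

# Different compression: the decompressed reader is re-written, here as plain (uncompressed) DBN.
store.to_file("copy.trades.dbn", compression="none")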

def to_json(
self,
@@ -1066,6 +1103,7 @@
map_symbols: bool = True,
compression: Compression | str = Compression.NONE,
schema: Schema | str | None = None,
mode: Literal["w", "x"] = "w",
) -> None:
"""
Write the data to a file in JSON format.
@@ -1089,6 +1127,8 @@
schema : Schema or str, optional
The DBN schema for the json.
This is only required when reading a DBN stream with mixed record types.
mode : str, default "w"
The file write mode to use, either "x" or "w".

Raises
------
@@ -1098,14 +1138,16 @@
"""
compression = validate_enum(compression, Compression, "compression")
schema = validate_maybe_enum(schema, Schema, "schema")
file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")

if schema is None:
if self.schema is None:
raise ValueError("a schema must be specified for mixed DBN data")
schema = self.schema

with open(path, "xb") as output:
with open(file_path, f"{mode}b") as output:
self._transcode(
output=output,
output=output, # type: ignore [arg-type]
encoding=Encoding.JSON,
pretty_px=pretty_px,
pretty_ts=pretty_ts,
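
to_json follows the same pattern as to_csv. A brief sketch combining the new write mode with the existing output `compression` option, names assumed as above:

store.to_json("trades.json.zst", compression="zstd", mode="w")  # zstd-compressed JSON output
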
@@ -1239,8 +1281,10 @@ def _transcode(
transcoder.write(byte_chunk)

if transcoder.buffer():
raise BentoError(
"DBN file is truncated or contains an incomplete record",
warnings.warn(
BentoWarning(
"DBN file is truncated or contains an incomplete record",
),
)

transcoder.flush()
@@ -1285,28 +1329,38 @@ def __init__(
self._dtype = np.dtype(dtype)
self._offset = offset
self._count = count
self._close_on_next = False

self._reader.seek(offset)

def __iter__(self) -> NDArrayStreamIterator:
return self

def __next__(self) -> np.ndarray[Any, Any]:
if self._close_on_next:
raise StopIteration

if self._count is None:
read_size = -1
else:
read_size = self._dtype.itemsize * max(self._count, 1)

if buffer := self._reader.read(read_size):
loose_bytes = len(buffer) % self._dtype.itemsize
if loose_bytes != 0:
warnings.warn(
BentoWarning("DBN file is truncated or contains an incomplete record"),
)
buffer = buffer[:-loose_bytes]
self._close_on_next = True # decode one more buffer before stopping

try:
return np.frombuffer(
buffer=buffer,
dtype=self._dtype,
)
except ValueError:
raise BentoError(
"DBN file is truncated or contains an incomplete record",
)
except ValueError as exc:
raise BentoError("Cannot decode DBN stream") from exc

raise StopIteration
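
The loose-byte trimming above can be shown in isolation with plain numpy; the 16-byte record layout below is purely illustrative:

import warnings

import numpy as np

record_dtype = np.dtype([("ts", np.uint64), ("price", np.int64)])  # 16 bytes per record
buffer = bytes(40)  # two complete records plus 8 trailing "loose" bytes

loose_bytes = len(buffer) % record_dtype.itemsize
if loose_bytes != 0:
    warnings.warn("stream is truncated or contains an incomplete record")
    buffer = buffer[:-loose_bytes]  # drop the partial record before decoding

records = np.frombuffer(buffer, dtype=record_dtype)
assert len(records) == 2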

@@ -1351,10 +1405,8 @@ def __next__(self) -> np.ndarray[Any, Any]:
dtype=self._dtype,
count=num_records,
)
except ValueError:
raise BentoError(
"DBN file is truncated or contains an incomplete record",
) from None
except ValueError as exc:
raise BentoError("Cannot decode DBN stream") from exc


class DataFrameIterator: