From 3400e5632d3ae340b393835e072639d8bc08f40e Mon Sep 17 00:00:00 2001
From: simw
Date: Tue, 14 Nov 2023 11:35:58 +0000
Subject: [PATCH] Add additional logging to read_from_parquet

---
 pyproject.toml            | 2 +-
 src/pipedata/__init__.py  | 2 +-
 src/pipedata/ops/files.py | 7 ++++++-
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 474bf2b..4b7b952 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "pipedata"
-version = "0.2.1"
+version = "0.2.2"
 description = "Framework for building pipelines for data processing"
 authors = ["Simon Wicks "]
 readme = "README.md"
diff --git a/src/pipedata/__init__.py b/src/pipedata/__init__.py
index ab4276d..0b8bbe2 100644
--- a/src/pipedata/__init__.py
+++ b/src/pipedata/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "0.2.1"
+__version__ = "0.2.2"
 
 __all__ = [
     "__version__",
diff --git a/src/pipedata/ops/files.py b/src/pipedata/ops/files.py
index b97b6bb..1cb002b 100644
--- a/src/pipedata/ops/files.py
+++ b/src/pipedata/ops/files.py
@@ -64,7 +64,12 @@ def parquet_batch_reader(
     for file_ref in file_refs:
         logger.info(f"Reading parquet file {file_ref}")
         ds = pa_dataset.dataset(file_ref, format="parquet")
-        for batch in ds.to_batches(columns=columns, batch_size=batch_size):
+        for i, batch in enumerate(
+            ds.to_batches(columns=columns, batch_size=batch_size)
+        ):
+            logger.info(
+                f"Processing batch {i} (length {len(batch)}) from {file_ref}"
+            )
             if return_as == "recordbatch":
                 yield batch
             elif return_as == "record":