From 3c9980bd5c129e212b4a8c649abd21f1d12c1c2a Mon Sep 17 00:00:00 2001
From: Simon
Date: Mon, 13 Nov 2023 09:55:08 +0000
Subject: [PATCH] Update zipped_files to return name as well as contents;
 update json_records and csv_records to take new structure; bump to 0.2 (#9)

* Add more logging; add py.typed file to mark the library as typed

* Change zipped_files to return OpenedFileRef instead of just an open file;
  update to version 0.2

* Update json_records and csv_records to take either IO or OpenedFileRef
---
 README.md                   |  7 +++++++
 pyproject.toml              |  2 +-
 src/pipedata/__init__.py    |  2 +-
 src/pipedata/ops/files.py   | 22 +++++++++++++++++---
 src/pipedata/ops/records.py | 40 +++++++++++++++++++++++++++----------
 src/pipedata/ops/storage.py | 24 ++++++++++++----------
 src/pipedata/py.typed       |  0
 tests/ops/test_files.py     | 18 ++++++++++++++++-
 tests/ops/test_pipeline.py  |  1 +
 tests/ops/test_records.py   | 40 +++++++++++++++++++++++++++++++++++++
 10 files changed, 130 insertions(+), 26 deletions(-)
 create mode 100644 src/pipedata/py.typed

diff --git a/README.md b/README.md
index 22f9334..3181218 100644
--- a/README.md
+++ b/README.md
@@ -140,3 +140,10 @@ print(json.dumps(chain.get_counts(), indent=4))
 print(StreamStart(range(10)).flat_map(chain).to_list())
 #> [2, 10, 10]
 ```
+
+## Similar Functionality
+
+- Python has built-in functionality for building and chaining iterators (e.g. the `itertools` module)
+
+- [LangChain](https://www.langchain.com/) implements chained operations using its
+[Runnable protocol](https://python.langchain.com/docs/expression_language/interface)
diff --git a/pyproject.toml b/pyproject.toml
index 3beec90..b2142e3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "pipedata"
-version = "0.1.1"
+version = "0.2"
 description = "Framework for building pipelines for data processing"
 authors = ["Simon Wicks "]
 readme = "README.md"
diff --git a/src/pipedata/__init__.py b/src/pipedata/__init__.py
index 21c2c19..fe00a28 100644
--- a/src/pipedata/__init__.py
+++ b/src/pipedata/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "0.1.1"
+__version__ = "0.2"
 
 __all__ = [
     "__version__",
diff --git a/src/pipedata/ops/files.py b/src/pipedata/ops/files.py
index c7af7df..af4897a 100644
--- a/src/pipedata/ops/files.py
+++ b/src/pipedata/ops/files.py
@@ -1,5 +1,6 @@
 import logging
 import zipfile
+from dataclasses import dataclass
 from typing import IO, Iterator
 
 import fsspec  # type: ignore
@@ -7,10 +8,25 @@
 logger = logging.getLogger(__name__)
 
 
-def zipped_files(file_refs: Iterator[str]) -> Iterator[IO[bytes]]:
+@dataclass
+class OpenedFileRef:
+    name: str
+    contents: IO[bytes]
+
+
+def zipped_files(file_refs: Iterator[str]) -> Iterator[OpenedFileRef]:
+    logger.info("Initializing zipped files reader")
     for file_ref in file_refs:
+        logger.info(f"Opening zip file at {file_ref}")
         with fsspec.open(file_ref, "rb") as file:
             with zipfile.ZipFile(file) as zip_file:
-                for name in zip_file.namelist():
+                infos = zip_file.infolist()
+                logger.info(f"Found {len(infos)} files in zip file")
+                for i, info in enumerate(infos):
+                    name = info.filename
+                    logger.info(f"Reading file {i} ({name}) from zip file")
                     with zip_file.open(name) as inner_file:
-                        yield inner_file
+                        yield OpenedFileRef(
+                            name=name,
+                            contents=inner_file,
+                        )
diff --git a/src/pipedata/ops/records.py b/src/pipedata/ops/records.py
index d862221..bf13024 100644
--- a/src/pipedata/ops/records.py
+++ b/src/pipedata/ops/records.py
@@ -1,33 +1,53 @@
 import csv
 import io
 import logging
-from typing import IO, Any, Callable, Dict, Iterator, Optional
+from typing import IO, Any, Callable, Dict, Iterator, Optional, Union
 
 import ijson  # type: ignore
 
+from .files import OpenedFileRef
+
 logger = logging.getLogger(__name__)
 
 
 def json_records(
     json_path: str = "item", multiple_values: Optional[bool] = False
-) -> Callable[[Iterator[IO[bytes]]], Iterator[Dict[str, Any]]]:
+) -> Callable[[Iterator[Union[IO[bytes], OpenedFileRef]]], Iterator[Dict[str, Any]]]:
     logger.info(f"Initializing json reader for {json_path}")
 
-    def json_records_func(json_files: Iterator[IO[bytes]]) -> Iterator[Dict[str, Any]]:
+    def json_records_func(
+        json_files: Iterator[Union[IO[bytes], OpenedFileRef]]
+    ) -> Iterator[Dict[str, Any]]:
         for json_file in json_files:
-            logger.info(f"Reading json file {json_file}")
-            records = ijson.items(json_file, json_path, multiple_values=multiple_values)
+            if isinstance(json_file, OpenedFileRef):
+                contents = json_file.contents
+                logger.info(f"Reading json file {json_file.name}")
+            else:
+                contents = json_file
+                logger.info(f"Reading json file {json_file}")
+            records = ijson.items(contents, json_path, multiple_values=multiple_values)
             yield from records
 
     return json_records_func
 
 
-def csv_records() -> Callable[[Iterator[IO[bytes]]], Iterator[Dict[str, Any]]]:
-    def csv_records_func(csv_paths: Iterator[IO[bytes]]) -> Iterator[Dict[str, Any]]:
-        for csv_path in csv_paths:
-            logger.info(f"Reading csv file {csv_path}")
+def csv_records() -> (
+    Callable[[Iterator[Union[IO[bytes], OpenedFileRef]]], Iterator[Dict[str, Any]]]
+):
+    logger.info("Initializing csv reader")
+
+    def csv_records_func(
+        csv_files: Iterator[Union[IO[bytes], OpenedFileRef]]
+    ) -> Iterator[Dict[str, Any]]:
+        for csv_file in csv_files:
+            if isinstance(csv_file, OpenedFileRef):
+                contents = csv_file.contents
+                logger.info(f"Reading csv file {csv_file.name}")
+            else:
+                contents = csv_file
+                logger.info(f"Reading csv file {csv_file}")
             csv_reader = csv.DictReader(
-                io.TextIOWrapper(csv_path, "utf-8"), delimiter=","
+                io.TextIOWrapper(contents, "utf-8"), delimiter=","
             )
             yield from csv_reader
 
     return csv_records_func
diff --git a/src/pipedata/ops/storage.py b/src/pipedata/ops/storage.py
index 581814f..cb72304 100644
--- a/src/pipedata/ops/storage.py
+++ b/src/pipedata/ops/storage.py
@@ -1,3 +1,4 @@
+import logging
 from typing import Any, Callable, Dict, Iterator, Optional
 
 import pyarrow as pa  # type: ignore
@@ -5,14 +6,7 @@
 
 from pipedata.core.chain import batched
 
-# Option to accumulate the pyarrow table more frequently
-# so that doesn't need whole list(dict) and pyarrow table
-# in memory at the same time
-
-# Option to hae row_group_length and max_file_length dpendent
-# on size of data, as opposed to number of just numbers of rows.
-# Can combine this with the existing settings, so runs
-# at the smaller of the two.
+logger = logging.getLogger(__name__)
 
 
 def parquet_writer(
@@ -24,13 +18,16 @@ def parquet_writer(
     if row_group_length is None and max_file_length is not None:
         row_group_length = max_file_length
 
-    if max_file_length is not None:
+    multi_file = max_file_length is not None
+    if multi_file:
         if file_path.format(i=1) == file_path:
             msg = "When (possibly) writing to multiple files (as the file_length"
             msg += " argument is not None), the file_path argument must be a"
             msg += " format string that contains a format specifier for the file."
             raise ValueError(msg)
 
+    logger.info(f"Initializing parquet writer with {file_path=}")
+
     def parquet_writer_func(records: Iterator[Dict[str, Any]]) -> Iterator[str]:
         writer = None
         file_number = 1
@@ -39,22 +36,29 @@ def parquet_writer_func(records: Iterator[Dict[str, Any]]) -> Iterator[str]:
             table = pa.Table.from_pylist(batch, schema=schema)
             if writer is None:
                 formated_file_path = file_path
-                if max_file_length is not None:
+                if multi_file:
                     formated_file_path = file_path.format(i=file_number)
+                logger.info(f"Writing to {formated_file_path=}")
                 writer = pq.ParquetWriter(formated_file_path, table.schema)
             writer.write_table(table)
             file_length += len(batch)
+            logger.info(
+                f"Wrote {len(batch)} ({file_length} total) rows "
+                f"to {formated_file_path}"
+            )
             if max_file_length is not None and file_length >= max_file_length:
                 writer.close()
                 writer = None
                 file_length = 0
                 file_number += 1
+                logger.info(f"Finished writing to {formated_file_path}")
                 yield formated_file_path
 
         if writer is not None:
             writer.close()
+            logger.info(f"Final file closed at {formated_file_path}")
            yield formated_file_path
 
     return parquet_writer_func
diff --git a/src/pipedata/py.typed b/src/pipedata/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/tests/ops/test_files.py b/tests/ops/test_files.py
index c551f9b..c3c3709 100644
--- a/tests/ops/test_files.py
+++ b/tests/ops/test_files.py
@@ -7,6 +7,22 @@
 
 
 def test_zipped_files() -> None:
+    with tempfile.TemporaryDirectory() as temp_dir:
+        zip_path = Path(temp_dir) / "test.zip"
+
+        with zipfile.ZipFile(zip_path, "w") as zip_file:
+            zip_file.writestr("test.txt", "Hello, world 1!")
+            zip_file.writestr("test2.txt", "Hello, world 2!")
+            zip_file.writestr("test3.txt", "Hello, world 3!")
+
+        result = StreamStart([str(zip_path)]).flat_map(zipped_files).to_list()
+
+        assert result[0].name == "test.txt"
+        assert result[1].name == "test2.txt"
+        assert result[2].name == "test3.txt"
+
+
+def test_zipped_file_contents() -> None:
     with tempfile.TemporaryDirectory() as temp_dir:
         zip_path = Path(temp_dir) / "test.zip"
 
@@ -18,7 +34,7 @@ def test_zipped_files() -> None:
         result = (
             StreamStart([str(zip_path)])
             .flat_map(zipped_files)
-            .map(lambda x: x.read().decode("utf-8"))
+            .map(lambda x: x.contents.read().decode("utf-8"))
             .to_list()
         )
diff --git a/tests/ops/test_pipeline.py b/tests/ops/test_pipeline.py
index db8b94d..031e607 100644
--- a/tests/ops/test_pipeline.py
+++ b/tests/ops/test_pipeline.py
@@ -35,6 +35,7 @@ def test_zipped_files() -> None:
         result = (
             StreamStart([str(zip_path)])
             .flat_map(zipped_files)
+            .map(lambda x: x.contents)
             .flat_map(json_records())
             .flat_map(parquet_writer(str(output_path)))
             .to_list()
diff --git a/tests/ops/test_records.py b/tests/ops/test_records.py
index 65bed15..ee392ca 100644
--- a/tests/ops/test_records.py
+++ b/tests/ops/test_records.py
@@ -2,6 +2,7 @@
 import json
 
 from pipedata.core import StreamStart
+from pipedata.ops.files import OpenedFileRef
 from pipedata.ops.records import csv_records, json_records
 
 
@@ -17,6 +18,23 @@ def test_json_records() -> None:
     assert result == expected
 
 
+def test_json_records_from_file_ref() -> None:
+    json1 = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
+    json2 = [{"a": 5, "b": 6}, {"a": 7, "b": 8}]
+
+    file1 = io.BytesIO(json.dumps(json1).encode("utf-8"))
+    file2 = io.BytesIO(json.dumps(json2).encode("utf-8"))
+
+    file_refs = [
+        OpenedFileRef(name="test1.json", contents=file1),
+        OpenedFileRef(name="test2.json", contents=file2),
+    ]
+
+    result = StreamStart(file_refs).flat_map(json_records()).to_list()
+    expected = json1 + json2
+    assert result == expected
+
+
 def test_csv_records() -> None:
     csv1 = "a,b\n1,2\n3,4"
     csv2 = "a,b\n5,6\n7,8"
@@ -32,3 +50,25 @@ def test_csv_records() -> None:
         {"a": "7", "b": "8"},
     ]
     assert result == expected
+
+
+def test_csv_records_from_file_ref() -> None:
+    csv1 = "a,b\n1,2\n3,4"
+    csv2 = "a,b\n5,6\n7,8"
+
+    file1 = io.BytesIO(csv1.encode("utf-8"))
+    file2 = io.BytesIO(csv2.encode("utf-8"))
+
+    file_refs = [
+        OpenedFileRef(name="test1.csv", contents=file1),
+        OpenedFileRef(name="test2.csv", contents=file2),
+    ]
+
+    result = StreamStart(file_refs).flat_map(csv_records()).to_list()
+    expected = [
+        {"a": "1", "b": "2"},
+        {"a": "3", "b": "4"},
+        {"a": "5", "b": "6"},
+        {"a": "7", "b": "8"},
+    ]
+    assert result == expected
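
To make the API change concrete, here is a minimal usage sketch of a 0.2 stream (not part of the patch). The import paths for `zipped_files`, `json_records`, and `OpenedFileRef` follow the test modules above; the `pipedata.ops.storage` path for `parquet_writer`, the archive name, the output pattern, and the `max_file_length` value are illustrative assumptions.

```python
# Illustrative sketch only: "archive.zip" and "out-{i}.parquet" are assumed
# example paths, not files from this repository.
from pipedata.core import StreamStart
from pipedata.ops.files import zipped_files
from pipedata.ops.records import json_records
from pipedata.ops.storage import parquet_writer  # assumed module path

# zipped_files now yields OpenedFileRef(name=..., contents=...) instead of a
# bare IO[bytes], so the inner file name travels with the stream.
names = (
    StreamStart(["archive.zip"])
    .flat_map(zipped_files)
    .map(lambda ref: ref.name)
    .to_list()
)

# json_records (like csv_records) now accepts either IO[bytes] or
# OpenedFileRef, so the refs can be passed straight through. Because
# max_file_length is set, file_path must contain an {i} placeholder;
# otherwise parquet_writer raises ValueError.
written = (
    StreamStart(["archive.zip"])
    .flat_map(zipped_files)
    .flat_map(json_records())
    .flat_map(parquet_writer("out-{i}.parquet", max_file_length=1000))
    .to_list()
)
```

Passing the refs straight into `json_records()` relies on the new `Union[IO[bytes], OpenedFileRef]` signature; the `.map(lambda x: x.contents)` step added to `tests/ops/test_pipeline.py` keeps the plain `IO[bytes]` path exercised as well.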