Update zipped_files to return name as well as contents; update json_records and csv_records to take new structure; bump to 0.2 (#9)

* Add more logging; add py.typed file to mark library as typed

* Change zipped_files to return OpenedFileRef instead of just the open file; update to version 0.2

* Update json_records and csv_records to take either IO or OpenedFileRef
simw authored Nov 13, 2023
1 parent 146285e commit 3c9980b
Showing 10 changed files with 130 additions and 26 deletions.
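
For orientation, a minimal sketch of how the 0.2 pieces fit together after this change. The zip path, field names and parquet settings are illustrative, and the imports assume the module layout under src/pipedata/ (the package may also re-export these names):

```python
from pipedata.core import StreamStart
from pipedata.ops.files import zipped_files
from pipedata.ops.records import json_records
from pipedata.ops.storage import parquet_writer

# Hypothetical input: a zip archive of JSON files, written out to parquet.
# As of 0.2, zipped_files yields OpenedFileRef(name, contents), and
# json_records accepts either an open binary file or an OpenedFileRef.
output_files = (
    StreamStart(["data/archive.zip"])  # illustrative path
    .flat_map(zipped_files)            # -> OpenedFileRef items
    .flat_map(json_records())          # reads .contents, logs .name
    .flat_map(parquet_writer("out_{i}.parquet", max_file_length=100_000))
    .to_list()
)
```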
README.md: 7 additions & 0 deletions

@@ -140,3 +140,10 @@ print(json.dumps(chain.get_counts(), indent=4))
 print(StreamStart(range(10)).flat_map(chain).to_list())
 #> [2, 10, 10]
 ```
+
+## Similar Functionality
+
+- Python has built in functionality for building iterators
+
+- [LangChain](https://www.langchain.com/) implements chained operations using its
+  [Runnable protocol](https://python.langchain.com/docs/expression_language/interface)
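
As a small aside on the first bullet above, and not taken from the repository: a chained, lazily evaluated pipeline can be built with plain generator expressions and itertools alone.

```python
import itertools

# A lazy pipeline using only the standard library:
# square each number, keep the even squares, take the first three.
squares = (x * x for x in range(100))
even_squares = (s for s in squares if s % 2 == 0)
print(list(itertools.islice(even_squares, 3)))
#> [0, 4, 16]
```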
pyproject.toml: 1 addition & 1 deletion

@@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api"

 [tool.poetry]
 name = "pipedata"
-version = "0.1.1"
+version = "0.2"
 description = "Framework for building pipelines for data processing"
 authors = ["Simon Wicks <[email protected]>"]
 readme = "README.md"
src/pipedata/__init__.py: 1 addition & 1 deletion

@@ -1,4 +1,4 @@
-__version__ = "0.1.1"
+__version__ = "0.2"

 __all__ = [
     "__version__",
src/pipedata/ops/files.py: 19 additions & 3 deletions

@@ -1,16 +1,32 @@
 import logging
 import zipfile
+from dataclasses import dataclass
 from typing import IO, Iterator

 import fsspec  # type: ignore

 logger = logging.getLogger(__name__)


-def zipped_files(file_refs: Iterator[str]) -> Iterator[IO[bytes]]:
+@dataclass
+class OpenedFileRef:
+    name: str
+    contents: IO[bytes]
+
+
+def zipped_files(file_refs: Iterator[str]) -> Iterator[OpenedFileRef]:
+    logger.info("Initializing zipped files reader")
     for file_ref in file_refs:
+        logger.info(f"Opening zip file at {file_ref}")
         with fsspec.open(file_ref, "rb") as file:
             with zipfile.ZipFile(file) as zip_file:
-                for name in zip_file.namelist():
+                infos = zip_file.infolist()
+                logger.info(f"Found {len(infos)} files in zip file")
+                for i, info in enumerate(infos):
+                    name = info.filename
+                    logger.info(f"Reading file {i} ({name}) from zip file")
                     with zip_file.open(name) as inner_file:
-                        yield inner_file
+                        yield OpenedFileRef(
+                            name=name,
+                            contents=inner_file,
+                        )
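
A short sketch of consuming the new OpenedFileRef values; the archive path and member names are illustrative. Note that contents is only readable while the enclosing zip file is open, so reads happen inside the stream via .map rather than after .to_list():

```python
from pipedata.core import StreamStart
from pipedata.ops.files import zipped_files

# Each archive member arrives as OpenedFileRef(name, contents), so the member
# name travels alongside its open byte stream.
sizes = (
    StreamStart(["data/archive.zip"])  # illustrative path
    .flat_map(zipped_files)
    .map(lambda ref: (ref.name, len(ref.contents.read())))
    .to_list()
)
print(sizes)  # e.g. [("a.json", 1234), ("b.csv", 567)]
```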
src/pipedata/ops/records.py: 30 additions & 10 deletions

@@ -1,33 +1,53 @@
 import csv
 import io
 import logging
-from typing import IO, Any, Callable, Dict, Iterator, Optional
+from typing import IO, Any, Callable, Dict, Iterator, Optional, Union

 import ijson  # type: ignore

+from .files import OpenedFileRef
+
 logger = logging.getLogger(__name__)


 def json_records(
     json_path: str = "item", multiple_values: Optional[bool] = False
-) -> Callable[[Iterator[IO[bytes]]], Iterator[Dict[str, Any]]]:
+) -> Callable[[Iterator[Union[IO[bytes], OpenedFileRef]]], Iterator[Dict[str, Any]]]:
     logger.info(f"Initializing json reader for {json_path}")

-    def json_records_func(json_files: Iterator[IO[bytes]]) -> Iterator[Dict[str, Any]]:
+    def json_records_func(
+        json_files: Iterator[Union[IO[bytes], OpenedFileRef]]
+    ) -> Iterator[Dict[str, Any]]:
         for json_file in json_files:
-            logger.info(f"Reading json file {json_file}")
-            records = ijson.items(json_file, json_path, multiple_values=multiple_values)
+            if isinstance(json_file, OpenedFileRef):
+                contents = json_file.contents
+                logger.info(f"Reading json file {json_file.name}")
+            else:
+                contents = json_file
+                logger.info(f"Reading json file {json_file}")
+            records = ijson.items(contents, json_path, multiple_values=multiple_values)
             yield from records

     return json_records_func


-def csv_records() -> Callable[[Iterator[IO[bytes]]], Iterator[Dict[str, Any]]]:
-    def csv_records_func(csv_paths: Iterator[IO[bytes]]) -> Iterator[Dict[str, Any]]:
-        for csv_path in csv_paths:
-            logger.info(f"Reading csv file {csv_path}")
+def csv_records() -> (
+    Callable[[Iterator[Union[IO[bytes], OpenedFileRef]]], Iterator[Dict[str, Any]]]
+):
+    logger.info("Initializing csv reader")
+
+    def csv_records_func(
+        csv_files: Iterator[Union[IO[bytes], OpenedFileRef]]
+    ) -> Iterator[Dict[str, Any]]:
+        for csv_file in csv_files:
+            if isinstance(csv_file, OpenedFileRef):
+                contents = csv_file.contents
+                logger.info(f"Reading csv file {csv_file.name}")
+            else:
+                contents = csv_file
+                logger.info(f"Reading csv file {csv_file}")
             csv_reader = csv.DictReader(
-                io.TextIOWrapper(csv_path, "utf-8"), delimiter=","
+                io.TextIOWrapper(contents, "utf-8"), delimiter=","
             )
             yield from csv_reader
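
Since json_records and csv_records now accept either raw binary IO or an OpenedFileRef, a stream can mix both; a sketch with in-memory data:

```python
import io

from pipedata.core import StreamStart
from pipedata.ops.files import OpenedFileRef
from pipedata.ops.records import csv_records

raw = io.BytesIO(b"a,b\n1,2\n3,4")
ref = OpenedFileRef(name="inline.csv", contents=io.BytesIO(b"a,b\n5,6\n7,8"))

# csv_records unwraps .contents when it sees an OpenedFileRef (and logs the
# name); plain IO[bytes] items are read directly, as before.
rows = StreamStart([raw, ref]).flat_map(csv_records()).to_list()
print(rows)
#> [{'a': '1', 'b': '2'}, {'a': '3', 'b': '4'}, {'a': '5', 'b': '6'}, {'a': '7', 'b': '8'}]
```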
src/pipedata/ops/storage.py: 14 additions & 10 deletions

@@ -1,18 +1,12 @@
+import logging
 from typing import Any, Callable, Dict, Iterator, Optional

 import pyarrow as pa  # type: ignore
 import pyarrow.parquet as pq  # type: ignore

 from pipedata.core.chain import batched

-# Option to accumulate the pyarrow table more frequently
-# so that doesn't need whole list(dict) and pyarrow table
-# in memory at the same time
-
-# Option to hae row_group_length and max_file_length dpendent
-# on size of data, as opposed to number of just numbers of rows.
-# Can combine this with the existing settings, so runs
-# at the smaller of the two.
+logger = logging.getLogger(__name__)


 def parquet_writer(
@@ -24,13 +18,16 @@ def parquet_writer(
     if row_group_length is None and max_file_length is not None:
         row_group_length = max_file_length

-    if max_file_length is not None:
+    multi_file = max_file_length is not None
+    if multi_file:
         if file_path.format(i=1) == file_path:
             msg = "When (possibly) writing to multiple files (as the file_length"
             msg += " argument is not None), the file_path argument must be a"
             msg += " format string that contains a format specifier for the file."
             raise ValueError(msg)

+    logger.info(f"Initializing parquet writer with {file_path=}")
+
     def parquet_writer_func(records: Iterator[Dict[str, Any]]) -> Iterator[str]:
         writer = None
         file_number = 1
@@ -39,22 +36,29 @@ def parquet_writer_func(records: Iterator[Dict[str, Any]]) -> Iterator[str]:
             table = pa.Table.from_pylist(batch, schema=schema)
             if writer is None:
                 formated_file_path = file_path
-                if max_file_length is not None:
+                if multi_file:
                     formated_file_path = file_path.format(i=file_number)
+                logger.info(f"Writing to {formated_file_path=}")
                 writer = pq.ParquetWriter(formated_file_path, table.schema)

             writer.write_table(table)
             file_length += len(batch)
+            logger.info(
+                f"Written {len(batch)} ({file_length} total) rows "
+                f"to {formated_file_path}"
+            )

             if max_file_length is not None and file_length >= max_file_length:
                 writer.close()
                 writer = None
                 file_length = 0
                 file_number += 1
+                logger.info(f"Finished writing to {formated_file_path}")
                 yield formated_file_path

         if writer is not None:
             writer.close()
+            logger.info(f"Final file closed at {formated_file_path}")
             yield formated_file_path

     return parquet_writer_func
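
A sketch of the multi-file behaviour the new multi_file flag and logging cover; record contents and sizes are illustrative, and parquet_writer is imported from its module path:

```python
from pipedata.core import StreamStart
from pipedata.ops.storage import parquet_writer

records = ({"i": i, "square": i * i} for i in range(250))

# With max_file_length set, file_path must contain an {i} placeholder; the
# writer rolls over to a new file every 100 rows and yields each completed
# path downstream.
paths = (
    StreamStart(records)
    .flat_map(parquet_writer("out_{i}.parquet", max_file_length=100))
    .to_list()
)
print(paths)
#> ['out_1.parquet', 'out_2.parquet', 'out_3.parquet']
```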
Empty file added src/pipedata/py.typed
tests/ops/test_files.py: 17 additions & 1 deletion

@@ -7,6 +7,22 @@


 def test_zipped_files() -> None:
+    with tempfile.TemporaryDirectory() as temp_dir:
+        zip_path = Path(temp_dir) / "test.zip"
+
+        with zipfile.ZipFile(zip_path, "w") as zip_file:
+            zip_file.writestr("test.txt", "Hello, world 1!")
+            zip_file.writestr("test2.txt", "Hello, world 2!")
+            zip_file.writestr("test3.txt", "Hello, world 3!")
+
+        result = StreamStart([str(zip_path)]).flat_map(zipped_files).to_list()
+
+        assert result[0].name == "test.txt"
+        assert result[1].name == "test2.txt"
+        assert result[2].name == "test3.txt"
+
+
+def test_zipped_file_contents() -> None:
     with tempfile.TemporaryDirectory() as temp_dir:
         zip_path = Path(temp_dir) / "test.zip"

@@ -18,7 +34,7 @@ def test_zipped_files() -> None:
     result = (
         StreamStart([str(zip_path)])
         .flat_map(zipped_files)
-        .map(lambda x: x.read().decode("utf-8"))
+        .map(lambda x: x.contents.read().decode("utf-8"))
         .to_list()
     )

tests/ops/test_pipeline.py: 1 addition & 0 deletions

@@ -35,6 +35,7 @@ def test_zipped_files() -> None:
     result = (
         StreamStart([str(zip_path)])
         .flat_map(zipped_files)
+        .map(lambda x: x.contents)
         .flat_map(json_records())
         .flat_map(parquet_writer(str(output_path)))
         .to_list()
tests/ops/test_records.py: 40 additions & 0 deletions

@@ -2,6 +2,7 @@
 import json

 from pipedata.core import StreamStart
+from pipedata.ops.files import OpenedFileRef
 from pipedata.ops.records import csv_records, json_records


@@ -17,6 +18,23 @@ def test_json_records() -> None:
     assert result == expected


+def test_json_records_from_file_ref() -> None:
+    json1 = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
+    json2 = [{"a": 5, "b": 6}, {"a": 7, "b": 8}]
+
+    file1 = io.BytesIO(json.dumps(json1).encode("utf-8"))
+    file2 = io.BytesIO(json.dumps(json2).encode("utf-8"))
+
+    file_refs = [
+        OpenedFileRef(name="test1.json", contents=file1),
+        OpenedFileRef(name="test2.json", contents=file2),
+    ]
+
+    result = StreamStart(file_refs).flat_map(json_records()).to_list()
+    expected = json1 + json2
+    assert result == expected
+
+
 def test_csv_records() -> None:
     csv1 = "a,b\n1,2\n3,4"
     csv2 = "a,b\n5,6\n7,8"
@@ -32,3 +50,25 @@ def test_csv_records() -> None:
         {"a": "7", "b": "8"},
     ]
     assert result == expected
+
+
+def test_csv_records_from_file_ref() -> None:
+    csv1 = "a,b\n1,2\n3,4"
+    csv2 = "a,b\n5,6\n7,8"
+
+    file1 = io.BytesIO(csv1.encode("utf-8"))
+    file2 = io.BytesIO(csv2.encode("utf-8"))
+
+    file_refs = [
+        OpenedFileRef(name="test1.csv", contents=file1),
+        OpenedFileRef(name="test2.csv", contents=file2),
+    ]
+
+    result = StreamStart(file_refs).flat_map(csv_records()).to_list()
+    expected = [
+        {"a": "1", "b": "2"},
+        {"a": "3", "b": "4"},
+        {"a": "5", "b": "6"},
+        {"a": "7", "b": "8"},
+    ]
+    assert result == expected