
Use pl.sink_* in LazyPolarsDataset._save #702

Open
AnH0ang opened this issue May 28, 2024 · 4 comments

AnH0ang commented May 28, 2024

Description

When a lazy DataFrame is passed to LazyPolarsDataset, it is currently collected into an eager DataFrame before being written with the appropriate write_* method. This collection step can be skipped by writing the lazy DataFrame directly with the corresponding pl.sink_* method.

Context

In some cases, it may be faster to process the lazy DataFrame in streaming mode. Moreover, collecting the entire DataFrame is not always possible (e.g., when the data is too large to fit in memory). With pl.sink_*, the whole dataset never needs to be loaded at once.

Possible Implementation

In the _save function, the input DataFrame could first be coerced into a lazy DataFrame and then written to disk using pl.sink_*.

Possible Alternatives

AnH0ang commented May 28, 2024

I just tried to implement it myself and encountered a problem. LazyPolarsDataset first writes the dataframe into a buffer before writing the buffer to disk with fsspec. However, sink_* does not support writing to a buffer.

```python
def _save(self, data: pl.DataFrame | pl.LazyFrame) -> None:
    save_path = get_filepath_str(self._get_save_path(), self._protocol)
    collected_data = None
    if isinstance(data, pl.LazyFrame):
        collected_data = data.collect()
    else:
        collected_data = data
    # Note: polars does support writing partitioned parquet file
    # it is leveraging Arrow to do so, see e.g.
    # https://pola-rs.github.io/polars/py-polars/html/reference/api/polars.DataFrame.write_parquet.html
    save_method = getattr(collected_data, f"write_{self._file_format}", None)
    if save_method:
        buf = BytesIO()
        save_method(file=buf, **self._save_args)
        with self._fs.open(save_path, mode="wb") as fs_file:
            fs_file.write(buf.getvalue())
        self._invalidate_cache()
    # How the LazyPolarsDataset logic is currently written with
    # ACCEPTED_FILE_FORMATS and a check in the `__init__` method,
    # this else loop is never reached, hence we exclude it from coverage report
    # but leave it in for consistency between the Eager and Lazy classes
    else:  # pragma: no cover
        raise DatasetError(
            f"Unable to retrieve 'polars.DataFrame.write_{self._file_format}' "
            "method, please ensure that your 'file_format' parameter has been "
            "defined correctly as per the Polars API "
            "https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html"
        )
```

MatthiasRoels (Contributor)

I would hold off on such a change for now. As mentioned in the docs:

> Streaming mode is considered unstable. It may be changed at any point without it being considered a breaking change.

On top of that, I heard the Polars team is working on completely rewriting their streaming engine. So I would just stick with the current implementation...

astrojuanlu (Member)

Indeed, streaming mode is unstable, but lazy non-streaming methods are considered stable, as far as I understand?

About what to do with remote storages, maybe we can offer a sink_* path for local files?

MatthiasRoels (Contributor)

MatthiasRoels commented Jun 27, 2024

Actually, the sink_* methods also use the streaming engine under the hood, so they should likewise be considered unstable, as explicitly mentioned in the docs. I wouldn't recommend using them either.

Regular methods on the other hand are stable. As a matter of fact, almost all eager methods use the corresponding lazy method under the hood (e.g. .lazy().op().collect()).

Labels: None yet
Projects: None yet
Development: No branches or pull requests
3 participants