Skip to content

Commit

Permalink
fix: fix ray lance sink error
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay-ju committed Dec 30, 2024
1 parent 7ec23f0 commit 0cc5d17
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions python/python/lance/ray/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,22 @@ def _pd_to_arrow(
return pa.Table.from_pydict(df, schema=schema)
if _PANDAS_AVAILABLE and isinstance(df, pd.DataFrame):
tbl = pa.Table.from_pandas(df, schema=schema)
new_schema = tbl.schema.remove_metadata()
new_table = tbl.replace_schema_metadata(new_schema.metadata)
return new_table
tbl.schema = tbl.schema.remove_metadata()
return tbl
return df


def _write_fragment(
stream: Iterable[Union[pa.Table, "pd.DataFrame"]],
stream: Iterable[Union[pa.Table, "pd.DataFrame"]],
uri: str,
*,
schema: Optional[pa.Schema] = None,
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: int = 1024, # Only useful for v1 writer.
data_storage_version: Optional[str] = None,
data_storage_version: str = "stable",
storage_options: Optional[Dict[str, Any]] = None,
) -> List[Tuple[FragmentMetadata, pa.Schema]]:
) -> Tuple[FragmentMetadata, pa.Schema]:
from ..dependencies import _PANDAS_AVAILABLE
from ..dependencies import pandas as pd

Expand Down Expand Up @@ -131,17 +130,24 @@ def on_write_complete(
self,
write_results: List[List[Tuple[str, str]]],
):
if not write_results:
import warnings

warnings.warn(
"write_result_blocks is empty.",
DeprecationWarning,
)
return "Empty list"

if hasattr(write_results, "write_returns"):
write_results = write_results.write_returns
fragments = []
schema = None
for batch in write_results:
for fragment_str, schema_str in batch:
fragment = pickle.loads(fragment_str)
fragments.append(fragment)
schema = pickle.loads(schema_str)
# Check whether writer has fragments or not.
# Skip commit when there are no fragments.
if not schema:
return
if self.mode in set(["create", "overwrite"]):
op = lance.LanceOperation.Overwrite(schema, fragments)
elif self.mode == "append":
Expand Down Expand Up @@ -173,7 +179,7 @@ class LanceDatasink(_BaseLanceDatasink):
Choices are 'append', 'create', 'overwrite'.
max_rows_per_file : int, optional
The maximum number of rows per file. Default is 1024 * 1024.
data_storage_version: optional, str, default None
data_storage_version: optional, str, default "stable"
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
Expand All @@ -193,7 +199,7 @@ def __init__(
schema: Optional[pa.Schema] = None,
mode: Literal["create", "append", "overwrite"] = "create",
max_rows_per_file: int = 1024 * 1024,
data_storage_version: Optional[str] = None,
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = None,
storage_options: Optional[Dict[str, Any]] = None,
*args,
Expand Down Expand Up @@ -278,10 +284,11 @@ class LanceFragmentWriter:
max_rows_per_group : int, optional
The maximum number of rows per group. Default is 1024.
Only useful for v1 writer.
data_storage_version: optional, str, default None
data_storage_version: optional, str, default "stable"
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default
(None) will use the 2.0 version. See the user guide for more details.
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
for more details.
use_legacy_format : optional, bool, default None
Deprecated method for setting the data storage version. Use the
`data_storage_version` parameter instead.
Expand All @@ -294,12 +301,13 @@ def __init__(
self,
uri: str,
*,
transform: Optional[Callable[[pa.Table], Union[pa.Table, Generator]]] = None,
transform: Optional[Callable[[pa.Table],
Union[pa.Table, Generator]]] = None,
schema: Optional[pa.Schema] = None,
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
max_rows_per_group: Optional[int] = None, # Only useful for v1 writer.
data_storage_version: Optional[str] = None,
data_storage_version: str = "stable",
use_legacy_format: Optional[bool] = False,
storage_options: Optional[Dict[str, Any]] = None,
):
Expand Down Expand Up @@ -352,7 +360,7 @@ def __call__(self, batch: Union[pa.Table, "pd.DataFrame"]) -> Dict[str, Any]:


class LanceCommitter(_BaseLanceDatasink):
"""Lance Committer as Ray Datasink.
"""Lance Commiter as Ray Datasink.

Check warning on line 363 in python/python/lance/ray/sink.py

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Commiter" should be "Committer".
This is used with `LanceFragmentWriter` to write large-than-memory data to
lance file.
Expand All @@ -363,7 +371,7 @@ def num_rows_per_write(self) -> int:
return 1

def get_name(self) -> str:
return f"LanceCommitter({self.mode})"
return f"LanceCommiter({self.mode})"

Check warning on line 374 in python/python/lance/ray/sink.py

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Commiter" should be "Committer".

def write(
self,
Expand All @@ -373,10 +381,6 @@ def write(
"""Passthrough the fragments to commit phase"""
v = []
for block in blocks:
# If block is empty, skip to get "fragment" and "schema" field
if len(block) == 0:
continue

for fragment, schema in zip(
block["fragment"].to_pylist(), block["schema"].to_pylist()
):
Expand All @@ -395,7 +399,7 @@ def write_lance(
max_rows_per_file: int = 1024 * 1024,
max_bytes_per_file: Optional[int] = None,
storage_options: Optional[Dict[str, Any]] = None,
data_storage_version: Optional[str] = None,
data_storage_version: str = "stable",
) -> None:
"""Write Ray dataset at scale.
Expand All @@ -418,10 +422,11 @@ def write_lance(
The maximum number of bytes per file. Default is 90GB.
storage_options : Dict[str, Any], optional
The storage options for the writer. Default is None.
data_storage_version: optional, str, default None
data_storage_version: optional, str, default "stable"
The version of the data storage format to use. Newer versions are more
efficient but require newer versions of lance to read. The default
(None) will use the 2.0 version. See the user guide for more details.
efficient but require newer versions of lance to read. The default is
"legacy" which will use the legacy v1 version. See the user guide
for more details.
"""
data.map_batches(
LanceFragmentWriter(
Expand Down

0 comments on commit 0cc5d17

Please sign in to comment.