Skip to content

Commit

Permalink
fix: fix ray lance sink error
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay-ju committed Dec 11, 2024
1 parent 7ec23f0 commit 24692b6
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions python/python/lance/ray/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import pickle
from itertools import chain
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Expand All @@ -17,16 +16,14 @@
Union,
)

import pandas as pd
import pyarrow as pa

import lance
from lance.fragment import DEFAULT_MAX_BYTES_PER_FILE, FragmentMetadata, write_fragments

from ..dependencies import ray

if TYPE_CHECKING:
import pandas as pd

__all__ = ["LanceDatasink", "LanceFragmentWriter", "LanceCommitter", "write_lance"]


Expand Down Expand Up @@ -129,8 +126,22 @@ def on_write_start(self):

def on_write_complete(
self,
write_results: List[List[Tuple[str, str]]],
write_results,
):
if not write_results:
import warnings

warnings.warn(
"write_result_blocks is empty.",
DeprecationWarning,
)
return "Empty list"

first_element = write_results[0]
if isinstance(first_element, (pa.Table, pd.DataFrame)):
write_results = [
result["write_result"].iloc[0]["result"] for result in write_results
]
fragments = []
schema = None
for batch in write_results:
Expand Down

0 comments on commit 24692b6

Please sign in to comment.