
Commit f50936d

Support reading a slice of an index file and its records as a Ray data block
coufon committed Jan 25, 2024
1 parent e26d79f commit f50936d
Showing 6 changed files with 90 additions and 36 deletions.
8 changes: 6 additions & 2 deletions python/src/space/core/ops/read.py
@@ -90,13 +90,17 @@ def __init__(self,

   def __iter__(self) -> Iterator[pa.Table]:
     for file in self._file_set.index_files:
-      # TODO: to read row group by row group if needed, maybe not because index
-      # data is usually small.
+      # TODO: always loading the whole table is inefficient; only load the
+      # required row groups.
       index_data = pq.read_table(
           self.full_path(file.path),
           columns=self._selected_fields,
           filters=self._options.filter_)  # type: ignore[arg-type]
+
+      if file.selected_rows.end > 0:
+        length = file.selected_rows.end - file.selected_rows.start
+        index_data = index_data.slice(file.selected_rows.start, length)
 
       if self._options.reference_read:
         yield index_data
         continue
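For illustration (not part of the commit): a minimal sketch of the slicing behavior above, using a hypothetical temporary Parquet file. `pq.read_table` and `Table.slice(offset, length)` are the PyArrow calls the new code relies on; since proto3 scalars default to 0, an unset range reads back as `end == 0`, which is why `end > 0` marks a populated slice.

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Write a small index-like table to Parquet (hypothetical path).
pq.write_table(pa.table({"id": list(range(10))}), "/tmp/index_example.parquet")

# Read the whole file, then keep only rows [3, 7), mirroring the
# selected_rows handling: start is inclusive, end is exclusive.
index_data = pq.read_table("/tmp/index_example.parquet")
start, end = 3, 7
if end > 0:  # end == 0 would mean "no slice requested"
    index_data = index_data.slice(start, end - start)
assert index_data.num_rows == 4
```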
13 changes: 12 additions & 1 deletion python/src/space/core/proto/runtime.proto
@@ -19,7 +19,7 @@ import "space/core/proto/metadata.proto";
 package space.proto;
 
 // Information of a data file.
-// NEXT_ID: 4
+// NEXT_ID: 5
 message DataFile {
   // Data file path.
   string path = 1;
@@ -29,6 +29,17 @@ message DataFile {

   // Locally assigned manifest file IDs.
   int64 manifest_file_id = 3;
+
+  message Range {
+    // Inclusive.
+    int64 start = 1;
+    // Exclusive.
+    int64 end = 2;
+  }
+
+  // A range of selected rows in the data file.
+  // Used for partially reading an index file and its records.
+  Range selected_rows = 4;
 }

// A set of associated data and manifest files.
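For illustration (not part of the commit): assuming the generated bindings are importable as `space.core.proto.runtime_pb2` (the module updated below), a `DataFile` carrying a row range might be built like this; the file path is hypothetical.

```python
from space.core.proto import runtime_pb2 as rt

# Select rows [100, 200) of a (hypothetical) index file.
data_file = rt.DataFile(path="data/index-00001.parquet")
data_file.selected_rows.start = 100  # inclusive
data_file.selected_rows.end = 200    # exclusive

# end defaults to 0 when the range is unset, so readers can use
# end > 0 as the "slice requested" signal seen in read.py above.
assert data_file.selected_rows.end - data_file.selected_rows.start == 100
```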
20 changes: 11 additions & 9 deletions python/src/space/core/proto/runtime_pb2.py

Some generated files are not rendered by default.

31 changes: 28 additions & 3 deletions python/src/space/core/proto/runtime_pb2.pyi
@@ -33,30 +33,55 @@ DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
 @typing_extensions.final
 class DataFile(google.protobuf.message.Message):
     """Information of a data file.
-    NEXT_ID: 4
+    NEXT_ID: 5
     """
 
     DESCRIPTOR: google.protobuf.descriptor.Descriptor
 
+    @typing_extensions.final
+    class Range(google.protobuf.message.Message):
+        DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+        START_FIELD_NUMBER: builtins.int
+        END_FIELD_NUMBER: builtins.int
+        start: builtins.int
+        """Inclusive."""
+        end: builtins.int
+        """Exclusive."""
+        def __init__(
+            self,
+            *,
+            start: builtins.int = ...,
+            end: builtins.int = ...,
+        ) -> None: ...
+        def ClearField(self, field_name: typing_extensions.Literal["end", b"end", "start", b"start"]) -> None: ...
+
     PATH_FIELD_NUMBER: builtins.int
     STORAGE_STATISTICS_FIELD_NUMBER: builtins.int
     MANIFEST_FILE_ID_FIELD_NUMBER: builtins.int
+    SELECTED_ROWS_FIELD_NUMBER: builtins.int
     path: builtins.str
     """Data file path."""
     @property
     def storage_statistics(self) -> space.core.proto.metadata_pb2.StorageStatistics:
         """Storage statistics of data in the file."""
     manifest_file_id: builtins.int
     """Locally assigned manifest file IDs."""
+    @property
+    def selected_rows(self) -> global___DataFile.Range:
+        """A range of selected rows in the data file.
+        Used for partially reading an index file and its records.
+        """
     def __init__(
         self,
         *,
         path: builtins.str = ...,
         storage_statistics: space.core.proto.metadata_pb2.StorageStatistics | None = ...,
         manifest_file_id: builtins.int = ...,
+        selected_rows: global___DataFile.Range | None = ...,
     ) -> None: ...
-    def HasField(self, field_name: typing_extensions.Literal["storage_statistics", b"storage_statistics"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "storage_statistics", b"storage_statistics"]) -> None: ...
+    def HasField(self, field_name: typing_extensions.Literal["selected_rows", b"selected_rows", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "selected_rows", b"selected_rows", "storage_statistics", b"storage_statistics"]) -> None: ...
 
 global___DataFile = DataFile

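For illustration (not part of the commit): per the updated stub, `selected_rows` is a submessage, so its presence is tested with `HasField` and reset with `ClearField`; a short sketch of standard protobuf message behavior:

```python
from space.core.proto import runtime_pb2 as rt

f = rt.DataFile()
assert not f.HasField("selected_rows")  # unset by default

f.selected_rows.end = 64  # writing any field materializes the Range
assert f.HasField("selected_rows")

f.ClearField("selected_rows")  # back to the unset state
assert not f.HasField("selected_rows")
```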
3 changes: 2 additions & 1 deletion python/src/space/core/storage.py
@@ -16,6 +16,7 @@

 from __future__ import annotations
 from datetime import datetime
+import math
 from os import path
 from typing import Collection, Dict, Iterator, List, Optional, Union
 from typing_extensions import TypeAlias
@@ -376,7 +377,7 @@ def ray_dataset(self, ray_options: RayOptions,
     if read_options.batch_size is not None:
       num_rows = ds.count()
       assert num_rows >= 0 and read_options.batch_size > 0
-      return ds.repartition(num_rows // read_options.batch_size)
+      return ds.repartition(math.ceil(num_rows / read_options.batch_size))
 
     return ds

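For illustration (not part of the commit): the switch from floor division to `math.ceil` keeps the trailing partial batch as its own partition instead of spreading it across the others, and avoids requesting zero partitions for small datasets. A worked check:

```python
import math

num_rows, batch_size = 10, 3

# Floor division: 10 rows / batch_size 3 -> 3 partitions, so some
# partition must hold more than batch_size rows (4 + 3 + 3).
assert num_rows // batch_size == 3

# Ceiling division: 4 partitions, each at most batch_size rows
# (3 + 3 + 3 + 1); the remainder gets its own partition.
assert math.ceil(num_rows / batch_size) == 4

# Edge case: fewer rows than batch_size. Floor division would ask
# Ray for 0 partitions; ceiling division correctly asks for 1.
assert 2 // 5 == 0 and math.ceil(2 / 5) == 1
```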
51 changes: 31 additions & 20 deletions python/src/space/ray/data_sources.py
@@ -16,6 +16,7 @@

 from __future__ import annotations
 from functools import partial
+import math
 from typing import Any, Dict, List, Optional, TYPE_CHECKING
 
 from ray.data.block import Block, BlockMetadata
@@ -73,25 +74,35 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
                                         self._read_options.snapshot_id)
 
     for index_file in file_set.index_files:
-      stats = index_file.storage_statistics
-      task_file_set = rt.FileSet(index_files=[index_file])
-
-      # The metadata about the block that we know prior to actually executing
-      # the read task.
-      # TODO: to populate the storage values.
-      block_metadata = BlockMetadata(
-          num_rows=stats.num_rows,
-          size_bytes=None,
-          schema=None,
-          input_files=None,
-          exec_stats=None,
-      )
-
-      # TODO: A single index file (with record files) is a single block. To
-      # check whether row group granularity is needed.
-      read_fn = partial(FileSetReadOp, self._storage.location,
-                        self._storage.metadata, task_file_set,
-                        self._read_options)
-      read_tasks.append(ReadTask(read_fn, block_metadata))
+      num_rows = index_file.storage_statistics.num_rows
+      batch_size = self._read_options.batch_size
+      if batch_size is None:
+        batch_size = 1
+
+      num_blocks = math.ceil(num_rows / batch_size)
+      for i in range(num_blocks):
+        index_file_slice = rt.DataFile()
+        index_file_slice.CopyFrom(index_file)
+
+        rows = index_file_slice.selected_rows
+        rows.start = i * batch_size
+        rows.end = min((i + 1) * batch_size, num_rows)
+
+        # The metadata about the block that is known prior to actually
+        # executing the read task.
+        # TODO: populate the storage values.
+        block_metadata = BlockMetadata(
+            num_rows=rows.end - rows.start,
+            size_bytes=None,
+            schema=None,
+            input_files=None,
+            exec_stats=None,
+        )
+
+        read_fn = partial(FileSetReadOp, self._storage.location,
+                          self._storage.metadata,
+                          rt.FileSet(index_files=[index_file_slice]),
+                          self._read_options)
+        read_tasks.append(ReadTask(read_fn, block_metadata))
 
     return read_tasks
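For illustration (not part of the commit): the inner loop reduces to splitting `[0, num_rows)` into `[start, end)` slices of at most `batch_size` rows, one read task per slice. A standalone sketch of that computation (`block_ranges` is a hypothetical helper, not in the codebase):

```python
import math
from typing import List, Tuple

def block_ranges(num_rows: int, batch_size: int) -> List[Tuple[int, int]]:
  """Split [0, num_rows) into [start, end) ranges of <= batch_size rows."""
  num_blocks = math.ceil(num_rows / batch_size)
  return [(i * batch_size, min((i + 1) * batch_size, num_rows))
          for i in range(num_blocks)]

# An index file with 10 rows and batch_size 4 yields three read tasks;
# the last block carries the 2-row remainder.
assert block_ranges(10, 4) == [(0, 4), (4, 8), (8, 10)]
```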
