From f50936de1d35104ca84fadaca6f729e60e826e73 Mon Sep 17 00:00:00 2001 From: coufon Date: Thu, 25 Jan 2024 04:06:06 +0000 Subject: [PATCH] Support reading a slice of an index file and its records as a Ray data block --- python/src/space/core/ops/read.py | 8 +++- python/src/space/core/proto/runtime.proto | 13 +++++- python/src/space/core/proto/runtime_pb2.py | 20 ++++---- python/src/space/core/proto/runtime_pb2.pyi | 31 +++++++++++-- python/src/space/core/storage.py | 3 +- python/src/space/ray/data_sources.py | 51 +++++++++++++-------- 6 files changed, 90 insertions(+), 36 deletions(-) diff --git a/python/src/space/core/ops/read.py b/python/src/space/core/ops/read.py index 32651db..d51540a 100644 --- a/python/src/space/core/ops/read.py +++ b/python/src/space/core/ops/read.py @@ -90,13 +90,17 @@ def __init__(self, def __iter__(self) -> Iterator[pa.Table]: for file in self._file_set.index_files: - # TODO: to read row group by row group if needed, maybe not because index - # data is usually small. + # TODO: always loading the whole table is inefficient, to only load the + # required row groups. index_data = pq.read_table( self.full_path(file.path), columns=self._selected_fields, filters=self._options.filter_) # type: ignore[arg-type] + if file.selected_rows.end > 0: + length = file.selected_rows.end - file.selected_rows.start + index_data = index_data.slice(file.selected_rows.start, length) + if self._options.reference_read: yield index_data continue diff --git a/python/src/space/core/proto/runtime.proto b/python/src/space/core/proto/runtime.proto index 873c723..1a33f32 100644 --- a/python/src/space/core/proto/runtime.proto +++ b/python/src/space/core/proto/runtime.proto @@ -19,7 +19,7 @@ import "space/core/proto/metadata.proto"; package space.proto; // Information of a data file. -// NEXT_ID: 4 +// NEXT_ID: 5 message DataFile { // Data file path. string path = 1; @@ -29,6 +29,17 @@ message DataFile { // Locally assigned manifest file IDs. int64 manifest_file_id = 3; + + message Range { + // Inclusive. + int64 start = 1; + // Exclusive. + int64 end = 2; + } + + // A range of selected rows in the data file. + // Used for partially reading an index file and its records. + Range selected_rows = 4; } // A set of associated data and manifest files. diff --git a/python/src/space/core/proto/runtime_pb2.py b/python/src/space/core/proto/runtime_pb2.py index 64dbc7f..f5a26fe 100644 --- a/python/src/space/core/proto/runtime_pb2.py +++ b/python/src/space/core/proto/runtime_pb2.py @@ -14,7 +14,7 @@ from space.core.proto import metadata_pb2 as space_dot_core_dot_proto_dot_metadata__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"n\n\x08\x44\x61taFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12:\n\x12storage_statistics\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x18\n\x10manifest_file_id\x18\x03 \x01(\x03\"\xbc\x01\n\x07\x46ileSet\x12*\n\x0bindex_files\x18\x01 \x03(\x0b\x32\x15.space.proto.DataFile\x12J\n\x14index_manifest_files\x18\x02 \x03(\x0b\x32,.space.proto.FileSet.IndexManifestFilesEntry\x1a\x39\n\x17IndexManifestFilesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xd2\x01\n\x05Patch\x12,\n\x08\x61\x64\x64ition\x18\x01 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12,\n\x08\x64\x65letion\x18\x02 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12*\n\nchange_log\x18\x04 \x01(\x0b\x32\x16.space.proto.ChangeLogb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1espace/core/proto/runtime.proto\x12\x0bspace.proto\x1a\x1fspace/core/proto/metadata.proto\"\xc7\x01\n\x08\x44\x61taFile\x12\x0c\n\x04path\x18\x01 \x01(\t\x12:\n\x12storage_statistics\x18\x02 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x18\n\x10manifest_file_id\x18\x03 \x01(\x03\x12\x32\n\rselected_rows\x18\x04 \x01(\x0b\x32\x1b.space.proto.DataFile.Range\x1a#\n\x05Range\x12\r\n\x05start\x18\x01 \x01(\x03\x12\x0b\n\x03\x65nd\x18\x02 \x01(\x03\"\xbc\x01\n\x07\x46ileSet\x12*\n\x0bindex_files\x18\x01 \x03(\x0b\x32\x15.space.proto.DataFile\x12J\n\x14index_manifest_files\x18\x02 \x03(\x0b\x32,.space.proto.FileSet.IndexManifestFilesEntry\x1a\x39\n\x17IndexManifestFilesEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xd2\x01\n\x05Patch\x12,\n\x08\x61\x64\x64ition\x18\x01 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12,\n\x08\x64\x65letion\x18\x02 \x01(\x0b\x32\x1a.space.proto.ManifestFiles\x12\x41\n\x19storage_statistics_update\x18\x03 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12*\n\nchange_log\x18\x04 \x01(\x0b\x32\x16.space.proto.ChangeLogb\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.runtime_pb2', globals()) @@ -23,12 +23,14 @@ DESCRIPTOR._options = None _FILESET_INDEXMANIFESTFILESENTRY._options = None _FILESET_INDEXMANIFESTFILESENTRY._serialized_options = b'8\001' - _DATAFILE._serialized_start=80 - _DATAFILE._serialized_end=190 - _FILESET._serialized_start=193 - _FILESET._serialized_end=381 - _FILESET_INDEXMANIFESTFILESENTRY._serialized_start=324 - _FILESET_INDEXMANIFESTFILESENTRY._serialized_end=381 - _PATCH._serialized_start=384 - _PATCH._serialized_end=594 + _DATAFILE._serialized_start=81 + _DATAFILE._serialized_end=280 + _DATAFILE_RANGE._serialized_start=245 + _DATAFILE_RANGE._serialized_end=280 + _FILESET._serialized_start=283 + _FILESET._serialized_end=471 + _FILESET_INDEXMANIFESTFILESENTRY._serialized_start=414 + _FILESET_INDEXMANIFESTFILESENTRY._serialized_end=471 + _PATCH._serialized_start=474 + _PATCH._serialized_end=684 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/runtime_pb2.pyi b/python/src/space/core/proto/runtime_pb2.pyi index af8f391..c94f2a7 100644 --- a/python/src/space/core/proto/runtime_pb2.pyi +++ b/python/src/space/core/proto/runtime_pb2.pyi @@ -33,14 +33,33 @@ DESCRIPTOR: google.protobuf.descriptor.FileDescriptor @typing_extensions.final class DataFile(google.protobuf.message.Message): """Information of a data file. - NEXT_ID: 4 + NEXT_ID: 5 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor + @typing_extensions.final + class Range(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + START_FIELD_NUMBER: builtins.int + END_FIELD_NUMBER: builtins.int + start: builtins.int + """Inclusive.""" + end: builtins.int + """Exclusive.""" + def __init__( + self, + *, + start: builtins.int = ..., + end: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["end", b"end", "start", b"start"]) -> None: ... + PATH_FIELD_NUMBER: builtins.int STORAGE_STATISTICS_FIELD_NUMBER: builtins.int MANIFEST_FILE_ID_FIELD_NUMBER: builtins.int + SELECTED_ROWS_FIELD_NUMBER: builtins.int path: builtins.str """Data file path.""" @property @@ -48,15 +67,21 @@ class DataFile(google.protobuf.message.Message): """Storage statistics of data in the file.""" manifest_file_id: builtins.int """Locally assigned manifest file IDs.""" + @property + def selected_rows(self) -> global___DataFile.Range: + """A range of selected rows in the data file. + Used for partially reading an index file and its records. + """ def __init__( self, *, path: builtins.str = ..., storage_statistics: space.core.proto.metadata_pb2.StorageStatistics | None = ..., manifest_file_id: builtins.int = ..., + selected_rows: global___DataFile.Range | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["storage_statistics", b"storage_statistics"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "storage_statistics", b"storage_statistics"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["selected_rows", b"selected_rows", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["manifest_file_id", b"manifest_file_id", "path", b"path", "selected_rows", b"selected_rows", "storage_statistics", b"storage_statistics"]) -> None: ... global___DataFile = DataFile diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 5c910a3..dbb5c37 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -16,6 +16,7 @@ from __future__ import annotations from datetime import datetime +import math from os import path from typing import Collection, Dict, Iterator, List, Optional, Union from typing_extensions import TypeAlias @@ -376,7 +377,7 @@ def ray_dataset(self, ray_options: RayOptions, if read_options.batch_size is not None: num_rows = ds.count() assert num_rows >= 0 and read_options.batch_size > 0 - return ds.repartition(num_rows // read_options.batch_size) + return ds.repartition(math.ceil(num_rows / read_options.batch_size)) return ds diff --git a/python/src/space/ray/data_sources.py b/python/src/space/ray/data_sources.py index c1f72e0..00c76d8 100644 --- a/python/src/space/ray/data_sources.py +++ b/python/src/space/ray/data_sources.py @@ -16,6 +16,7 @@ from __future__ import annotations from functools import partial +import math from typing import Any, Dict, List, Optional, TYPE_CHECKING from ray.data.block import Block, BlockMetadata @@ -73,25 +74,35 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]: self._read_options.snapshot_id) for index_file in file_set.index_files: - stats = index_file.storage_statistics - task_file_set = rt.FileSet(index_files=[index_file]) - - # The metadata about the block that we know prior to actually executing - # the read task. - # TODO: to populate the storage values. - block_metadata = BlockMetadata( - num_rows=stats.num_rows, - size_bytes=None, - schema=None, - input_files=None, - exec_stats=None, - ) - - # TODO: A single index file (with record files) is a single block. To - # check whether row group granularity is needed. - read_fn = partial(FileSetReadOp, self._storage.location, - self._storage.metadata, task_file_set, - self._read_options) - read_tasks.append(ReadTask(read_fn, block_metadata)) + num_rows = index_file.storage_statistics.num_rows + batch_size = self._read_options.batch_size + if batch_size is None: + batch_size = 1 + + num_blocks = math.ceil(num_rows / batch_size) + for i in range(num_blocks): + index_file_slice = rt.DataFile() + index_file_slice.CopyFrom(index_file) + + rows = index_file_slice.selected_rows + rows.start = i * batch_size + rows.end = min((i + 1) * batch_size, num_rows) + + # The metadata about the block that we know prior to actually executing + # the read task. + # TODO: to populate the storage values. + block_metadata = BlockMetadata( + num_rows=rows.end - rows.start, + size_bytes=None, + schema=None, + input_files=None, + exec_stats=None, + ) + + read_fn = partial(FileSetReadOp, self._storage.location, + self._storage.metadata, + rt.FileSet(index_files=[index_file_slice]), + self._read_options) + read_tasks.append(ReadTask(read_fn, block_metadata)) return read_tasks