diff --git a/python/src/space/core/ops/change_data.py b/python/src/space/core/ops/change_data.py index 1482cad..3d067ec 100644 --- a/python/src/space/core/ops/change_data.py +++ b/python/src/space/core/ops/change_data.py @@ -58,29 +58,26 @@ def read_change_data(storage: Storage, start_snapshot_id: int, start_snapshot_id is excluded; end_snapshot_id is included. """ - if start_snapshot_id > end_snapshot_id: + if start_snapshot_id >= end_snapshot_id: raise errors.UserInputError( f"End snapshot ID {end_snapshot_id} should not be lower than start " f"snapshot ID {start_snapshot_id}") - all_snapshot_ids = sorted(storage.snapshot_ids) - all_snapshot_ids_set = set(all_snapshot_ids) - - if start_snapshot_id not in all_snapshot_ids_set: - raise errors.VersionNotFoundError( - f"Start snapshot ID not found: {start_snapshot_id}") - - if end_snapshot_id not in all_snapshot_ids_set: - raise errors.VersionNotFoundError( - f"Start snapshot ID not found: {end_snapshot_id}") + all_snapshot_ids: List[int] = [] + current_snapshot = storage.snapshot(end_snapshot_id) + while current_snapshot.snapshot_id >= start_snapshot_id: + all_snapshot_ids.insert(0, current_snapshot.snapshot_id) + if not current_snapshot.HasField("parent_snapshot_id"): + break - for snapshot_id in all_snapshot_ids: - if snapshot_id <= start_snapshot_id: - continue + current_snapshot = storage.snapshot(current_snapshot.parent_snapshot_id) - if snapshot_id > end_snapshot_id: - break + if start_snapshot_id != all_snapshot_ids[0]: + raise errors.UserInputError( + f"Start snapshot {start_snapshot_id} is not the ancestor of " + f"end snapshot{end_snapshot_id}") + for snapshot_id in all_snapshot_ids[1:]: for result in iter(_LocalChangeDataReadOp(storage, snapshot_id)): yield result diff --git a/python/src/space/core/proto/metadata.proto b/python/src/space/core/proto/metadata.proto index beb4e1d..db2c949 100644 --- a/python/src/space/core/proto/metadata.proto +++ b/python/src/space/core/proto/metadata.proto @@ -84,7 +84,7 @@ message Schema { // Storage snapshot persisting physical metadata such as manifest file paths. // It is used for obtaining all alive data file paths for a given snapshot. -// NEXT_ID: 6 +// NEXT_ID: 7 message Snapshot { // The snapshot ID. int64 snapshot_id = 1; @@ -104,6 +104,9 @@ message Snapshot { // File path of the change log of the snapshot. string change_log_file = 5; + + // The snapshot ID of the parent snapshot. + optional int64 parent_snapshot_id = 6; } // Reference to a snapshot. diff --git a/python/src/space/core/proto/metadata_pb2.py b/python/src/space/core/proto/metadata_pb2.py index 036cfa1..041f361 100644 --- a/python/src/space/core/proto/metadata_pb2.py +++ b/python/src/space/core/proto/metadata_pb2.py @@ -16,7 +16,7 @@ from substrait import type_pb2 as substrait_dot_type__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/plan.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\xe9\x04\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x12.\n\x0clogical_plan\x18\x07 \x01(\x0b\x32\x18.space.proto.LogicalPlan\x12\x34\n\x04refs\x18\x08 \x03(\x0b\x32&.space.proto.StorageMetadata.RefsEntry\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\x1aK\n\tRefsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12-\n\x05value\x18\x02 \x01(\x0b\x32\x1e.space.proto.SnapshotReference:\x02\x38\x01\"@\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\x12\x15\n\x11MATERIALIZED_VIEW\x10\x02\"]\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\x12\x15\n\rrecord_fields\x18\x03 \x03(\t\"\xe8\x01\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x0emanifest_files\x18\x03 \x01(\x0b\x32\x1a.space.proto.ManifestFilesH\x00\x12:\n\x12storage_statistics\x18\x04 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x17\n\x0f\x63hange_log_file\x18\x05 \x01(\tB\x0b\n\tdata_info\"\xb8\x01\n\x11SnapshotReference\x12\x16\n\x0ereference_name\x18\x01 \x01(\t\x12\x13\n\x0bsnapshot_id\x18\x02 \x01(\x03\x12:\n\x04type\x18\x03 \x01(\x0e\x32,.space.proto.SnapshotReference.ReferenceType\":\n\rReferenceType\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x07\n\x03TAG\x10\x01\x12\n\n\x06\x42RANCH\x10\x02\"L\n\rManifestFiles\x12\x1c\n\x14index_manifest_files\x18\x01 \x03(\t\x12\x1d\n\x15record_manifest_files\x18\x02 \x03(\t\"\x8a\x01\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x12!\n\x19record_uncompressed_bytes\x18\x04 \x01(\x03\"e\n\tChangeLog\x12,\n\x0c\x64\x65leted_rows\x18\x01 \x03(\x0b\x32\x16.space.proto.RowBitmap\x12*\n\nadded_rows\x18\x02 \x03(\x0b\x32\x16.space.proto.RowBitmap\"O\n\tRowBitmap\x12\x0c\n\x04\x66ile\x18\x01 \x01(\t\x12\x10\n\x08\x61ll_rows\x18\x02 \x01(\x08\x12\x18\n\x0eroaring_bitmap\x18\x03 \x01(\x0cH\x00\x42\x08\n\x06\x62itmap\"\x93\x01\n\x0bLogicalPlan\x12%\n\x0clogical_plan\x18\x01 \x01(\x0b\x32\x0f.substrait.Plan\x12\x30\n\x04udfs\x18\x02 \x03(\x0b\x32\".space.proto.LogicalPlan.UdfsEntry\x1a+\n\tUdfsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1d\n\x08\x46ileType\x12\x11\n\tdirectory\x18\x01 \x01(\tb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1fspace/core/proto/metadata.proto\x12\x0bspace.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x14substrait/plan.proto\x1a\x14substrait/type.proto\"#\n\nEntryPoint\x12\x15\n\rmetadata_file\x18\x01 \x01(\t\"\xe9\x04\n\x0fStorageMetadata\x12/\n\x0b\x63reate_time\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x10last_update_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12/\n\x04type\x18\x03 \x01(\x0e\x32!.space.proto.StorageMetadata.Type\x12#\n\x06schema\x18\x04 \x01(\x0b\x32\x13.space.proto.Schema\x12\x1b\n\x13\x63urrent_snapshot_id\x18\x05 \x01(\x03\x12>\n\tsnapshots\x18\x06 \x03(\x0b\x32+.space.proto.StorageMetadata.SnapshotsEntry\x12.\n\x0clogical_plan\x18\x07 \x01(\x0b\x32\x18.space.proto.LogicalPlan\x12\x34\n\x04refs\x18\x08 \x03(\x0b\x32&.space.proto.StorageMetadata.RefsEntry\x1aG\n\x0eSnapshotsEntry\x12\x0b\n\x03key\x18\x01 \x01(\x03\x12$\n\x05value\x18\x02 \x01(\x0b\x32\x15.space.proto.Snapshot:\x02\x38\x01\x1aK\n\tRefsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12-\n\x05value\x18\x02 \x01(\x0b\x32\x1e.space.proto.SnapshotReference:\x02\x38\x01\"@\n\x04Type\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x0b\n\x07\x44\x41TASET\x10\x01\x12\x15\n\x11MATERIALIZED_VIEW\x10\x02\"]\n\x06Schema\x12&\n\x06\x66ields\x18\x01 \x01(\x0b\x32\x16.substrait.NamedStruct\x12\x14\n\x0cprimary_keys\x18\x02 \x03(\t\x12\x15\n\rrecord_fields\x18\x03 \x03(\t\"\xa0\x02\n\x08Snapshot\x12\x13\n\x0bsnapshot_id\x18\x01 \x01(\x03\x12/\n\x0b\x63reate_time\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x34\n\x0emanifest_files\x18\x03 \x01(\x0b\x32\x1a.space.proto.ManifestFilesH\x00\x12:\n\x12storage_statistics\x18\x04 \x01(\x0b\x32\x1e.space.proto.StorageStatistics\x12\x17\n\x0f\x63hange_log_file\x18\x05 \x01(\t\x12\x1f\n\x12parent_snapshot_id\x18\x06 \x01(\x03H\x01\x88\x01\x01\x42\x0b\n\tdata_infoB\x15\n\x13_parent_snapshot_id\"\xb8\x01\n\x11SnapshotReference\x12\x16\n\x0ereference_name\x18\x01 \x01(\t\x12\x13\n\x0bsnapshot_id\x18\x02 \x01(\x03\x12:\n\x04type\x18\x03 \x01(\x0e\x32,.space.proto.SnapshotReference.ReferenceType\":\n\rReferenceType\x12\x14\n\x10TYPE_UNSPECIFIED\x10\x00\x12\x07\n\x03TAG\x10\x01\x12\n\n\x06\x42RANCH\x10\x02\"L\n\rManifestFiles\x12\x1c\n\x14index_manifest_files\x18\x01 \x03(\t\x12\x1d\n\x15record_manifest_files\x18\x02 \x03(\t\"\x8a\x01\n\x11StorageStatistics\x12\x10\n\x08num_rows\x18\x01 \x01(\x03\x12\x1e\n\x16index_compressed_bytes\x18\x02 \x01(\x03\x12 \n\x18index_uncompressed_bytes\x18\x03 \x01(\x03\x12!\n\x19record_uncompressed_bytes\x18\x04 \x01(\x03\"e\n\tChangeLog\x12,\n\x0c\x64\x65leted_rows\x18\x01 \x03(\x0b\x32\x16.space.proto.RowBitmap\x12*\n\nadded_rows\x18\x02 \x03(\x0b\x32\x16.space.proto.RowBitmap\"O\n\tRowBitmap\x12\x0c\n\x04\x66ile\x18\x01 \x01(\t\x12\x10\n\x08\x61ll_rows\x18\x02 \x01(\x08\x12\x18\n\x0eroaring_bitmap\x18\x03 \x01(\x0cH\x00\x42\x08\n\x06\x62itmap\"\x93\x01\n\x0bLogicalPlan\x12%\n\x0clogical_plan\x18\x01 \x01(\x0b\x32\x0f.substrait.Plan\x12\x30\n\x04udfs\x18\x02 \x03(\x0b\x32\".space.proto.LogicalPlan.UdfsEntry\x1a+\n\tUdfsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1d\n\x08\x46ileType\x12\x11\n\tdirectory\x18\x01 \x01(\tb\x06proto3') _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'space.core.proto.metadata_pb2', globals()) @@ -42,23 +42,23 @@ _SCHEMA._serialized_start=782 _SCHEMA._serialized_end=875 _SNAPSHOT._serialized_start=878 - _SNAPSHOT._serialized_end=1110 - _SNAPSHOTREFERENCE._serialized_start=1113 - _SNAPSHOTREFERENCE._serialized_end=1297 - _SNAPSHOTREFERENCE_REFERENCETYPE._serialized_start=1239 - _SNAPSHOTREFERENCE_REFERENCETYPE._serialized_end=1297 - _MANIFESTFILES._serialized_start=1299 - _MANIFESTFILES._serialized_end=1375 - _STORAGESTATISTICS._serialized_start=1378 - _STORAGESTATISTICS._serialized_end=1516 - _CHANGELOG._serialized_start=1518 - _CHANGELOG._serialized_end=1619 - _ROWBITMAP._serialized_start=1621 - _ROWBITMAP._serialized_end=1700 - _LOGICALPLAN._serialized_start=1703 - _LOGICALPLAN._serialized_end=1850 - _LOGICALPLAN_UDFSENTRY._serialized_start=1807 - _LOGICALPLAN_UDFSENTRY._serialized_end=1850 - _FILETYPE._serialized_start=1852 - _FILETYPE._serialized_end=1881 + _SNAPSHOT._serialized_end=1166 + _SNAPSHOTREFERENCE._serialized_start=1169 + _SNAPSHOTREFERENCE._serialized_end=1353 + _SNAPSHOTREFERENCE_REFERENCETYPE._serialized_start=1295 + _SNAPSHOTREFERENCE_REFERENCETYPE._serialized_end=1353 + _MANIFESTFILES._serialized_start=1355 + _MANIFESTFILES._serialized_end=1431 + _STORAGESTATISTICS._serialized_start=1434 + _STORAGESTATISTICS._serialized_end=1572 + _CHANGELOG._serialized_start=1574 + _CHANGELOG._serialized_end=1675 + _ROWBITMAP._serialized_start=1677 + _ROWBITMAP._serialized_end=1756 + _LOGICALPLAN._serialized_start=1759 + _LOGICALPLAN._serialized_end=1906 + _LOGICALPLAN_UDFSENTRY._serialized_start=1863 + _LOGICALPLAN_UDFSENTRY._serialized_end=1906 + _FILETYPE._serialized_start=1908 + _FILETYPE._serialized_end=1937 # @@protoc_insertion_point(module_scope) diff --git a/python/src/space/core/proto/metadata_pb2.pyi b/python/src/space/core/proto/metadata_pb2.pyi index 6679758..37c9a07 100644 --- a/python/src/space/core/proto/metadata_pb2.pyi +++ b/python/src/space/core/proto/metadata_pb2.pyi @@ -209,7 +209,7 @@ global___Schema = Schema class Snapshot(google.protobuf.message.Message): """Storage snapshot persisting physical metadata such as manifest file paths. It is used for obtaining all alive data file paths for a given snapshot. - NEXT_ID: 6 + NEXT_ID: 7 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -219,6 +219,7 @@ class Snapshot(google.protobuf.message.Message): MANIFEST_FILES_FIELD_NUMBER: builtins.int STORAGE_STATISTICS_FIELD_NUMBER: builtins.int CHANGE_LOG_FILE_FIELD_NUMBER: builtins.int + PARENT_SNAPSHOT_ID_FIELD_NUMBER: builtins.int snapshot_id: builtins.int """The snapshot ID.""" @property @@ -234,6 +235,8 @@ class Snapshot(google.protobuf.message.Message): """Statistics of all data in the storage.""" change_log_file: builtins.str """File path of the change log of the snapshot.""" + parent_snapshot_id: builtins.int + """The snapshot ID of the parent snapshot.""" def __init__( self, *, @@ -242,9 +245,13 @@ class Snapshot(google.protobuf.message.Message): manifest_files: global___ManifestFiles | None = ..., storage_statistics: global___StorageStatistics | None = ..., change_log_file: builtins.str = ..., + parent_snapshot_id: builtins.int | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["change_log_file", b"change_log_file", "create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "snapshot_id", b"snapshot_id", "storage_statistics", b"storage_statistics"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_parent_snapshot_id", b"_parent_snapshot_id", "create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "parent_snapshot_id", b"parent_snapshot_id", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_parent_snapshot_id", b"_parent_snapshot_id", "change_log_file", b"change_log_file", "create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "parent_snapshot_id", b"parent_snapshot_id", "snapshot_id", b"snapshot_id", "storage_statistics", b"storage_statistics"]) -> None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_parent_snapshot_id", b"_parent_snapshot_id"]) -> typing_extensions.Literal["parent_snapshot_id"] | None: ... + @typing.overload def WhichOneof(self, oneof_group: typing_extensions.Literal["data_info", b"data_info"]) -> typing_extensions.Literal["manifest_files"] | None: ... global___Snapshot = Snapshot diff --git a/python/src/space/core/storage.py b/python/src/space/core/storage.py index 642ddc4..146fa73 100644 --- a/python/src/space/core/storage.py +++ b/python/src/space/core/storage.py @@ -268,7 +268,8 @@ def commit(self, patch: rt.Patch) -> None: snapshot_id=new_snapshot_id, create_time=proto_now(), manifest_files=current_snapshot.manifest_files, - storage_statistics=current_snapshot.storage_statistics) + storage_statistics=current_snapshot.storage_statistics, + parent_snapshot_id=current_snapshot.snapshot_id) _patch_manifests(snapshot.manifest_files, patch) if patch.HasField('change_log'):