Skip to content

Commit

Permalink
introduce parent snapshot id and revamp change data to depend on it (#63)
Browse files Browse the repository at this point in the history

This is in preparation for introducing branching.
  • Loading branch information
huan233usc authored Jan 22, 2024
1 parent 447211b commit 34de328
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 41 deletions.
29 changes: 13 additions & 16 deletions python/src/space/core/ops/change_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,26 @@ def read_change_data(storage: Storage, start_snapshot_id: int,
start_snapshot_id is excluded; end_snapshot_id is included.
"""
if start_snapshot_id > end_snapshot_id:
if start_snapshot_id >= end_snapshot_id:
raise errors.UserInputError(
f"End snapshot ID {end_snapshot_id} should not be lower than start "
f"snapshot ID {start_snapshot_id}")

all_snapshot_ids = sorted(storage.snapshot_ids)
all_snapshot_ids_set = set(all_snapshot_ids)

if start_snapshot_id not in all_snapshot_ids_set:
raise errors.VersionNotFoundError(
f"Start snapshot ID not found: {start_snapshot_id}")

if end_snapshot_id not in all_snapshot_ids_set:
raise errors.VersionNotFoundError(
f"Start snapshot ID not found: {end_snapshot_id}")
all_snapshot_ids: List[int] = []
current_snapshot = storage.snapshot(end_snapshot_id)
while current_snapshot.snapshot_id >= start_snapshot_id:
all_snapshot_ids.insert(0, current_snapshot.snapshot_id)
if not current_snapshot.HasField("parent_snapshot_id"):
break

for snapshot_id in all_snapshot_ids:
if snapshot_id <= start_snapshot_id:
continue
current_snapshot = storage.snapshot(current_snapshot.parent_snapshot_id)

if snapshot_id > end_snapshot_id:
break
if start_snapshot_id != all_snapshot_ids[0]:
raise errors.UserInputError(
f"Start snapshot {start_snapshot_id} is not the ancestor of "
f"end snapshot{end_snapshot_id}")

for snapshot_id in all_snapshot_ids[1:]:
for result in iter(_LocalChangeDataReadOp(storage, snapshot_id)):
yield result

Expand Down
5 changes: 4 additions & 1 deletion python/src/space/core/proto/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ message Schema {

// Storage snapshot persisting physical metadata such as manifest file paths.
// It is used for obtaining all alive data file paths for a given snapshot.
// NEXT_ID: 6
// NEXT_ID: 7
message Snapshot {
// The snapshot ID.
int64 snapshot_id = 1;
Expand All @@ -104,6 +104,9 @@ message Snapshot {

// File path of the change log of the snapshot.
string change_log_file = 5;

// The snapshot ID of the parent snapshot.
optional int64 parent_snapshot_id = 6;
}

// Reference to a snapshot.
Expand Down
40 changes: 20 additions & 20 deletions python/src/space/core/proto/metadata_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions python/src/space/core/proto/metadata_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ global___Schema = Schema
class Snapshot(google.protobuf.message.Message):
"""Storage snapshot persisting physical metadata such as manifest file paths.
It is used for obtaining all alive data file paths for a given snapshot.
NEXT_ID: 6
NEXT_ID: 7
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor
Expand All @@ -219,6 +219,7 @@ class Snapshot(google.protobuf.message.Message):
MANIFEST_FILES_FIELD_NUMBER: builtins.int
STORAGE_STATISTICS_FIELD_NUMBER: builtins.int
CHANGE_LOG_FILE_FIELD_NUMBER: builtins.int
PARENT_SNAPSHOT_ID_FIELD_NUMBER: builtins.int
snapshot_id: builtins.int
"""The snapshot ID."""
@property
Expand All @@ -234,6 +235,8 @@ class Snapshot(google.protobuf.message.Message):
"""Statistics of all data in the storage."""
change_log_file: builtins.str
"""File path of the change log of the snapshot."""
parent_snapshot_id: builtins.int
"""The snapshot ID of the parent snapshot."""
def __init__(
self,
*,
Expand All @@ -242,9 +245,13 @@ class Snapshot(google.protobuf.message.Message):
manifest_files: global___ManifestFiles | None = ...,
storage_statistics: global___StorageStatistics | None = ...,
change_log_file: builtins.str = ...,
parent_snapshot_id: builtins.int | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["change_log_file", b"change_log_file", "create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "snapshot_id", b"snapshot_id", "storage_statistics", b"storage_statistics"]) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["_parent_snapshot_id", b"_parent_snapshot_id", "create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "parent_snapshot_id", b"parent_snapshot_id", "storage_statistics", b"storage_statistics"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["_parent_snapshot_id", b"_parent_snapshot_id", "change_log_file", b"change_log_file", "create_time", b"create_time", "data_info", b"data_info", "manifest_files", b"manifest_files", "parent_snapshot_id", b"parent_snapshot_id", "snapshot_id", b"snapshot_id", "storage_statistics", b"storage_statistics"]) -> None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["_parent_snapshot_id", b"_parent_snapshot_id"]) -> typing_extensions.Literal["parent_snapshot_id"] | None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing_extensions.Literal["data_info", b"data_info"]) -> typing_extensions.Literal["manifest_files"] | None: ...

global___Snapshot = Snapshot
Expand Down
3 changes: 2 additions & 1 deletion python/src/space/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ def commit(self, patch: rt.Patch) -> None:
snapshot_id=new_snapshot_id,
create_time=proto_now(),
manifest_files=current_snapshot.manifest_files,
storage_statistics=current_snapshot.storage_statistics)
storage_statistics=current_snapshot.storage_statistics,
parent_snapshot_id=current_snapshot.snapshot_id)
_patch_manifests(snapshot.manifest_files, patch)

if patch.HasField('change_log'):
Expand Down

0 comments on commit 34de328

Please sign in to comment.