Skip to content

Commit

Permalink
Extract object store snapshot path helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
pcholakov committed Dec 23, 2024
1 parent 25a89c0 commit aea27d7
Showing 1 changed file with 73 additions and 51 deletions.
124 changes: 73 additions & 51 deletions crates/worker/src/partition/snapshots/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ pub struct LatestSnapshot {
}

impl LatestSnapshot {
pub fn from_snapshot(snapshot: &PartitionSnapshotMetadata, path: String) -> Self {
pub fn from_snapshot(
snapshot: &PartitionSnapshotMetadata,
snapshot_unique_key: String,
) -> Self {
LatestSnapshot {
version: snapshot.version,
cluster_name: snapshot.cluster_name.clone(),
Expand All @@ -109,7 +112,7 @@ impl LatestSnapshot {
snapshot_id: snapshot.snapshot_id,
created_at: snapshot.created_at.clone(),
min_applied_lsn: snapshot.min_applied_lsn,
path,
path: snapshot_unique_key,
}
}
}
Expand Down Expand Up @@ -208,29 +211,17 @@ impl SnapshotRepository {
snapshot: &PartitionSnapshotMetadata,
local_snapshot_path: &Path,
) -> Result<(), PutSnapshotError> {
// A unique snapshot path within the partition prefix. We pad the LSN to ensure correct
// lexicographic sorting.
let snapshot_prefix = Self::get_snapshot_prefix(snapshot);
let full_snapshot_path = format!(
"{prefix}{partition_id}/{snapshot_prefix}",
prefix = self.prefix,
partition_id = snapshot.partition_id,
);

let snapshot_prefix = self.get_base_prefix(snapshot);
debug!(
"Uploading snapshot from {:?} to {}",
local_snapshot_path, full_snapshot_path
local_snapshot_path, snapshot_prefix
);

let mut progress = SnapshotUploadProgress::with_snapshot_path(full_snapshot_path.clone());
let mut progress = SnapshotUploadProgress::with_snapshot_path(snapshot_prefix.clone());
let mut buf = BytesMut::new();
for file in &snapshot.files {
let filename = file.name.trim_start_matches("/");
let key = object_store::path::Path::from(format!(
"{}/{}",
full_snapshot_path.as_str(),
filename
));
let key = self.get_snapshot_file(snapshot, filename);

let put_result = put_snapshot_object(
local_snapshot_path.join(filename).as_path(),
Expand All @@ -249,10 +240,7 @@ impl SnapshotRepository {
progress.push(file.name.clone());
}

let metadata_key = object_store::path::Path::from(format!(
"{}/metadata.json",
full_snapshot_path.as_str()
));
let metadata_key = self.get_snapshot_file(snapshot, "metadata.json");
let metadata_json_payload = PutPayload::from(
serde_json::to_string_pretty(snapshot).expect("Can always serialize JSON"),
);
Expand All @@ -270,11 +258,7 @@ impl SnapshotRepository {
"Successfully published snapshot metadata",
);

let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = snapshot.partition_id,
));
let latest_path = self.get_latest_snapshot_pointer(snapshot.partition_id);

// By performing a CAS on the latest snapshot pointer, we can ensure strictly monotonic updates.
let maybe_stored = self
Expand All @@ -293,7 +277,7 @@ impl SnapshotRepository {
return Ok(());
}

let latest = LatestSnapshot::from_snapshot(snapshot, snapshot_prefix);
let latest = LatestSnapshot::from_snapshot(snapshot, Self::padded_snapshot_key(snapshot));
let latest = PutPayload::from(
serde_json::to_string_pretty(&latest)
.map_err(|e| PutSnapshotError::from(e, progress.clone()))?,
Expand Down Expand Up @@ -329,14 +313,6 @@ impl SnapshotRepository {
Ok(())
}

fn get_snapshot_prefix(snapshot: &PartitionSnapshotMetadata) -> String {
format!(
"lsn_{lsn:020}-{snapshot_id}",
lsn = snapshot.min_applied_lsn,
snapshot_id = snapshot.snapshot_id
)
}

/// Discover and download the latest snapshot available. It is the caller's responsibility
/// to delete the snapshot directory when it is no longer needed.
#[instrument(
Expand All @@ -349,11 +325,7 @@ impl SnapshotRepository {
&self,
partition_id: PartitionId,
) -> anyhow::Result<Option<LocalPartitionSnapshot>> {
let latest_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/latest.json",
prefix = self.prefix,
partition_id = partition_id,
));
let latest_path = self.get_latest_snapshot_pointer(partition_id);

let latest = self.object_store.get(&latest_path).await;

Expand All @@ -370,10 +342,10 @@ impl SnapshotRepository {
debug!("Latest snapshot metadata: {:?}", latest);

let snapshot_metadata_path = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/metadata.json",
"{prefix}{partition_id}/{snapshot_path}/metadata.json",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
snapshot_path = latest.path,
));
let snapshot_metadata = self.object_store.get(&snapshot_metadata_path).await;

Expand Down Expand Up @@ -429,10 +401,10 @@ impl SnapshotRepository {
let filename = file.name.trim_start_matches("/");
let expected_size = file.size;
let key = object_store::path::Path::from(format!(
"{prefix}{partition_id}/{path}/{filename}",
"{prefix}{partition_id}/{snapshot_path}/{filename}",
prefix = self.prefix,
partition_id = partition_id,
path = latest.path,
snapshot_path = latest.path,
filename = filename,
));
let file_path = snapshot_dir.path().join(filename);
Expand Down Expand Up @@ -550,6 +522,56 @@ impl SnapshotRepository {
}
}
}

/// Construct the full object path to the latest snapshot pointer for a given partition.
fn get_latest_snapshot_pointer(&self, partition_id: PartitionId) -> object_store::path::Path {
object_store::path::Path::from(format!(
"{partition_prefix}/latest.json",
partition_prefix = self.get_partition_snapshots_prefix(partition_id)
))
}

/// Construct the prefix relative to the destination root for a given partition's snapshots.
fn get_partition_snapshots_prefix(&self, partition_id: PartitionId) -> String {
// Note: the destination prefix must include a separator if not empty.
format!(
"{prefix}{partition_id}",
prefix = self.prefix,
partition_id = partition_id,
)
}

/// Construct the complete snapshot prefix from the base of the object store destination.
fn get_base_prefix(&self, snapshot_metadata: &PartitionSnapshotMetadata) -> String {
format!(
"{partition_prefix}/{snapshot_path}",
partition_prefix = self.get_partition_snapshots_prefix(snapshot_metadata.partition_id),
snapshot_path = Self::padded_snapshot_key(snapshot_metadata)
)
}

/// Construct the full object path for a specific file from the given snapshot.
fn get_snapshot_file(
&self,
snapshot_metadata: &PartitionSnapshotMetadata,
filename: &str,
) -> object_store::path::Path {
object_store::path::Path::from(format!(
"{base_prefix}/{filename}",
base_prefix = self.get_base_prefix(snapshot_metadata)
))
}

/// Construct the unique path component for a snapshot, e.g. `lsn_00001234-snap_abc123`.
/// The LSN is zero-padded for correct lexicographical sorting.
fn padded_snapshot_key(snapshot: &PartitionSnapshotMetadata) -> String {
// We zero-pad the LSN to ensure correct lexicographic sorting
format!(
"lsn_{lsn:020}-{snapshot_id}",
lsn = snapshot.min_applied_lsn,
snapshot_id = snapshot.snapshot_id
)
}
}

async fn create_object_store_client(destination: Url) -> anyhow::Result<Arc<dyn ObjectStore>> {
Expand Down Expand Up @@ -594,14 +616,14 @@ async fn create_object_store_client(destination: Url) -> anyhow::Result<Arc<dyn

#[derive(Clone, Debug)]
struct SnapshotUploadProgress {
pub full_snapshot_path: String,
pub snapshot_complete_path: String,
pub uploaded_files: Vec<String>,
}

impl SnapshotUploadProgress {
fn with_snapshot_path(full_snapshot_path: String) -> Self {
fn with_snapshot_path(snapshot_complete_path: String) -> Self {
SnapshotUploadProgress {
full_snapshot_path,
snapshot_complete_path,
uploaded_files: vec![],
}
}
Expand All @@ -624,7 +646,7 @@ impl PutSnapshotError {
{
PutSnapshotError {
error: error.into(),
full_snapshot_path: progress.full_snapshot_path,
full_snapshot_path: progress.snapshot_complete_path,
uploaded_files: progress.uploaded_files,
}
}
Expand Down Expand Up @@ -865,7 +887,7 @@ mod tests {

repository.put(&snapshot1, source_dir.clone()).await?;

let snapshot_prefix = SnapshotRepository::get_snapshot_prefix(&snapshot1);
let snapshot_prefix = SnapshotRepository::padded_snapshot_key(&snapshot1);
let data = object_store
.get(&Path::from(format!(
"{}/{}/{}/data.sst",
Expand Down Expand Up @@ -921,7 +943,7 @@ mod tests {
assert_eq!(
LatestSnapshot::from_snapshot(
&snapshot2,
SnapshotRepository::get_snapshot_prefix(&snapshot2)
SnapshotRepository::padded_snapshot_key(&snapshot2)
),
latest
);
Expand Down

0 comments on commit aea27d7

Please sign in to comment.