Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use reconstructed ListBlobs marker to provide list offset support in MicrosoftAzure store #6174

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ aws = ["cloud", "md-5"]
http = ["cloud"]
tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
integration = []
experimental-azure-list-offset = []

[dev-dependencies] # In alphabetical order
futures-test = "0.3"
Expand Down
81 changes: 81 additions & 0 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,61 @@ impl GetClient for AzureClient {
}
}

#[cfg(feature = "experimental-azure-list-offset")]
fn marker_for_offset(offset: &str, is_emulator: bool) -> String {
if is_emulator {
offset.to_string()
} else {
// Here we reconstruct an Azure marker (continuation token) from a key to be able to seek
// into an arbitrary position in the key space.
// The current format (July 2024) for the marker is as follows:
//
// +-> current token version
// |
// | +-> unpadded length of base64 encoded field
// | |
// | | +-> base64 encoded field with characters (/,+,=) repaced with (_,*,-)
// | | |
// 2!72!MDAwMDA4IWZpbGUudHh0ITAwMDAyOCE5OTk5LTEyLTMxVDIzOjU5OjU5Ljk5OTk5OTlaIQ--
// | | ^
// terminators |
// |
// +------------+
// Decoding the |base64 field| gives:
// +------------+
//
// +-> length of key field padded to 6 digits
// |
// | +-> key to start listing at
// | |
// | | +-> length of timestamp field padded to 6 digits
// | | |
// | | | +-> constant max timestamp field
// | | | |
// 000008!file.txt!000028!9999-12-31T23:59:59.9999999Z!
// | | | |
// +----> field terminators <-------------------+
//
// When recostructing we add a space character (ASCII 0x20) to the end of the key to change the
// `start_at` behavior into a `start_after` behavior as the space character is the first valid character
// in the lexicographical order.
//
// It appears that hadoop relies on this, code here:
// https://github.com/apache/hadoop/blob/059e996c02d64716707d8dfb905dc84bab317aef/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java#L1358

let encoded_part = BASE64_STANDARD
.encode(&format!(
"{:06}!{} !000028!9999-12-31T23:59:59.9999999Z!",
offset.len() + 1,
offset,
))
.replace("/", "_")
.replace("+", "*")
.replace("=", "-");
format!("2!{}!{}", encoded_part.len(), encoded_part)
}
}

#[async_trait]
impl ListClient for AzureClient {
/// Make an Azure List request <https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs>
Expand All @@ -569,6 +624,7 @@ impl ListClient for AzureClient {
token: Option<&str>,
offset: Option<&str>,
) -> Result<(ListResult, Option<String>)> {
#[cfg(not(feature = "experimental-azure-list-offset"))]
assert!(offset.is_none()); // Not yet supported

let credential = self.get_credential().await?;
Expand All @@ -586,6 +642,16 @@ impl ListClient for AzureClient {
query.push(("delimiter", DELIMITER))
}

#[cfg(feature = "experimental-azure-list-offset")]
let token_string = match (token, offset) {
(Some(token), _) => Some(token.to_string()),
(None, Some(offset)) => Some(marker_for_offset(offset, self.config.is_emulator)),
(None, None) => None,
};

#[cfg(feature = "experimental-azure-list-offset")]
let token = token_string.as_deref();

if let Some(token) = token {
query.push(("marker", token))
}
Expand Down Expand Up @@ -967,4 +1033,19 @@ mod tests {
let _delegated_key_response_internal: UserDelegationKey =
quick_xml::de::from_str(S).unwrap();
}

#[cfg(feature = "experimental-azure-list-offset")]
#[test]
fn test_marker_for_offset() {
// BlobStorage
let marker = marker_for_offset("file.txt", false);
assert_eq!(
marker,
"2!72!MDAwMDA5IWZpbGUudHh0ICEwMDAwMjghOTk5OS0xMi0zMVQyMzo1OTo1OS45OTk5OTk5WiE-"
);

// Azurite
let marker = marker_for_offset("file.txt", true);
assert_eq!(marker, "file.txt");
}
}
9 changes: 9 additions & 0 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ impl ObjectStore for MicrosoftAzure {
self.client.list(prefix)
}

#[cfg(feature = "experimental-azure-list-offset")]
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
self.client.list_with_offset(prefix, offset)
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
self.client.list_with_delimiter(prefix).await
}
Expand Down
Loading