From cf61bb89d8318a972b1408771ca1d82baf03e1b0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 4 Jan 2024 21:32:26 +0000 Subject: [PATCH] DynamoDB ConditionalPut (#5247) * Parse Dynamo CondititionalPut * Add etag sort key * Conditional Put * Speedup repeated test runs * Clippy --- .github/workflows/object_store.yml | 3 +- object_store/src/aws/dynamo.rs | 155 +++++++++++++++++++++------ object_store/src/aws/mod.rs | 18 +++- object_store/src/aws/precondition.rs | 46 ++++++-- object_store/src/lib.rs | 12 ++- 5 files changed, 187 insertions(+), 47 deletions(-) diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index 313d158090ae..0257d86d9879 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -113,6 +113,7 @@ jobs: AWS_ENDPOINT: http://localhost:4566 AWS_ALLOW_HTTP: true AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000 + AWS_CONDITIONAL_PUT: dynamo:test-table:2000 HTTP_URL: "http://localhost:8080" GOOGLE_BUCKET: test-bucket GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json" @@ -137,7 +138,7 @@ jobs: docker run -d -p 4566:4566 localstack/localstack:3.0.1 docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2 aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket - aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 + aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 - name: Configure Azurite (Azure emulation) # the magical connection string is from diff 
--git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs index ce1500bf4090..f12a42137856 100644 --- a/object_store/src/aws/dynamo.rs +++ b/object_store/src/aws/dynamo.rs @@ -17,7 +17,9 @@ //! A DynamoDB based lock system +use std::borrow::Cow; use std::collections::HashMap; +use std::future::Future; use std::time::{Duration, Instant}; use chrono::Utc; @@ -61,16 +63,24 @@ const STORE: &str = "DynamoDB"; /// /// The DynamoDB schema is as follows: /// -/// * A string hash key named `"key"` +/// * A string partition key named `"path"` +/// * A string sort key named `"etag"` /// * A numeric [TTL] attribute named `"ttl"` /// * A numeric attribute named `"generation"` /// * A numeric attribute named `"timeout"` /// -/// To perform a conditional operation on an object with a given `path` and `etag` (if exists), +/// An appropriate DynamoDB table can be created with the CLI as follows: +/// +/// ```bash +/// $ aws dynamodb create-table --table-name <table-name> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S +/// $ aws dynamodb update-time-to-live --table-name <table-name> --time-to-live-specification Enabled=true,AttributeName=ttl +/// ``` +/// +/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating), /// the commit protocol is as follows: /// /// 1. Perform HEAD request on `path` and error on precondition mismatch -/// 2. Create record in DynamoDB with key `{path}#{etag}` with the configured timeout +/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout /// 1. On Success: Perform operation with the configured timeout /// 2. On Conflict: /// 1. 
Periodically re-perform HEAD request on `path` and error on precondition mismatch @@ -154,6 +164,16 @@ impl DynamoCommit { self } + /// Parse [`DynamoCommit`] from a string + pub(crate) fn from_str(value: &str) -> Option<Self> { + Some(match value.split_once(':') { + Some((table_name, timeout)) => { + Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?) + } + None => Self::new(value.trim().to_string()), + }) + } + /// Returns the name of the DynamoDB table. pub(crate) fn table_name(&self) -> &str { &self.table_name @@ -165,23 +185,41 @@ impl DynamoCommit { from: &Path, to: &Path, ) -> Result<()> { - check_not_exists(client, to).await?; + self.conditional_op(client, to, None, || async { + client.copy_request(from, to).send().await?; + Ok(()) + }) + .await + } + + #[allow(clippy::future_not_send)] // Generics confound this lint + pub(crate) async fn conditional_op<F, Fut, T>( + &self, + client: &S3Client, + to: &Path, + etag: Option<&str>, + op: F, + ) -> Result<T> + where + F: FnOnce() -> Fut, + Fut: Future<Output = Result<T>>, + { + check_precondition(client, to, etag).await?; let mut previous_lease = None; loop { let existing = previous_lease.as_ref(); - match self.try_lock(client, to.as_ref(), existing).await? { + match self.try_lock(client, to.as_ref(), etag, existing).await? 
{ TryLockResult::Ok(lease) => { - let fut = client.copy_request(from, to).send(); let expiry = lease.acquire + lease.timeout; - return match tokio::time::timeout_at(expiry.into(), fut).await { - Ok(Ok(_)) => Ok(()), - Ok(Err(e)) => Err(e.into()), + return match tokio::time::timeout_at(expiry.into(), op()).await { + Ok(Ok(v)) => Ok(v), + Ok(Err(e)) => Err(e), Err(_) => Err(Error::Generic { store: "DynamoDB", source: format!( - "Failed to perform copy operation in {} milliseconds", + "Failed to perform conditional operation in {} milliseconds", self.timeout ) .into(), @@ -193,7 +231,7 @@ impl DynamoCommit { let expiry = conflict.timeout * self.max_clock_skew_rate; loop { interval.tick().await; - check_not_exists(client, to).await?; + check_precondition(client, to, etag).await?; if conflict.acquire.elapsed() > expiry { previous_lease = Some(conflict); break; @@ -205,8 +243,11 @@ impl DynamoCommit { } /// Retrieve a lock, returning an error if it doesn't exist - async fn get_lock(&self, s3: &S3Client, key: &str) -> Result<Lease> { - let key_attributes = [("key", AttributeValue::String(key))]; + async fn get_lock(&self, s3: &S3Client, path: &str, etag: Option<&str>) -> Result<Lease> { + let key_attributes = [ + ("path", AttributeValue::from(path)), + ("etag", AttributeValue::from(etag.unwrap_or("*"))), + ]; let req = GetItem { table_name: &self.table_name, key: Map(&key_attributes), @@ -216,7 +257,7 @@ impl DynamoCommit { let resp = self .request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req) .await - .map_err(|e| e.error(STORE, key.to_string()))?; + .map_err(|e| e.error(STORE, path.to_string()))?; let body = resp.bytes().await.map_err(|e| Error::Generic { store: STORE, @@ -230,7 +271,7 @@ impl DynamoCommit { })?; extract_lease(&response.item).ok_or_else(|| Error::NotFound { - path: key.into(), + path: path.into(), source: "DynamoDB GetItem returned no items".to_string().into(), }) } @@ -239,7 +280,8 @@ impl DynamoCommit { async fn try_lock( &self, s3: &S3Client, - key: 
&str, + path: &str, + etag: Option<&str>, existing: Option<&Lease>, ) -> Result<TryLockResult> { let attributes; @@ -257,12 +299,13 @@ impl DynamoCommit { let ttl = (Utc::now() + self.ttl).timestamp(); let items = [ - ("key", AttributeValue::String(key)), + ("path", AttributeValue::from(path)), + ("etag", AttributeValue::from(etag.unwrap_or("*"))), ("generation", AttributeValue::Number(next_gen)), ("timeout", AttributeValue::Number(self.timeout)), ("ttl", AttributeValue::Number(ttl as _)), ]; - let names = [("#pk", "key")]; + let names = [("#pk", "path")]; let req = PutItem { table_name: &self.table_name, @@ -302,7 +345,9 @@ impl DynamoCommit { // // // - None => Ok(TryLockResult::Conflict(self.get_lock(s3, key).await?)), + None => Ok(TryLockResult::Conflict( + self.get_lock(s3, path, etag).await?, + )), }, _ => Err(Error::Generic { store: STORE, @@ -347,19 +392,37 @@ enum TryLockResult { Conflict(Lease), } -/// Returns an [`Error::AlreadyExists`] if `path` exists -async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> { +/// Validates that `path` has the given `etag` or doesn't exist if `None` +async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> { let options = GetOptions { head: true, ..Default::default() }; - match client.get_opts(path, options).await { - Ok(_) => Err(Error::AlreadyExists { - path: path.to_string(), - source: "Already Exists".to_string().into(), - }), - Err(Error::NotFound { .. }) => Ok(()), - Err(e) => Err(e), + + match etag { + Some(expected) => match client.get_opts(path, options).await { + Ok(r) => match r.meta.e_tag { + Some(actual) if expected == actual => Ok(()), + actual => Err(Error::Precondition { + path: path.to_string(), + source: format!("{} does not match {expected}", actual.unwrap_or_default()) + .into(), + }), + }, + Err(Error::NotFound { .. 
}) => Err(Error::Precondition { + path: path.to_string(), + source: format!("Object at location {path} not found").into(), + }), + Err(e) => Err(e), + }, + None => match client.get_opts(path, options).await { + Ok(_) => Err(Error::AlreadyExists { + path: path.to_string(), + source: "Already Exists".to_string().into(), + }), + Err(Error::NotFound { .. }) => Ok(()), + Err(e) => Err(e), + }, } } @@ -493,11 +556,17 @@ impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> { #[derive(Debug, Serialize, Deserialize)] enum AttributeValue<'a> { #[serde(rename = "S")] - String(&'a str), + String(Cow<'a, str>), #[serde(rename = "N", with = "number")] Number(u64), } +impl<'a> From<&'a str> for AttributeValue<'a> { + fn from(value: &'a str) -> Self { + Self::String(Cow::Borrowed(value)) + } +} + /// Numbers are serialized as strings mod number { use serde::{Deserialize, Deserializer, Serializer}; @@ -518,10 +587,11 @@ pub(crate) use tests::integration_test; #[cfg(test)] mod tests { - use super::*; use crate::aws::AmazonS3; use crate::ObjectStore; + use rand::distributions::Alphanumeric; + use rand::{thread_rng, Rng}; #[test] fn test_attribute_serde() { @@ -544,24 +614,43 @@ mod tests { let _ = integration.delete(&dst).await; // Delete if present // Create a lock if not already exists - let existing = match d.try_lock(client, dst.as_ref(), None).await.unwrap() { + let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() { TryLockResult::Conflict(l) => l, TryLockResult::Ok(l) => l, }; // Should not be able to acquire a lock again - let r = d.try_lock(client, dst.as_ref(), None).await; + let r = d.try_lock(client, dst.as_ref(), None, None).await; assert!(matches!(r, Ok(TryLockResult::Conflict(_)))); // But should still be able to reclaim lock and perform copy d.copy_if_not_exists(client, &src, &dst).await.unwrap(); - match d.try_lock(client, dst.as_ref(), None).await.unwrap() { + match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() { 
TryLockResult::Conflict(new) => { // Should have incremented generation to do so assert_eq!(new.generation, existing.generation + 1); } _ => panic!("Should conflict"), } + + let rng = thread_rng(); + let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + let t = Some(etag.as_str()); + + let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() { + TryLockResult::Ok(l) => l, + _ => panic!("should not conflict"), + }; + + match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() { + TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation), + _ => panic!("should conflict"), + } + + match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() { + TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1), + _ => panic!("should not conflict"), + } } } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 20e7b032ab35..d167c78e4c8c 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -187,12 +187,26 @@ impl ObjectStore for AmazonS3 { r => r, } } - (PutMode::Update(v), Some(S3ConditionalPut::ETagMatch)) => { + (PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => { + d.conditional_op(&self.client, location, None, move || request.do_put()) + .await + } + (PutMode::Update(v), Some(put)) => { let etag = v.e_tag.ok_or_else(|| Error::Generic { store: STORE, source: "ETag required for conditional put".to_string().into(), })?; - request.header(&IF_MATCH, etag.as_str()).do_put().await + match put { + S3ConditionalPut::ETagMatch => { + request.header(&IF_MATCH, etag.as_str()).do_put().await + } + S3ConditionalPut::Dynamo(d) => { + d.conditional_op(&self.client, location, Some(&etag), move || { + request.do_put() + }) + .await + } + } } } } diff --git a/object_store/src/aws/precondition.rs b/object_store/src/aws/precondition.rs index 83d45db82c8e..ad9e21537939 100644 --- a/object_store/src/aws/precondition.rs +++ 
b/object_store/src/aws/precondition.rs @@ -48,7 +48,7 @@ pub enum S3CopyIfNotExists { HeaderWithStatus(String, String, reqwest::StatusCode), /// The name of a DynamoDB table to use for coordination /// - /// Encoded as either `dynamodb:<table-name>` or `dynamodb:<table-name>:<timeout-ms>` + /// Encoded as either `dynamo:<table-name>` or `dynamo:<table-name>:<timeout-ms>` /// ignoring whitespace. The default timeout is used if not specified /// /// See [`DynamoCommit`] for more information @@ -88,12 +88,7 @@ impl S3CopyIfNotExists { code, )) } - "dynamo" => Some(Self::Dynamo(match value.split_once(':') { - Some((table_name, timeout)) => DynamoCommit::new(table_name.trim().to_string()) - .with_timeout(timeout.parse().ok()?), - None => DynamoCommit::new(value.trim().to_string()), - })), - + "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)), _ => None, } } @@ -111,7 +106,7 @@ impl Parse for S3CopyIfNotExists { /// Configure how to provide conditional put support for [`AmazonS3`]. /// /// [`AmazonS3`]: super::AmazonS3 -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Eq, PartialEq)] #[allow(missing_copy_implementations)] #[non_exhaustive] pub enum S3ConditionalPut { @@ -122,12 +117,23 @@ pub enum S3ConditionalPut { /// /// [HTTP precondition]: https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions ETagMatch, + + /// The name of a DynamoDB table to use for coordination + /// + /// Encoded as either `dynamo:<table-name>` or `dynamo:<table-name>:<timeout-ms>` + /// ignoring whitespace. 
The default timeout is used if not specified + /// + /// See [`DynamoCommit`] for more information + /// + /// This will use the same region, credentials and endpoint as configured for S3 + Dynamo(DynamoCommit), } impl std::fmt::Display for S3ConditionalPut { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::ETagMatch => write!(f, "etag"), + Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()), } } } @@ -136,7 +142,10 @@ impl S3ConditionalPut { fn from_str(s: &str) -> Option<Self> { match s.trim() { "etag" => Some(Self::ETagMatch), - _ => None, + trimmed => match trimmed.split_once(':')? { + ("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)), + _ => None, + }, } } } @@ -153,6 +162,7 @@ impl Parse for S3ConditionalPut { #[cfg(test)] mod tests { use super::S3CopyIfNotExists; + use crate::aws::{DynamoCommit, S3ConditionalPut}; #[test] fn parse_s3_copy_if_not_exists_header() { @@ -177,6 +187,24 @@ mod tests { assert_eq!(expected, S3CopyIfNotExists::from_str(input)); } + #[test] + fn parse_s3_copy_if_not_exists_dynamo() { + let input = "dynamo: table:100"; + let expected = Some(S3CopyIfNotExists::Dynamo( + DynamoCommit::new("table".into()).with_timeout(100), + )); + assert_eq!(expected, S3CopyIfNotExists::from_str(input)); + } + + #[test] + fn parse_s3_condition_put_dynamo() { + let input = "dynamo: table:1300"; + let expected = Some(S3ConditionalPut::Dynamo( + DynamoCommit::new("table".into()).with_timeout(1300), + )); + assert_eq!(expected, S3ConditionalPut::from_str(input)); + } + #[test] fn parse_s3_copy_if_not_exists_header_whitespace_invariant() { let expected = Some(S3CopyIfNotExists::Header( diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index ab462cc15607..8fc47b2c5de7 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1233,6 +1233,7 @@ mod tests { use crate::test_util::flatten_list_stream; use chrono::TimeZone; use futures::stream::FuturesUnordered; + use 
rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::future::Future; use tokio::io::AsyncWriteExt; @@ -1726,8 +1727,15 @@ mod tests { } pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { + // When using DynamoCommit repeated runs of this test will produce the same sequence of records in DynamoDB + // As a result each conditional operation will need to wait for the lease to timeout before proceeding + // One solution would be to clear DynamoDB before each test, but this would require non-trivial additional code + // so we instead just generate a random suffix for the filenames + let rng = thread_rng(); + let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + delete_fixtures(storage).await; - let path = Path::from("put_opts"); + let path = Path::from(format!("put_opts_{suffix}")); let v1 = storage .put_opts(&path, "a".into(), PutMode::Create.into()) .await @@ -1779,7 +1787,7 @@ mod tests { const NUM_WORKERS: usize = 5; const NUM_INCREMENTS: usize = 10; - let path = Path::from("RACE"); + let path = Path::from(format!("RACE-{suffix}")); let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS) .map(|_| async { for _ in 0..NUM_INCREMENTS {