diff --git a/src/FSharp.AWS.DynamoDB/TableContext.fs b/src/FSharp.AWS.DynamoDB/TableContext.fs index f2edd02..7bbb783 100644 --- a/src/FSharp.AWS.DynamoDB/TableContext.fs +++ b/src/FSharp.AWS.DynamoDB/TableContext.fs @@ -230,15 +230,6 @@ type private LimitType = static member AllOrCount(l: int option) = l |> Option.map Count |> Option.defaultValue All static member DefaultOrCount(l: int option) = l |> Option.map Count |> Option.defaultValue Default -/// Helpers for working with TransactWriteItemsRequest -module TransactWriteItemsRequest = - /// Exception filter to identify whether a TransactWriteItems call has failed due to - /// one or more of the supplied precondition checks failing. - let (|TransactionCanceledConditionalCheckFailed|_|): exn -> unit option = - function - | :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some() - | _ -> None - /// Helpers for identifying Failed Precondition check outcomes emanating from PutItem, UpdateItem or DeleteItem module Precondition = /// Exception filter to identify whether an individual (non-transactional) PutItem, UpdateItem or DeleteItem call's precondition check failing. @@ -247,39 +238,6 @@ module Precondition = | :? ConditionalCheckFailedException -> Some() | _ -> None -type TransactWriter<'TRecord> internal (tableName: string, template: RecordTemplate<'TRecord>) = - member _.Put(item: 'TRecord, precondition: option>) : TransactWriteItem = - let req = Put(TableName = tableName, Item = template.ToAttributeValues item) - precondition - |> Option.iter (fun cond -> - let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues) - req.ConditionExpression <- cond.Conditional.Write writer) - TransactWriteItem(Put = req) - member _.Check(key: TableKey, condition: ConditionExpression<'TRecord>) : TransactWriteItem = - let req = ConditionCheck(TableName = tableName, Key = template.ToAttributeValues key) - let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues) - req.ConditionExpression <- condition.Conditional.Write writer - TransactWriteItem(ConditionCheck = req) - member _.Update - ( - key: TableKey, - precondition: option>, - updater: UpdateExpression<'TRecord> - - ) : TransactWriteItem = - let req = Update(TableName = tableName, Key = template.ToAttributeValues key) - let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues) - req.UpdateExpression <- updater.UpdateOps.Write(writer) - precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer) - TransactWriteItem(Update = req) - member _.Delete(key: TableKey, precondition: option>) : TransactWriteItem = - let req = Delete(TableName = tableName, Key = template.ToAttributeValues key) - precondition - |> Option.iter (fun cond -> - let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues) - req.ConditionExpression <- cond.Conditional.Write writer) - TransactWriteItem(Delete = req) - /// DynamoDB client object for performing table operations in the context of given F# record representations [] @@ -532,9 +490,6 @@ type TableContext<'TRecord> member _.LocalSecondaryIndices = template.LocalSecondaryIndices /// Record-induced table template member _.Template = template - /// Represents an individual request that can be included in the TransactItems of a TransactWriteItems call. - member _.TransactWrite = TransactWriter<'TRecord>(tableName, template) - /// /// Creates a DynamoDB client instance for given F# record and table name.
@@ -886,30 +841,6 @@ type TableContext<'TRecord> return unprocessed |> Array.map template.ExtractKey } - - /// - /// Atomically applies a set of 1-100 write operations to the table.
- /// NOTE requests are charged at twice the normal rate in Write Capacity Units. - /// See the DynamoDB TransactWriteItems API documentation for full details of semantics and charges.
- ///
- /// Operations to be performed.
- /// Throws ArgumentOutOfRangeException if item count is not between 1 and 100 as required by underlying API.
- /// Use TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed to identify any Precondition Check failures. - /// The ClientRequestToken to supply as an idempotency key (10 minute window). - member _.TransactWriteItems(items: seq, ?clientRequestToken) : Async = async { - if (Seq.length items) = 0 || (Seq.length items) > 100 then - raise <| System.ArgumentOutOfRangeException(nameof items, "must be between 1 and 100 items.") - let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = (ResizeArray items)) - clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x) - let! ct = Async.CancellationToken - let! response = client.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect - maybeReport - |> Option.iter (fun r -> r TransactWriteItems (Seq.toList response.ConsumedCapacity) (Seq.length items)) - if response.HttpStatusCode <> HttpStatusCode.OK then - failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode - } - - /// /// Asynchronously queries table with given condition expressions. /// @@ -2081,3 +2012,115 @@ module Scripting = member t.UpdateProvisionedThroughput(provisionedThroughput: ProvisionedThroughput) : unit = let spec = Throughput.Provisioned provisionedThroughput t.UpdateTableIfRequiredAsync(spec) |> Async.Ignore |> Async.RunSynchronously + +/// Helpers for working with TransactWriteItemsRequest +module TransactWriteItemsRequest = + /// Exception filter to identify whether a TransactWriteItems call has failed due to + /// one or more of the supplied precondition checks failing. + let (|TransactionCanceledConditionalCheckFailed|_|): exn -> unit option = + function + | :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some() + | _ -> None + + +type TransactionBuilder(?metricsCollector: (RequestMetrics -> unit)) = + let transactionItems = ResizeArray() + let mutable (dynamoDbClient: IAmazonDynamoDB) = null + + let setClient client = + if dynamoDbClient = null then + System.Console.WriteLine("WAT") + dynamoDbClient <- client + + let reportMetrics collector (tableName: string) (operation: Operation) (consumedCapacity: ConsumedCapacity list) (itemCount: int) = + collector + { TableName = tableName + Operation = operation + ConsumedCapacity = consumedCapacity + ItemCount = itemCount } + + let returnConsumedCapacity, maybeReport = + match metricsCollector with + | Some sink -> ReturnConsumedCapacity.INDEXES, Some(reportMetrics sink) + | None -> ReturnConsumedCapacity.NONE, None + + member this.Put<'TRecord> + ( + tableContext: TableContext<'TRecord>, + item: 'TRecord, + ?precondition: ConditionExpression<'TRecord> + ) : TransactionBuilder = + setClient tableContext.Client + let req = Put(TableName = tableContext.TableName, Item = tableContext.Template.ToAttributeValues item) + precondition + |> Option.iter (fun cond -> + let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues) + req.ConditionExpression <- cond.Conditional.Write writer) + transactionItems.Add(TransactWriteItem(Put = req)) + this + member this.Check(tableContext: TableContext<'TRecord>, key: TableKey, condition: ConditionExpression<'TRecord>) : TransactionBuilder = + setClient tableContext.Client + + let req = ConditionCheck(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key) + let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues) + req.ConditionExpression <- condition.Conditional.Write writer + transactionItems.Add(TransactWriteItem(ConditionCheck = req)) + this + member this.Update + ( + tableContext: TableContext<'TRecord>, + key: TableKey, + updater: UpdateExpression<'TRecord>, + ?precondition: ConditionExpression<'TRecord> + + ) : TransactionBuilder = + setClient tableContext.Client + + let req = Update(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key) + let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues) + req.UpdateExpression <- updater.UpdateOps.Write(writer) + precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer) + transactionItems.Add(TransactWriteItem(Update = req)) + this + member this.Delete + ( + tableContext: TableContext<'TRecord>, + key: TableKey, + precondition: option> + ) : TransactionBuilder = + setClient tableContext.Client + + let req = Delete(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key) + precondition + |> Option.iter (fun cond -> + let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues) + req.ConditionExpression <- cond.Conditional.Write writer) + transactionItems.Add(TransactWriteItem(Delete = req)) + this + + /// + /// Atomically applies a set of 1-100 write operations to the table.
+ /// NOTE requests are charged at twice the normal rate in Write Capacity Units. + /// See the DynamoDB TransactWriteItems API documentation for full details of semantics and charges.
+ ///
+ /// Operations to be performed.
+ /// Throws ArgumentOutOfRangeException if item count is not between 1 and 100 as required by underlying API.
+ /// Use TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed to identify any Precondition Check failures. + /// The ClientRequestToken to supply as an idempotency key (10 minute window). + member _.TransactWriteItems(?clientRequestToken) : Async = async { + if (Seq.length transactionItems) = 0 || (Seq.length transactionItems) > 100 then + raise + <| System.ArgumentOutOfRangeException(nameof transactionItems, "must be between 1 and 100 items.") + let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = (ResizeArray transactionItems)) + clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x) + let! ct = Async.CancellationToken + let! response = dynamoDbClient.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect + maybeReport + |> Option.iter (fun r -> + response.ConsumedCapacity + |> Seq.groupBy (fun x -> x.TableName) + |> Seq.iter (fun (tableName, consumedCapacity) -> + r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems))) + if response.HttpStatusCode <> HttpStatusCode.OK then + failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode + } diff --git a/tests/FSharp.AWS.DynamoDB.Tests/MetricsCollectorTests.fs b/tests/FSharp.AWS.DynamoDB.Tests/MetricsCollectorTests.fs index e638a07..aedb110 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/MetricsCollectorTests.fs +++ b/tests/FSharp.AWS.DynamoDB.Tests/MetricsCollectorTests.fs @@ -106,9 +106,10 @@ type Tests(fixture: TableFixture) = collector.Clear() let item = mkItem (guid ()) (guid ()) 0 - let requests = [ sut.TransactWrite.Put(item, Some(compile <@ fun t -> NOT_EXISTS t.RangeKey @>)) ] - - do! sut.TransactWriteItems requests + do! + TransactionBuilder(collector.Collect) + .Put(sut, item, compile <@ fun t -> NOT_EXISTS t.RangeKey @>) + .TransactWriteItems() test <@ @@ -130,13 +131,13 @@ type Tests(fixture: TableFixture) = let sut = rawTable.WithMetricsCollector(collector.Collect) let item = mkItem (guid ()) (guid ()) 0 - - // The check will fail, which triggers a throw from the underlying AWS SDK; there's no way to extract the consumption info in that case - let requests = [ sut.TransactWrite.Put(item, Some(compile <@ fun t -> EXISTS t.RangeKey @>)) ] - let mutable failed = false try - do! sut.TransactWriteItems requests + do! + // The check will fail, which triggers a throw from the underlying AWS SDK; there's no way to extract the consumption info in that case + TransactionBuilder() + .Put(sut, item, compile <@ fun t -> EXISTS t.RangeKey @>) + .TransactWriteItems() with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> failed <- true true =! failed diff --git a/tests/FSharp.AWS.DynamoDB.Tests/SimpleTableOperationTests.fs b/tests/FSharp.AWS.DynamoDB.Tests/SimpleTableOperationTests.fs index b9e2ea0..152c0ba 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/SimpleTableOperationTests.fs +++ b/tests/FSharp.AWS.DynamoDB.Tests/SimpleTableOperationTests.fs @@ -161,10 +161,10 @@ type ``TransactWriteItems tests``(table1: TableFixture, table2: TableFixture) = [] let ``Minimal happy path`` () = async { let item = mkItem () - - let requests = [ table1.TransactWrite.Put(item, Some doesntExistConditionTable1) ] - - do! table1.TransactWriteItems requests + do! + TransactionBuilder() + .Put(table1, item, doesntExistConditionTable1) + .TransactWriteItems() let! itemFound = table1.ContainsKeyAsync(table1.Template.ExtractKey item) true =! itemFound @@ -175,12 +175,11 @@ type ``TransactWriteItems tests``(table1: TableFixture, table2: TableFixture) = let item = mkItem () let compatibleItem = mkCompatibleItem () - let tableRequest = table1.TransactWrite.Put(item, Some doesntExistConditionTable1) - let table2Request = table2.TransactWrite.Put(compatibleItem, None) - - let requests = [ tableRequest; table2Request ] - - do! table1.TransactWriteItems requests + do! + TransactionBuilder() + .Put(table1, item, doesntExistConditionTable1) + .Put(table2, compatibleItem, doesntExistConditionTable2) + .TransactWriteItems() let! itemFound = table1.ContainsKeyAsync(table1.Template.ExtractKey item) true =! itemFound @@ -193,11 +192,12 @@ type ``TransactWriteItems tests``(table1: TableFixture, table2: TableFixture) = let ``Minimal Canceled path`` () = async { let item = mkItem () - let requests = [ table1.TransactWrite.Put(item, Some existsConditionTable1) ] - let mutable failed = false try - do! table1.TransactWriteItems requests + do! + TransactionBuilder() + .Put(table1, item, existsConditionTable1) + .TransactWriteItems() with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> failed <- true @@ -212,15 +212,17 @@ type ``TransactWriteItems tests``(table1: TableFixture, table2: TableFixture) = let item, item2 = mkItem (), mkItem () let! key = table1.PutItemAsync item - let requests = - [ if shouldFail then - table1.TransactWrite.Check(key, doesntExistConditionTable1) - else - table1.TransactWrite.Check(key, existsConditionTable1) - table1.TransactWrite.Put(item2, None) ] + let transaction = + if shouldFail then + TransactionBuilder().Check(table1, key, doesntExistConditionTable1) + else + TransactionBuilder() + .Check(table1, key, existsConditionTable1) + .Put(table1, item2) + let mutable failed = false try - do! table1.TransactWriteItems requests + do! transaction.TransactWriteItems() with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> failed <- true @@ -234,28 +236,30 @@ type ``TransactWriteItems tests``(table1: TableFixture, table2: TableFixture) = let ``All paths`` shouldFail = async { let item, item2, item3, item4, item5, item6, item7 = mkItem (), mkItem (), mkItem (), mkItem (), mkItem (), mkItem (), mkItem () let! key = table1.PutItemAsync item + let transactionBuilder = TransactionBuilder() let requests = - [ table1.TransactWrite.Update(key, Some existsConditionTable1, compileUpdateTable1 <@ fun t -> { t with Value = 42 } @>) - table1.TransactWrite.Put(item2, None) - table1.TransactWrite.Put(item3, Some doesntExistConditionTable1) - table1.TransactWrite.Delete(table1.Template.ExtractKey item4, Some doesntExistConditionTable1) - table1.TransactWrite.Delete(table1.Template.ExtractKey item5, None) - table1.TransactWrite.Check( + [ transactionBuilder.Update(table1, key, compileUpdateTable1 <@ fun t -> { t with Value = 42 } @>, existsConditionTable1) + transactionBuilder.Put(table1, item2) + transactionBuilder.Put(table1, item3, doesntExistConditionTable1) + transactionBuilder.Delete(table1, table1.Template.ExtractKey item4, Some doesntExistConditionTable1) + transactionBuilder.Delete(table1, table1.Template.ExtractKey item5, None) + transactionBuilder.Check( + table1, table1.Template.ExtractKey item6, (if shouldFail then existsConditionTable1 else doesntExistConditionTable1) ) - table1.TransactWrite.Update( + transactionBuilder.Update( + table1, TableKey.Combined(item7.HashKey, item7.RangeKey), - None, compileUpdateTable1 <@ fun t -> { t with Tuple = (42, 42) } @> ) ] let mutable failed = false try - do! table1.TransactWriteItems requests + do! transactionBuilder.TransactWriteItems() with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> failed <- true failed =! shouldFail @@ -273,8 +277,8 @@ type ``TransactWriteItems tests``(table1: TableFixture, table2: TableFixture) = test <@ shouldFail <> (maybeItem7 |> Option.map (fun x -> x.Tuple) |> Option.contains (42, 42)) @> } - let shouldBeRejectedWithArgumentOutOfRangeException requests = async { - let! e = Async.Catch(table1.TransactWriteItems requests) + let shouldBeRejectedWithArgumentOutOfRangeException (builder: TransactionBuilder) = async { + let! e = Async.Catch(builder.TransactWriteItems()) test <@ match e with @@ -284,10 +288,17 @@ type ``TransactWriteItems tests``(table1: TableFixture, table2: TableFixture) = } [] - let ``Empty request list is rejected with AORE`` () = shouldBeRejectedWithArgumentOutOfRangeException [] + let ``Empty request list is rejected with AORE`` () = + shouldBeRejectedWithArgumentOutOfRangeException (TransactionBuilder()) + |> Async.RunSynchronously + |> ignore [] let ``Over 100 writes are rejected with AORE`` () = - shouldBeRejectedWithArgumentOutOfRangeException [ for _x in 1..101 -> table1.TransactWrite.Put(mkItem (), None) ] + let transactionBuilder = TransactionBuilder() + for _x in 1..101 do + transactionBuilder.Put(table1, mkItem ()) |> ignore + + shouldBeRejectedWithArgumentOutOfRangeException transactionBuilder interface IClassFixture