Skip to content

Commit

Permalink
Code review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
purkhusid committed Oct 15, 2024
1 parent 91c4377 commit 1fdc601
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 126 deletions.
25 changes: 11 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,9 @@ table.Scan(startedBefore (DateTimeOffset.Now - TimeSpan.FromDays 1.))

(See [`Script.fsx`](src/FSharp.AWS.DynamoDB/Script.fsx) for example timings showing the relative efficiency.)

## `TransactWriteItems`
## `Transaction`

Using [`TransactWriteItems`](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html)
to compose multiple write operations into an aggregate request that will succeed or fail atomically is supported.
See [overview article](https://www.alexdebrie.com/posts/dynamodb-transactions) by [@alexdebrie](https://github.com/alexdebrie)

NOTE: while the underlying API supports combining operations on multiple tables, the exposed API does not.
`FSharp.AWS.DynamoDB` supports DynamoDB transactions via the `Transaction` class.

The supported individual operations are:
- `Check`: `ConditionCheck` - potentially veto the batch if the ([precompiled](#Precomputing-DynamoDB-Expressions)) `condition` is not fulfilled by the item identified by `key`
Expand All @@ -271,21 +267,22 @@ The supported individual operations are:
let compile = table.Template.PrecomputeConditionalExpr
let doesntExistCondition = compile <@ fun t -> NOT_EXISTS t.Value @>
let existsCondition = compile <@ fun t -> EXISTS t.Value @>
let key = TableKey.Combined(hashKey, rangeKey)
let requests = [
TransactWrite.Check (key, doesntExistCondition)
TransactWrite.Put (item2, None)
TransactWrite.Put (item3, Some existsCondition)
TransactWrite.Delete (table.Template.ExtractKey item5, None) ]
do! table.TransactWriteItems requests
let transaction = Transaction()
transaction.Check(table, key, doesntExistCondition)
transaction.Put(table, item2, None)
transaction.Put(table, item3, Some existsCondition)
transaction.Delete (table ,table.Template.ExtractKey item5, None)
do! transaction.TransactWriteItems()
```

Failed preconditions (or `TransactWrite.Check`s) are signalled as per the underlying API: via a `TransactionCanceledException`.
Use `TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed` to trap such conditions:

```fsharp
try do! table.TransactWriteItems writes
try do! transaction.TransactWriteItems()
return Some result
with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> return None
```
Expand Down
240 changes: 137 additions & 103 deletions src/FSharp.AWS.DynamoDB/TableContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,141 @@ type TableContext<'TRecord>
else
t.VerifyTableAsync()

member t.Transaction() =
match metricsCollector with
| Some metricsCollector -> Transaction(metricsCollector = metricsCollector)
| None -> Transaction()

/// <summary>
/// Represents a transactional set of operations to be applied atomically to a arbitrary number of DynamoDB tables.
/// </summary>
/// <param name="metricsCollector">Function to receive request metrics.</param>
and Transaction(?metricsCollector: (RequestMetrics -> unit)) =
let transactionItems = ResizeArray<TransactWriteItem>()
let mutable (dynamoDbClient: IAmazonDynamoDB) = null

let setClient client =
if dynamoDbClient = null then
dynamoDbClient <- client

let reportMetrics collector (tableName: string) (operation: Operation) (consumedCapacity: ConsumedCapacity list) (itemCount: int) =
collector
{ TableName = tableName
Operation = operation
ConsumedCapacity = consumedCapacity
ItemCount = itemCount }

let returnConsumedCapacity, maybeReport =
match metricsCollector with
| Some sink -> ReturnConsumedCapacity.INDEXES, Some(reportMetrics sink)
| None -> ReturnConsumedCapacity.NONE, None

/// <summary>
/// Adds a Put operation to the transaction.
/// </summary>
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="item">Item to be put.</param>
/// <param name="precondition">Optional precondition expression.</param>
member this.Put<'TRecord>
(
tableContext: TableContext<'TRecord>,
item: 'TRecord,
?precondition: ConditionExpression<'TRecord>
) : Transaction =
setClient tableContext.Client
let req = Put(TableName = tableContext.TableName, Item = tableContext.Template.ToAttributeValues item)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Put = req))
this

/// <summary>
/// Adds a ConditionCheck operation to the transaction.
/// </summary>
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="key">Key of item to check.</param>
/// <param name="condition">Condition to check.</param>
member this.Check(tableContext: TableContext<'TRecord>, key: TableKey, condition: ConditionExpression<'TRecord>) : Transaction =
setClient tableContext.Client

let req = ConditionCheck(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- condition.Conditional.Write writer
transactionItems.Add(TransactWriteItem(ConditionCheck = req))
this

/// <summary>
/// Adds an Update operation to the transaction.
/// </summary>
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="key">Key of item to update.</param>
/// <param name="updater">Update expression.</param>
/// <param name="precondition">Optional precondition expression.</param>
member this.Update
(
tableContext: TableContext<'TRecord>,
key: TableKey,
updater: UpdateExpression<'TRecord>,
?precondition: ConditionExpression<'TRecord>

) : Transaction =
setClient tableContext.Client

let req = Update(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.UpdateExpression <- updater.UpdateOps.Write(writer)
precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Update = req))
this

/// <summary>
/// Adds a Delete operation to the transaction.
/// </summary>
/// <param name="tableContext">Table context to operate on.</param>
/// <param name="key">Key of item to delete.</param>
/// <param name="precondition">Optional precondition expression.</param>
member this.Delete
(
tableContext: TableContext<'TRecord>,
key: TableKey,
precondition: option<ConditionExpression<'TRecord>>
) : Transaction =
setClient tableContext.Client

let req = Delete(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Delete = req))
this

/// <summary>
/// Atomically applies a set of 1-100 operations to the table.<br/>
/// NOTE requests are charged at twice the normal rate in Write Capacity Units.
/// See the DynamoDB <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html"><c>TransactWriteItems</c> API documentation</a> for full details of semantics and charges.<br/>
/// </summary>
/// <param name="clientRequestToken">The <c>ClientRequestToken</c> to supply as an idempotency key (10 minute window).</param>
member _.TransactWriteItems(?clientRequestToken) : Async<unit> = async {
if (Seq.length transactionItems) = 0 || (Seq.length transactionItems) > 100 then
raise
<| System.ArgumentOutOfRangeException(nameof transactionItems, "must be between 1 and 100 items.")
let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = (ResizeArray transactionItems))
clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x)
let! ct = Async.CancellationToken
let! response = dynamoDbClient.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
maybeReport
|> Option.iter (fun r ->
response.ConsumedCapacity
|> Seq.groupBy (fun x -> x.TableName)
|> Seq.iter (fun (tableName, consumedCapacity) ->
r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems)))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
}

// Deprecated factory method, to be removed. Replaced with
// 1. TableContext<'T> ctor (synchronous)
// 2. VerifyOrCreateTableAsync OR VerifyTableAsync (explicitly async to signify that verification/creation is a costly and/or privileged operation)
Expand Down Expand Up @@ -1405,6 +1540,8 @@ type TableContext internal () =
)
|> Async.RunSynchronously



/// <summary>
/// Sync-over-Async helpers that can be opted-into when working in scripting scenarios.
/// For normal usage, the <c>Async</c> versions of any given API is recommended, in order to ensure one
Expand Down Expand Up @@ -2021,106 +2158,3 @@ module TransactWriteItemsRequest =
function
| :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some()
| _ -> None


type Transaction(?metricsCollector: (RequestMetrics -> unit)) =
let transactionItems = ResizeArray<TransactWriteItem>()
let mutable (dynamoDbClient: IAmazonDynamoDB) = null

let setClient client =
if dynamoDbClient = null then
System.Console.WriteLine("WAT")
dynamoDbClient <- client

let reportMetrics collector (tableName: string) (operation: Operation) (consumedCapacity: ConsumedCapacity list) (itemCount: int) =
collector
{ TableName = tableName
Operation = operation
ConsumedCapacity = consumedCapacity
ItemCount = itemCount }

let returnConsumedCapacity, maybeReport =
match metricsCollector with
| Some sink -> ReturnConsumedCapacity.INDEXES, Some(reportMetrics sink)
| None -> ReturnConsumedCapacity.NONE, None

member this.Put<'TRecord>
(
tableContext: TableContext<'TRecord>,
item: 'TRecord,
?precondition: ConditionExpression<'TRecord>
) : Transaction =
setClient tableContext.Client
let req = Put(TableName = tableContext.TableName, Item = tableContext.Template.ToAttributeValues item)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Put = req))
this
member this.Check(tableContext: TableContext<'TRecord>, key: TableKey, condition: ConditionExpression<'TRecord>) : Transaction =
setClient tableContext.Client

let req = ConditionCheck(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- condition.Conditional.Write writer
transactionItems.Add(TransactWriteItem(ConditionCheck = req))
this
member this.Update
(
tableContext: TableContext<'TRecord>,
key: TableKey,
updater: UpdateExpression<'TRecord>,
?precondition: ConditionExpression<'TRecord>

) : Transaction =
setClient tableContext.Client

let req = Update(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.UpdateExpression <- updater.UpdateOps.Write(writer)
precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Update = req))
this
member this.Delete
(
tableContext: TableContext<'TRecord>,
key: TableKey,
precondition: option<ConditionExpression<'TRecord>>
) : Transaction =
setClient tableContext.Client

let req = Delete(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Delete = req))
this

/// <summary>
/// Atomically applies a set of 1-100 write operations to the table.<br/>
/// NOTE requests are charged at twice the normal rate in Write Capacity Units.
/// See the DynamoDB <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html"><c>TransactWriteItems</c> API documentation</a> for full details of semantics and charges.<br/>
/// </summary>
/// <param name="items">Operations to be performed.<br/>
/// Throws <c>ArgumentOutOfRangeException</c> if item count is not between 1 and 100 as required by underlying API.<br/>
/// Use <c>TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed</c> to identify any Precondition Check failures.</param>
/// <param name="clientRequestToken">The <c>ClientRequestToken</c> to supply as an idempotency key (10 minute window).</param>
member _.TransactWriteItems(?clientRequestToken) : Async<unit> = async {
if (Seq.length transactionItems) = 0 || (Seq.length transactionItems) > 100 then
raise
<| System.ArgumentOutOfRangeException(nameof transactionItems, "must be between 1 and 100 items.")
let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = (ResizeArray transactionItems))
clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x)
let! ct = Async.CancellationToken
let! response = dynamoDbClient.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
maybeReport
|> Option.iter (fun r ->
response.ConsumedCapacity
|> Seq.groupBy (fun x -> x.TableName)
|> Seq.iter (fun (tableName, consumedCapacity) ->
r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems)))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
}
18 changes: 9 additions & 9 deletions tests/FSharp.AWS.DynamoDB.Tests/SimpleTableOperationTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -257,30 +257,30 @@ type ``TransactWriteItems tests``(table1: TableFixture, table2: TableFixture) =
let ``All paths`` shouldFail = async {
let item, item2, item3, item4, item5, item6, item7 = mkItem (), mkItem (), mkItem (), mkItem (), mkItem (), mkItem (), mkItem ()
let! key = table1.PutItemAsync item
let Transaction = Transaction()
let transaction = Transaction()

let requests =
[ Transaction.Update(table1, key, compileUpdateTable1 <@ fun t -> { t with Value = 42 } @>, existsConditionTable1)
Transaction.Put(table1, item2)
Transaction.Put(table1, item3, doesntExistConditionTable1)
Transaction.Delete(table1, table1.Template.ExtractKey item4, Some doesntExistConditionTable1)
Transaction.Delete(table1, table1.Template.ExtractKey item5, None)
Transaction.Check(
[ transaction.Update(table1, key, compileUpdateTable1 <@ fun t -> { t with Value = 42 } @>, existsConditionTable1)
transaction.Put(table1, item2)
transaction.Put(table1, item3, doesntExistConditionTable1)
transaction.Delete(table1, table1.Template.ExtractKey item4, Some doesntExistConditionTable1)
transaction.Delete(table1, table1.Template.ExtractKey item5, None)
transaction.Check(
table1,
table1.Template.ExtractKey item6,
(if shouldFail then
existsConditionTable1
else
doesntExistConditionTable1)
)
Transaction.Update(
transaction.Update(
table1,
TableKey.Combined(item7.HashKey, item7.RangeKey),
compileUpdateTable1 <@ fun t -> { t with Tuple = (42, 42) } @>
) ]
let mutable failed = false
try
do! Transaction.TransactWriteItems()
do! transaction.TransactWriteItems()
with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed ->
failed <- true
failed =! shouldFail
Expand Down

0 comments on commit 1fdc601

Please sign in to comment.