Skip to content

Commit

Permalink
Create a new TransactionBuilder class instead of a module
Browse files Browse the repository at this point in the history
  • Loading branch information
purkhusid committed Sep 5, 2024
1 parent 6f82f7a commit 76b7ee9
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 110 deletions.
181 changes: 112 additions & 69 deletions src/FSharp.AWS.DynamoDB/TableContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,6 @@ type private LimitType =
static member AllOrCount(l: int option) = l |> Option.map Count |> Option.defaultValue All
static member DefaultOrCount(l: int option) = l |> Option.map Count |> Option.defaultValue Default

/// Helpers for working with <c>TransactWriteItemsRequest</c>
module TransactWriteItemsRequest =
/// <summary>Exception filter to identify whether a <c>TransactWriteItems</c> call has failed due to
/// one or more of the supplied <c>precondition</c> checks failing.</summary>
let (|TransactionCanceledConditionalCheckFailed|_|): exn -> unit option =
function
| :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some()
| _ -> None

/// Helpers for identifying Failed Precondition check outcomes emanating from <c>PutItem</c>, <c>UpdateItem</c> or <c>DeleteItem</c>
module Precondition =
/// <summary>Exception filter to identify whether an individual (non-transactional) <c>PutItem</c>, <c>UpdateItem</c> or <c>DeleteItem</c> call's <c>precondition</c> check failing.</summary>
Expand All @@ -247,39 +238,6 @@ module Precondition =
| :? ConditionalCheckFailedException -> Some()
| _ -> None

type TransactWriter<'TRecord> internal (tableName: string, template: RecordTemplate<'TRecord>) =
member _.Put(item: 'TRecord, precondition: option<ConditionExpression<'TRecord>>) : TransactWriteItem =
let req = Put(TableName = tableName, Item = template.ToAttributeValues item)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
TransactWriteItem(Put = req)
member _.Check(key: TableKey, condition: ConditionExpression<'TRecord>) : TransactWriteItem =
let req = ConditionCheck(TableName = tableName, Key = template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- condition.Conditional.Write writer
TransactWriteItem(ConditionCheck = req)
member _.Update
(
key: TableKey,
precondition: option<ConditionExpression<'TRecord>>,
updater: UpdateExpression<'TRecord>

) : TransactWriteItem =
let req = Update(TableName = tableName, Key = template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.UpdateExpression <- updater.UpdateOps.Write(writer)
precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)
TransactWriteItem(Update = req)
member _.Delete(key: TableKey, precondition: option<ConditionExpression<'TRecord>>) : TransactWriteItem =
let req = Delete(TableName = tableName, Key = template.ToAttributeValues key)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
TransactWriteItem(Delete = req)


/// DynamoDB client object for performing table operations in the context of given F# record representations
[<Sealed; AutoSerializable(false)>]
Expand Down Expand Up @@ -532,9 +490,6 @@ type TableContext<'TRecord>
member _.LocalSecondaryIndices = template.LocalSecondaryIndices
/// Record-induced table template
member _.Template = template
/// Represents an individual request that can be included in the <c>TransactItems</c> of a <c>TransactWriteItems</c> call.</summary>
member _.TransactWrite = TransactWriter<'TRecord>(tableName, template)


/// <summary>
/// Creates a DynamoDB client instance for given F# record and table name.<br/>
Expand Down Expand Up @@ -886,30 +841,6 @@ type TableContext<'TRecord>
return unprocessed |> Array.map template.ExtractKey
}


/// <summary>
/// Atomically applies a set of 1-100 write operations to the table.<br/>
/// NOTE requests are charged at twice the normal rate in Write Capacity Units.
/// See the DynamoDB <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html"><c>TransactWriteItems</c> API documentation</a> for full details of semantics and charges.<br/>
/// </summary>
/// <param name="items">Operations to be performed.<br/>
/// Throws <c>ArgumentOutOfRangeException</c> if item count is not between 1 and 100 as required by underlying API.<br/>
/// Use <c>TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed</c> to identify any Precondition Check failures.</param>
/// <param name="clientRequestToken">The <c>ClientRequestToken</c> to supply as an idempotency key (10 minute window).</param>
member _.TransactWriteItems(items: seq<TransactWriteItem>, ?clientRequestToken) : Async<unit> = async {
if (Seq.length items) = 0 || (Seq.length items) > 100 then
raise <| System.ArgumentOutOfRangeException(nameof items, "must be between 1 and 100 items.")
let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = (ResizeArray items))
clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x)
let! ct = Async.CancellationToken
let! response = client.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
maybeReport
|> Option.iter (fun r -> r TransactWriteItems (Seq.toList response.ConsumedCapacity) (Seq.length items))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
}


/// <summary>
/// Asynchronously queries table with given condition expressions.
/// </summary>
Expand Down Expand Up @@ -2081,3 +2012,115 @@ module Scripting =
member t.UpdateProvisionedThroughput(provisionedThroughput: ProvisionedThroughput) : unit =
let spec = Throughput.Provisioned provisionedThroughput
t.UpdateTableIfRequiredAsync(spec) |> Async.Ignore |> Async.RunSynchronously

/// Helpers for working with <c>TransactWriteItemsRequest</c>
module TransactWriteItemsRequest =
/// <summary>Exception filter to identify whether a <c>TransactWriteItems</c> call has failed due to
/// one or more of the supplied <c>precondition</c> checks failing.</summary>
let (|TransactionCanceledConditionalCheckFailed|_|): exn -> unit option =
function
| :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some()
| _ -> None


type TransactionBuilder(?metricsCollector: (RequestMetrics -> unit)) =
let transactionItems = ResizeArray<TransactWriteItem>()
let mutable (dynamoDbClient: IAmazonDynamoDB) = null

let setClient client =
if dynamoDbClient = null then
System.Console.WriteLine("WAT")
dynamoDbClient <- client

let reportMetrics collector (tableName: string) (operation: Operation) (consumedCapacity: ConsumedCapacity list) (itemCount: int) =
collector
{ TableName = tableName
Operation = operation
ConsumedCapacity = consumedCapacity
ItemCount = itemCount }

let returnConsumedCapacity, maybeReport =
match metricsCollector with
| Some sink -> ReturnConsumedCapacity.INDEXES, Some(reportMetrics sink)
| None -> ReturnConsumedCapacity.NONE, None

member this.Put<'TRecord>
(
tableContext: TableContext<'TRecord>,
item: 'TRecord,
?precondition: ConditionExpression<'TRecord>
) : TransactionBuilder =
setClient tableContext.Client
let req = Put(TableName = tableContext.TableName, Item = tableContext.Template.ToAttributeValues item)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Put = req))
this
member this.Check(tableContext: TableContext<'TRecord>, key: TableKey, condition: ConditionExpression<'TRecord>) : TransactionBuilder =
setClient tableContext.Client

let req = ConditionCheck(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- condition.Conditional.Write writer
transactionItems.Add(TransactWriteItem(ConditionCheck = req))
this
member this.Update
(
tableContext: TableContext<'TRecord>,
key: TableKey,
updater: UpdateExpression<'TRecord>,
?precondition: ConditionExpression<'TRecord>

) : TransactionBuilder =
setClient tableContext.Client

let req = Update(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.UpdateExpression <- updater.UpdateOps.Write(writer)
precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Update = req))
this
member this.Delete
(
tableContext: TableContext<'TRecord>,
key: TableKey,
precondition: option<ConditionExpression<'TRecord>>
) : TransactionBuilder =
setClient tableContext.Client

let req = Delete(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
precondition
|> Option.iter (fun cond ->
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
req.ConditionExpression <- cond.Conditional.Write writer)
transactionItems.Add(TransactWriteItem(Delete = req))
this

/// <summary>
/// Atomically applies a set of 1-100 write operations to the table.<br/>
/// NOTE requests are charged at twice the normal rate in Write Capacity Units.
/// See the DynamoDB <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html"><c>TransactWriteItems</c> API documentation</a> for full details of semantics and charges.<br/>
/// </summary>
/// <param name="items">Operations to be performed.<br/>
/// Throws <c>ArgumentOutOfRangeException</c> if item count is not between 1 and 100 as required by underlying API.<br/>
/// Use <c>TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed</c> to identify any Precondition Check failures.</param>
/// <param name="clientRequestToken">The <c>ClientRequestToken</c> to supply as an idempotency key (10 minute window).</param>
member _.TransactWriteItems(?clientRequestToken) : Async<unit> = async {
if (Seq.length transactionItems) = 0 || (Seq.length transactionItems) > 100 then
raise
<| System.ArgumentOutOfRangeException(nameof transactionItems, "must be between 1 and 100 items.")
let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = (ResizeArray transactionItems))
clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x)
let! ct = Async.CancellationToken
let! response = dynamoDbClient.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
maybeReport
|> Option.iter (fun r ->
response.ConsumedCapacity
|> Seq.groupBy (fun x -> x.TableName)
|> Seq.iter (fun (tableName, consumedCapacity) ->
r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems)))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
}
17 changes: 9 additions & 8 deletions tests/FSharp.AWS.DynamoDB.Tests/MetricsCollectorTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ type Tests(fixture: TableFixture) =
collector.Clear()

let item = mkItem (guid ()) (guid ()) 0
let requests = [ sut.TransactWrite.Put(item, Some(compile <@ fun t -> NOT_EXISTS t.RangeKey @>)) ]

do! sut.TransactWriteItems requests
do!
TransactionBuilder(collector.Collect)
.Put(sut, item, compile <@ fun t -> NOT_EXISTS t.RangeKey @>)
.TransactWriteItems()

test
<@
Expand All @@ -130,13 +131,13 @@ type Tests(fixture: TableFixture) =
let sut = rawTable.WithMetricsCollector(collector.Collect)

let item = mkItem (guid ()) (guid ()) 0

// The check will fail, which triggers a throw from the underlying AWS SDK; there's no way to extract the consumption info in that case
let requests = [ sut.TransactWrite.Put(item, Some(compile <@ fun t -> EXISTS t.RangeKey @>)) ]

let mutable failed = false
try
do! sut.TransactWriteItems requests
do!
// The check will fail, which triggers a throw from the underlying AWS SDK; there's no way to extract the consumption info in that case
TransactionBuilder()
.Put(sut, item, compile <@ fun t -> EXISTS t.RangeKey @>)
.TransactWriteItems()
with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed ->
failed <- true
true =! failed
Expand Down
Loading

0 comments on commit 76b7ee9

Please sign in to comment.