PR feedback from @bartelink
samritchie committed Apr 4, 2022
1 parent 0a09963 commit 36d7e45
Showing 5 changed files with 73 additions and 69 deletions.
4 changes: 2 additions & 2 deletions .gitignore
@@ -211,8 +211,8 @@ ModelManifest.xml

.fake
.paket/paket.exe
temp/*
paket-files/*
temp
paket-files
xunit.html
/db
.DS_Store
85 changes: 39 additions & 46 deletions src/FSharp.AWS.DynamoDB/TableContext.fs
@@ -64,15 +64,13 @@ type TableContext<'TRecord> internal

let tryGetItemAsync (key : TableKey) (proj : ProjectionExpr.ProjectionExpr option) = async {
let kav = template.ToAttributeValues(key)
let request = GetItemRequest(tableName, kav)
let request = GetItemRequest(tableName, kav, ReturnConsumedCapacity = returnConsumedCapacity)
match proj with
| None -> ()
| Some proj ->
let aw = AttributeWriter(request.ExpressionAttributeNames, null)
request.ProjectionExpression <- proj.Write aw

request.ReturnConsumedCapacity <- returnConsumedCapacity

let! ct = Async.CancellationToken
let! response = client.GetItemAsync(request, ct) |> Async.AwaitTaskCorrect
maybeReport |> Option.iter (fun r -> r GetItem [ response.ConsumedCapacity ] (if response.IsItemSet then 1 else 0))
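Aside: the recurring change in this commit is to set request properties via F#'s named property-setter arguments on the constructor call rather than by mutating the request afterwards. A minimal sketch of the two equivalent forms, reusing the bindings from the surrounding function:

    // before: construct the request, then mutate it
    let request = GetItemRequest(tableName, kav)
    request.ReturnConsumedCapacity <- returnConsumedCapacity

    // after: initialise the property as part of the constructor call
    let request = GetItemRequest(tableName, kav, ReturnConsumedCapacity = returnConsumedCapacity)

Both forms produce an identical request; the second simply removes a separate mutation statement.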
@@ -103,9 +101,8 @@ type TableContext<'TRecord> internal
let aw = AttributeWriter(kna.ExpressionAttributeNames, null)
kna.ProjectionExpression <- projExpr.Write aw

let request = BatchGetItemRequest()
let request = BatchGetItemRequest(ReturnConsumedCapacity = returnConsumedCapacity)
request.RequestItems.[tableName] <- kna
request.ReturnConsumedCapacity <- returnConsumedCapacity

let! ct = Async.CancellationToken
let! response = client.BatchGetItemAsync(request, ct) |> Async.AwaitTaskCorrect
@@ -141,7 +138,7 @@ type TableContext<'TRecord> internal
let mutable lastEvaluatedKey : Dictionary<string,AttributeValue> option = None

let rec aux last = async {
let request = QueryRequest(tableName)
let request = QueryRequest(tableName, ReturnConsumedCapacity = returnConsumedCapacity)
keyCondition.IndexName |> Option.iter (fun name -> request.IndexName <- name)
let writer = AttributeWriter(request.ExpressionAttributeNames, request.ExpressionAttributeValues)
request.KeyConditionExpression <- keyCondition.Write writer
@@ -154,8 +151,6 @@
| None -> ()
| Some pe -> request.ProjectionExpression <- pe.Write writer

request.ReturnConsumedCapacity <- returnConsumedCapacity

limit.GetCount() |> Option.iter (fun l -> request.Limit <- l - downloaded.Count)
consistentRead |> Option.iter (fun cr -> request.ConsistentRead <- cr)
scanIndexForward |> Option.iter (fun sif -> request.ScanIndexForward <- sif)
@@ -206,7 +201,7 @@ type TableContext<'TRecord> internal
maybeReport |> Option.iter (fun r -> r Scan (Seq.toList consumedCapacity) downloaded.Count)
let mutable lastEvaluatedKey : Dictionary<string,AttributeValue> option = None
let rec aux last = async {
let request = ScanRequest(tableName)
let request = ScanRequest(tableName, ReturnConsumedCapacity = returnConsumedCapacity)
let writer = AttributeWriter(request.ExpressionAttributeNames, request.ExpressionAttributeValues)
match filterCondition with
| None -> ()
@@ -216,8 +211,6 @@
| None -> ()
| Some pe -> request.ProjectionExpression <- pe.Write writer

request.ReturnConsumedCapacity <- returnConsumedCapacity

limit.GetCount() |> Option.iter (fun l -> request.Limit <- l - downloaded.Count)
consistentRead |> Option.iter (fun cr -> request.ConsistentRead <- cr)
last |> Option.iter (fun l -> request.ExclusiveStartKey <- l)
@@ -277,7 +270,7 @@ type TableContext<'TRecord> internal

new TableContext<'TRecord2>(client, tableName, rd, metricsCollector)

/// Creates a new table context instance that uses a unique metricsCollector
/// Creates an identical table context with the specified metricsCollector callback replacing any previously specified one
member __.WithMetricsCollector(collector : (RequestMetrics -> unit)) : TableContext<'TRecord> =
new TableContext<'TRecord>(client, tableName, template, Some collector)
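A hedged usage sketch of the replacement semantics: given an existing `table : TableContext<'TRecord>`, attaching a collector yields a new context whose operations report their metrics through the supplied callback. This mirrors the tests added later in this commit (TestCollector is defined there; the key shape is assumed to match the test records):

    let collector = TestCollector()
    let instrumented = table.WithMetricsCollector collector.Collect
    // calls made through `instrumented` invoke the callback with RequestMetrics per operation
    instrumented.ContainsKey(key = TableKey.Combined ("some-hash-key", 0)) |> ignore
    printfn "operations recorded: %d" collector.Metrics.Length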

@@ -288,9 +281,7 @@ type TableContext<'TRecord> internal
/// <param name="precondition">Precondition to satisfy in case item already exists.</param>
member __.PutItemAsync(item : 'TRecord, ?precondition : ConditionExpression<'TRecord>) : Async<TableKey> = async {
let attrValues = template.ToAttributeValues(item)
let request = PutItemRequest(tableName, attrValues)
request.ReturnValues <- ReturnValue.NONE
request.ReturnConsumedCapacity <- returnConsumedCapacity
let request = PutItemRequest(tableName, attrValues, ReturnValues = ReturnValue.NONE, ReturnConsumedCapacity = returnConsumedCapacity)
match precondition with
| Some pc ->
let writer = AttributeWriter(request.ExpressionAttributeNames, request.ExpressionAttributeValues)
@@ -311,9 +302,8 @@ type TableContext<'TRecord> internal
/// </summary>
/// <param name="item">Item to be written.</param>
/// <param name="precondition">Precondition to satisfy in case item already exists.</param>
member __.PutItemAsync(item : 'TRecord, precondition : Expr<'TRecord -> bool>) = async {
return! __.PutItemAsync(item, template.PrecomputeConditionalExpr precondition)
}
member __.PutItemAsync(item : 'TRecord, precondition : Expr<'TRecord -> bool>) =
__.PutItemAsync(item, template.PrecomputeConditionalExpr precondition)

/// <summary>
/// Asynchronously puts a collection of items to the table as a batch write operation.
@@ -330,16 +320,22 @@ type TableContext<'TRecord> internal
let items = Seq.toArray items
if items.Length > 25 then invalidArg "items" "item length must be less than or equal to 25."
let writeRequests = items |> Seq.map mkWriteRequest |> rlist
let pbr = BatchWriteItemRequest()
let pbr = BatchWriteItemRequest(ReturnConsumedCapacity = returnConsumedCapacity)
pbr.RequestItems.[tableName] <- writeRequests
pbr.ReturnConsumedCapacity <- returnConsumedCapacity
let! ct = Async.CancellationToken
let! response = client.BatchWriteItemAsync(pbr, ct) |> Async.AwaitTaskCorrect
maybeReport |> Option.iter (fun r -> r BatchWriteItems (Seq.toList response.ConsumedCapacity) (items.Length - response.UnprocessedItems.Count))
let unprocessed =
match response.UnprocessedItems.TryGetValue tableName with
| (true, reqs) ->
reqs |> Seq.choose (fun r -> r.PutRequest |> Option.ofObj)
|> Seq.map (fun w -> w.Item)
|> Seq.toArray
| (false, _) -> [||]
maybeReport |> Option.iter (fun r -> r BatchWriteItems (Seq.toList response.ConsumedCapacity) (items.Length - unprocessed.Length))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "BatchWriteItem put request returned error %O" response.HttpStatusCode

return unprocessedPutAttributeValues tableName response |> Array.map template.OfAttributeValues
return unprocessed |> Array.map template.OfAttributeValues
}
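Extracting the unprocessed put requests inline (rather than via the removed DynamoUtils helpers) also lets the metrics callback subtract the number of unprocessed items instead of the number of entries in UnprocessedItems. Because the unprocessed records are returned to the caller, a retry loop is straightforward; a hedged sketch assuming a `table : TableContext<'TRecord>` in scope, with each call still limited to 25 items as enforced above (a production caller would add backoff and a retry cap):

    let rec batchPutAll (items : 'TRecord[]) = async {
        if items.Length > 0 then
            // returns any records DynamoDB left unprocessed in this round
            let! unprocessed = table.BatchPutItemsAsync items
            do! batchPutAll unprocessed }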


@@ -355,13 +351,11 @@ type TableContext<'TRecord> internal
?returnLatest : bool) : Async<'TRecord> = async {

let kav = template.ToAttributeValues(key)
let request = UpdateItemRequest(Key = kav, TableName = tableName)
let request = UpdateItemRequest(Key = kav, TableName = tableName, ReturnConsumedCapacity = returnConsumedCapacity)
request.ReturnValues <-
if defaultArg returnLatest true then ReturnValue.ALL_NEW
else ReturnValue.ALL_OLD

request.ReturnConsumedCapacity <- returnConsumedCapacity

let writer = AttributeWriter(request.ExpressionAttributeNames, request.ExpressionAttributeValues)
request.UpdateExpression <- updater.UpdateOps.Write(writer)

@@ -425,13 +419,12 @@ type TableContext<'TRecord> internal
/// <param name="key">Key to be checked.</param>
member __.ContainsKeyAsync(key : TableKey) : Async<bool> = async {
let kav = template.ToAttributeValues(key)
let request = GetItemRequest(tableName, kav)
let request = GetItemRequest(tableName, kav, ReturnConsumedCapacity = returnConsumedCapacity)
request.ExpressionAttributeNames.Add("#HKEY", template.PrimaryKey.HashKey.AttributeName)
request.ProjectionExpression <- "#HKEY"
request.ReturnConsumedCapacity <- returnConsumedCapacity
let! ct = Async.CancellationToken
let! response = client.GetItemAsync(request, ct) |> Async.AwaitTaskCorrect
maybeReport |> Option.iter (fun r -> r GetItem [ response.ConsumedCapacity ] 0)
maybeReport |> Option.iter (fun r -> r GetItem [ response.ConsumedCapacity ] 1)
return response.IsItemSet
}

@@ -465,10 +458,8 @@ type TableContext<'TRecord> internal
/// </summary>
/// <param name="key">Key of item to be fetched.</param>
/// <param name="projection">Projection expression to be applied to item.</param>
member __.GetItemProjectedAsync(key : TableKey, projection : Expr<'TRecord -> 'TProjection>) : Async<'TProjection> = async {
return! __.GetItemProjectedAsync(key, template.PrecomputeProjectionExpr projection)
}

member __.GetItemProjectedAsync(key : TableKey, projection : Expr<'TRecord -> 'TProjection>) : Async<'TProjection> =
__.GetItemProjectedAsync(key, template.PrecomputeProjectionExpr projection)

/// <summary>
/// Asynchronously performs a batch fetch of items with supplied keys.
@@ -501,9 +492,8 @@ type TableContext<'TRecord> internal
/// <param name="projection">Projection expression to be applied to item.</param>
/// <param name="consistentRead">Perform consistent read. Defaults to false.</param>
member __.BatchGetItemsProjectedAsync<'TProjection>(keys : seq<TableKey>, projection : Expr<'TRecord -> 'TProjection>,
?consistentRead : bool) : Async<'TProjection[]> = async {
return! __.BatchGetItemsProjectedAsync(keys, template.PrecomputeProjectionExpr projection, ?consistentRead = consistentRead)
}
?consistentRead : bool) : Async<'TProjection[]> =
__.BatchGetItemsProjectedAsync(keys, template.PrecomputeProjectionExpr projection, ?consistentRead = consistentRead)


/// <summary>
@@ -514,18 +504,16 @@ type TableContext<'TRecord> internal
/// <param name="precondition">Specifies a precondition expression that existing item should satisfy.</param>
member __.DeleteItemAsync(key : TableKey, ?precondition : ConditionExpression<'TRecord>) : Async<'TRecord option> = async {
let kav = template.ToAttributeValues key
let request = DeleteItemRequest(tableName, kav)
let request = DeleteItemRequest(tableName, kav, ReturnValues = ReturnValue.ALL_OLD, ReturnConsumedCapacity = returnConsumedCapacity)
match precondition with
| Some pc ->
let writer = AttributeWriter(request.ExpressionAttributeNames, request.ExpressionAttributeValues)
request.ConditionExpression <- pc.Conditional.Write writer
| None -> ()

request.ReturnValues <- ReturnValue.ALL_OLD
request.ReturnConsumedCapacity <- returnConsumedCapacity
let! ct = Async.CancellationToken
let! response = client.DeleteItemAsync(request, ct) |> Async.AwaitTaskCorrect
maybeReport |> Option.iter (fun r -> r DeleteItem [ response.ConsumedCapacity ] 0)
maybeReport |> Option.iter (fun r -> r DeleteItem [ response.ConsumedCapacity ] 1)
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "DeleteItem request returned error %O" response.HttpStatusCode

Expand All @@ -541,9 +529,8 @@ type TableContext<'TRecord> internal
/// <returns>The deleted item, or None if the item didn’t exist.</returns>
/// <param name="key">Key of item to be deleted.</param>
/// <param name="precondition">Specifies a precondition expression that existing item should satisfy.</param>
member __.DeleteItemAsync(key : TableKey, precondition : Expr<'TRecord -> bool>) : Async<'TRecord option> = async {
return! __.DeleteItemAsync(key, template.PrecomputeConditionalExpr precondition)
}
member __.DeleteItemAsync(key : TableKey, precondition : Expr<'TRecord -> bool>) : Async<'TRecord option> =
__.DeleteItemAsync(key, template.PrecomputeConditionalExpr precondition)


/// <summary>
@@ -559,18 +546,24 @@

let keys = Seq.toArray keys
if keys.Length > 25 then invalidArg "items" "key length must be less than or equal to 25."
let request = BatchWriteItemRequest()
let request = BatchWriteItemRequest(ReturnConsumedCapacity = returnConsumedCapacity)
let deleteRequests = keys |> Seq.map mkDeleteRequest |> rlist
request.RequestItems.[tableName] <- deleteRequests
request.ReturnConsumedCapacity <- returnConsumedCapacity

let! ct = Async.CancellationToken
let! response = client.BatchWriteItemAsync(request, ct) |> Async.AwaitTaskCorrect
maybeReport |> Option.iter (fun r -> r BatchWriteItems (Seq.toList response.ConsumedCapacity) (keys.Length - response.UnprocessedItems.Count))
let unprocessed =
match response.UnprocessedItems.TryGetValue tableName with
| (true, reqs) ->
reqs |> Seq.choose (fun r -> r.DeleteRequest |> Option.ofObj)
|> Seq.map (fun d -> d.Key)
|> Seq.toArray
| (false, _) -> [||]
maybeReport |> Option.iter (fun r -> r BatchWriteItems (Seq.toList response.ConsumedCapacity) (keys.Length - unprocessed.Length))
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "BatchWriteItem deletion request returned error %O" response.HttpStatusCode

return unprocessedDeleteAttributeValues tableName response |> Array.map template.ExtractKey
return unprocessed |> Array.map template.ExtractKey
}
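The delete path mirrors the put path: unprocessed delete requests come back to the caller as keys, so the same retry shape applies. A hedged sketch under the same assumptions as the put example above:

    let rec batchDeleteAll (keys : TableKey[]) = async {
        if keys.Length > 0 then
            let! unprocessed = table.BatchDeleteItemsAsync keys
            do! batchDeleteAll unprocessed }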


18 changes: 0 additions & 18 deletions src/FSharp.AWS.DynamoDB/Utils/DynamoUtils.fs
@@ -129,21 +129,3 @@ let isValidFieldName (name : string) =

let isValidKeyName (name : string) =
name <> null && name.Length > 0 && utf8Length name <= 255

let unprocessedDeleteAttributeValues tableName (response : BatchWriteItemResponse) =
let (ok, reqs) = response.UnprocessedItems.TryGetValue tableName
if ok then
reqs |> Seq.choose (fun r -> r.DeleteRequest |> Option.ofObj)
|> Seq.map (fun d -> d.Key)
|> Seq.toArray
else
[||]

let unprocessedPutAttributeValues tableName (response : BatchWriteItemResponse) =
let (ok, reqs) = response.UnprocessedItems.TryGetValue tableName
if ok then
reqs |> Seq.choose (fun r -> r.PutRequest |> Option.ofObj)
|> Seq.map (fun w -> w.Item)
|> Seq.toArray
else
[||]
32 changes: 29 additions & 3 deletions tests/FSharp.AWS.DynamoDB.Tests/MetricsCollectorTests.fs
@@ -29,11 +29,11 @@
}

type TestCollector() =
let mutable (metrics : RequestMetrics list) = []
let metrics = ResizeArray<RequestMetrics>()
member _.Collect (m : RequestMetrics) =
metrics <- List.append metrics [ m ]
metrics.Add m

member _.Metrics with get() = metrics
member _.Metrics with get() = metrics |> Seq.toList

type ``Metrics Collector Tests`` (fixture : TableFixture) =

@@ -63,6 +63,19 @@ type ``Metrics Collector Tests`` (fixture : TableFixture) =
Expect.equal metrics.ItemCount 1 ""
Expect.isGreaterThan (metrics.ConsumedCapacity |> Seq.sumBy (fun c -> c.CapacityUnits)) 0. ""

member __.``Collect Metrics on ContainsKey`` () =
let item = mkItem (guid()) (guid()) 0
table.PutItem item |> ignore

let collector = TestCollector()
let _ = table.WithMetricsCollector(collector.Collect).ContainsKey (key = TableKey.Combined (item.HashKey, 0))
Expect.equal collector.Metrics.Length 1 ""
let metrics = collector.Metrics.Head
Expect.equal metrics.Operation GetItem ""
Expect.equal metrics.TableName fixture.TableName ""
Expect.equal metrics.ItemCount 1 ""
Expect.isGreaterThan (metrics.ConsumedCapacity |> Seq.sumBy (fun c -> c.CapacityUnits)) 0. ""

member __.``Collect Metrics on PutItem`` () =
let item = mkItem (guid()) (guid()) 0
let collector = TestCollector()
@@ -88,6 +101,19 @@ type ``Metrics Collector Tests`` (fixture : TableFixture) =
Expect.equal metrics.ItemCount 1 ""
Expect.isGreaterThan (metrics.ConsumedCapacity |> Seq.sumBy (fun c -> c.CapacityUnits)) 0. ""

member __.``Collect Metrics on DeleteItem`` () =
let item = mkItem (guid()) (guid()) 0
let collector = TestCollector()
table.PutItem item |> ignore
table.WithMetricsCollector(collector.Collect).DeleteItem(TableKey.Combined (item.HashKey, item.RangeKey)) |> ignore

Expect.equal collector.Metrics.Length 1 ""
let metrics = collector.Metrics.Head
Expect.equal metrics.Operation DeleteItem ""
Expect.equal metrics.TableName fixture.TableName ""
Expect.equal metrics.ItemCount 1 ""
Expect.isGreaterThan (metrics.ConsumedCapacity |> Seq.sumBy (fun c -> c.CapacityUnits)) 0. ""

member __.``Collect Metrics on Scan`` () =
let hk = guid()
let gsk = guid()
3 changes: 3 additions & 0 deletions tests/FSharp.AWS.DynamoDB.Tests/Program.fs
@@ -56,6 +56,7 @@ let SimpleTableOperationTests =
testCase "Convert to compatible table 2" simpleTableOperationTests.``Convert to compatible table 2``
testCase "Simple Put Operation" simpleTableOperationTests.``Simple Put Operation``
testCase "ContainsKey Operation" simpleTableOperationTests.``ContainsKey Operation``
testCase "TryGet Operation" simpleTableOperationTests.``TryGet Operation``
testCase "Batch Put Operation" simpleTableOperationTests.``Batch Put Operation``
testCase "Batch Delete Operation" simpleTableOperationTests.``Batch Delete Operation``
testCase "Simple Delete Operation" simpleTableOperationTests.``Simple Delete Operation``
@@ -218,8 +219,10 @@ let MetricsCollectorTests =
testList "MetricsCollectorTests"
[
testCase "Collect Metrics on GetItem" metricsCollectorTests.``Collect Metrics on GetItem``
testCase "Collect Metrics on ContainsKey" metricsCollectorTests.``Collect Metrics on ContainsKey``
testCase "Collect Metrics on PutItem" metricsCollectorTests.``Collect Metrics on PutItem``
testCase "Collect Metrics on UpdateItem" metricsCollectorTests.``Collect Metrics on UpdateItem``
testCase "Collect Metrics on DeleteItem" metricsCollectorTests.``Collect Metrics on DeleteItem``
testCase "Collect Metrics on Scan" metricsCollectorTests.``Collect Metrics on Scan``
testCase "Collect Metrics on Query" metricsCollectorTests.``Collect Metrics on Query``
testCase "Collect Metrics on BatchGetItem" metricsCollectorTests.``Collect Metrics on BatchGetItem``
