From 00cdea3d0dbf8d39c3989df89df556e2eccbdf1c Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 4 Apr 2022 16:37:54 +0100 Subject: [PATCH] Added Streaming impl --- README.md | 4 +- RELEASE_NOTES.md | 3 +- src/FSharp.AWS.DynamoDB/Script.fsx | 6 +- src/FSharp.AWS.DynamoDB/TableContext.fs | 101 +++++++++++++++++------ tests/FSharp.AWS.DynamoDB.Tests/Utils.fs | 2 +- 5 files changed, 82 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 63e3613..f10bed1 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ open FSharp.AWS.DynamoDB.Scripting // Expose non-Async methods, e.g. PutItem/Get let client : IAmazonDynamoDB = ``your DynamoDB client instance`` let throughput = ProvisionedThroughput(readCapacityUnits = 1L, writeCapacityUnits = 10L) -let table = TableContext.Initialize(client, tableName = "workItems", Provisioned throughput) +let table = TableContext.Initialize(client, tableName = "workItems", Throughput.Provisioned throughput) let workItem = { ProcessId = 0L ; WorkItemId = 1L ; Name = "Test" ; UUID = guid() ; Dependencies = set ["mscorlib"] ; Started = None } @@ -121,7 +121,7 @@ type Counter private (table : TableContext, key : TableKey) = static member Create(client : IAmazonDynamoDB, tableName : string) = async { let table = TableContext(client, tableName) let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L) - do! table.InitializeTableAsync(Provisioned throughput) + do! table.InitializeTableAsync(Throughput.Provisioned throughput) let initialEntry = { Id = Guid.NewGuid() ; Value = 0L } let! key = table.PutItemAsync(initialEntry) return Counter(table, key) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 4005670..be6ca2f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -9,8 +9,9 @@ * Added `TableContext.Scripting.Initialize` (replaces `TableContext.Create()`) * Added `TableContext.VerifyTableAsync` overload that only performs verification but never creates a Table * Added `TableContext.InitializeTableAsync` (replaces `TableContext.VerifyTableAsync(createIfNotExists = true)`) - * Added `TableContext.ProvisionTableAsync` (as per `InitializeTableAsync` but does an `UpdateProvisionedThroughputAsync` if throughput has changed) + * Added `TableContext.ProvisionTableAsync` (as per `InitializeTableAsync` but does an `UpdateTableAsync` if `throughput` or `streaming` has changed) * Added Support for `Throughput.OnDemand` mode (sets `BillingMode` to `PAY_PER_REQUEST` rather than attempting to configure a `ProvisionedThroughput`) + * Added ability to configure DynamoDB streaming (via `Streaming` DU) to `InitializeTableAsync` and `ProvisionTableAsync` * Removed `TableContext.CreateAsync` (replace with `TableContext.VerifyTableAsync` or `InitializeTableAsync`) ### 0.9.3-beta diff --git a/src/FSharp.AWS.DynamoDB/Script.fsx b/src/FSharp.AWS.DynamoDB/Script.fsx index 596a89f..5294302 100644 --- a/src/FSharp.AWS.DynamoDB/Script.fsx +++ b/src/FSharp.AWS.DynamoDB/Script.fsx @@ -55,7 +55,7 @@ type Test = } let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L) -let table = TableContext.Initialize(ddb, "test", Provisioned throughput) +let table = TableContext.Initialize(ddb, "test", Throughput.Provisioned throughput) let value = { HashKey = Guid.NewGuid() ; List = [] ; RangeKey = "2" ; Value = 3.1415926 ; Date = DateTimeOffset.Now + TimeSpan.FromDays 2. ; Value2 = None ; Values = [|{ A = "foo" ; B = System.Reflection.BindingFlags.Instance }|] ; Map = Map.ofList [("A1",1)] ; Set = [set [1L];set [2L]] ; Bytes = [|1uy..10uy|]; String = ref "1a" ; Unions = [A 42; B("42",3)]} @@ -128,7 +128,7 @@ type EasyCounters private (table : TableContext) = // Create the table if necessary. Verifies schema is correct if it has already been created // NOTE the hard coded initial throughput provisioning - arguably this belongs outside of your application logic let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L) - do! table.InitializeTableAsync(Provisioned throughput) + do! table.InitializeTableAsync(Throughput.Provisioned throughput) return EasyCounters(table) } @@ -143,7 +143,7 @@ type SimpleCounters private (table : TableContext) = // normally, RCU/WCU provisioning only happens first time the Table is created and is then considered an external concern // here we use `ProvisionTableAsync` instead of `InitializeAsync` to reset it each time we start the app let provisionedThroughput = ProvisionedThroughput(readCapacityUnits, writeCapacityUnits) - table.ProvisionTableAsync(Provisioned provisionedThroughput) + table.ProvisionTableAsync(Throughput.Provisioned provisionedThroughput) /// We only want to do the initialization bit once per instance of our application /// Similar to EasyCounters.Create in that it ensures the table is provisioned correctly diff --git a/src/FSharp.AWS.DynamoDB/TableContext.fs b/src/FSharp.AWS.DynamoDB/TableContext.fs index fc24817..8aa60a8 100644 --- a/src/FSharp.AWS.DynamoDB/TableContext.fs +++ b/src/FSharp.AWS.DynamoDB/TableContext.fs @@ -20,17 +20,50 @@ type ResourceNotFoundException = Amazon.DynamoDBv2.Model.ResourceNotFoundExcepti type ProvisionedThroughput = Amazon.DynamoDBv2.Model.ProvisionedThroughput /// Represents the throughput configuration for a Table +[] type Throughput = | Provisioned of ProvisionedThroughput | OnDemand module internal Throughput = let applyToCreateRequest (req : CreateTableRequest) = function - | Provisioned t -> + | Throughput.Provisioned t -> req.ProvisionedThroughput <- t for gsi in req.GlobalSecondaryIndexes do gsi.ProvisionedThroughput <- t - | OnDemand -> + | Throughput.OnDemand -> req.BillingMode <- BillingMode.PAY_PER_REQUEST + let hasChanged (desc : TableDescription) = function + | Throughput.Provisioned t -> + let current = desc.ProvisionedThroughput + desc.BillingModeSummary.BillingMode <> BillingMode.PROVISIONED + || t.ReadCapacityUnits <> current.ReadCapacityUnits + || t.WriteCapacityUnits <> current.WriteCapacityUnits + | Throughput.OnDemand -> + desc.BillingModeSummary.BillingMode <> BillingMode.PAY_PER_REQUEST + let applyToUpdateRequest (req : UpdateTableRequest) = function + | Throughput.Provisioned t -> + req.BillingMode <- BillingMode.PROVISIONED + req.ProvisionedThroughput <- t + | Throughput.OnDemand -> + req.BillingMode <- BillingMode.PAY_PER_REQUEST + +/// Represents the streaming configuration for a Table +type Streaming = + | Enabled of StreamViewType + | Disabled +module internal Streaming = + let private (|Spec|) = function + | Streaming.Enabled svt -> StreamSpecification(StreamEnabled = true, StreamViewType = svt) + | Streaming.Disabled -> StreamSpecification(StreamEnabled = false) + let applyToCreateRequest (req : CreateTableRequest) (Spec spec) = + req.StreamSpecification <- spec + let hasChanged (desc : TableDescription) = function + | Streaming.Disabled -> desc.StreamSpecification.StreamEnabled + | Streaming.Enabled svt -> + not desc.StreamSpecification.StreamEnabled + || desc.StreamSpecification.StreamViewType <> svt + let applyToUpdateRequest (req : UpdateTableRequest) (Spec spec) = + req.StreamSpecification <- spec /// Represents the operation performed on the table, for metrics collection purposes type Operation = GetItem | PutItem | UpdateItem | DeleteItem | BatchGetItems | BatchWriteItems | Scan | Query @@ -921,16 +954,25 @@ type TableContext<'TRecord> internal /// - /// Asynchronously updates the underlying table with supplied provisioned throughput. + /// Asynchronously updates the underlying table with supplied configuration.
+ /// Will throw if customize does not apply any alterations. ///
- /// Provisioned throughput to use on table. - member __.UpdateProvisionedThroughputAsync(provisionedThroughput : ProvisionedThroughput) : Async = async { - let request = UpdateTableRequest(tableName, provisionedThroughput) + /// Callback to apply any options desired. + member _.UpdateTableAsync(customize) : Async = async { + let request = UpdateTableRequest(TableName = tableName) + customize request let! ct = Async.CancellationToken - let! _response = client.UpdateTableAsync(request, ct) |> Async.AwaitTaskCorrect - return () + let! _response = client.UpdateTableAsync(request, ct) |> Async.AwaitTaskCorrect in () } + /// + /// Asynchronously updates the underlying table with supplied provisioned throughput. + /// + /// Provisioned throughput to use on table. + [] + member t.UpdateProvisionedThroughputAsync(provisionedThroughput : ProvisionedThroughput) : Async = + t.UpdateTableAsync(fun req -> Throughput.applyToUpdateRequest req (Throughput.Provisioned provisionedThroughput)) + member internal _.InternalDescribe() : Async = let rec wait () = async { let! ct = Async.CancellationToken @@ -986,13 +1028,14 @@ type TableContext<'TRecord> internal checkOrCreate 9 // up to 9 retries, i.e. 10 attempts before we let exception propagate - member internal _.InternalCreateCreateTableRequest(throughput) = + member internal _.InternalCreateCreateTableRequest(?throughput, ?streaming) = let req = template.Info.Schemata.CreateCreateTableRequest(tableName) - Throughput.applyToCreateRequest req throughput + throughput |> Option.iter (Throughput.applyToCreateRequest req) + streaming |> Option.iter (Streaming.applyToCreateRequest req) req - member internal t.InternalCreateOrValidateTableAsync(throughput) = - t.InternalProvision(fun () -> t.InternalCreateCreateTableRequest throughput) + member internal t.InternalCreateOrValidateTableAsync(?throughput, ?streaming) = + t.InternalProvision(fun () -> t.InternalCreateCreateTableRequest(?throughput = throughput, ?streaming = streaming)) /// /// Asynchronously verify that the table exists and is compatible with record key schema, or throw.
@@ -1007,23 +1050,26 @@ type TableContext<'TRecord> internal /// See also VerifyTableAsync, which only verifies the Table is present and correct. ///
/// Throughput configuration to use for the table. - member t.InitializeTableAsync(throughput : Throughput) : Async = - t.InternalCreateOrValidateTableAsync(throughput) |> Async.Ignore + /// Optional Streaming configuration to use for the table. Default: Disabled. + member t.InitializeTableAsync(throughput : Throughput, ?streaming) : Async = + t.InternalCreateOrValidateTableAsync(throughput, defaultArg streaming Streaming.Disabled) |> Async.Ignore /// /// Asynchronously verifies that the table exists and is compatible with record key schema, throwing if it is incompatible.
- /// If the table is not present, it is provisioned, with the specified throughput.
- /// If it is present, and the throughput is not as specified, uses UpdateProvisionedThroughputAsync to update it.
+ /// If the table is not present, it is provisioned, with the specified throughput and optionally streaming.
+ /// If it is present, and the throughput or streaming are not as specified, uses UpdateTableAsync to adjust.
///
/// Throughput configuration to use for the table. - member t.ProvisionTableAsync(throughput : Throughput) : Async = async { - let! tableDescription = t.InternalCreateOrValidateTableAsync(throughput) - match throughput with - | Provisioned p -> - let current = tableDescription.ProvisionedThroughput - if p.ReadCapacityUnits <> current.ReadCapacityUnits || p.WriteCapacityUnits <> current.WriteCapacityUnits then - do! t.UpdateProvisionedThroughputAsync p - | OnDemand -> () } + /// Optional streaming configuration to apply for the table. Default (if creating): Disabled. Default: (if existing) do not change. + member t.ProvisionTableAsync(throughput : Throughput, ?streaming) : Async = async { + let! tableDescription = t.InternalCreateOrValidateTableAsync(throughput, defaultArg streaming Streaming.Disabled) + let tc = throughput |> Throughput.hasChanged tableDescription + let sc = streaming |> Option.exists (Streaming.hasChanged tableDescription) + if tc || sc then + let apply req = + if tc then Throughput.applyToUpdateRequest req throughput + if sc then streaming |> Option.iter (Streaming.applyToUpdateRequest req) + do! t.UpdateTableAsync(apply) } /// /// Asynchronously verify that the table exists and is compatible with record key schema. @@ -1034,7 +1080,7 @@ type TableContext<'TRecord> internal member t.VerifyTableAsync(?createIfNotExists : bool, ?provisionedThroughput : ProvisionedThroughput) : Async = if createIfNotExists = Some true then let throughput = match provisionedThroughput with Some p -> p | None -> ProvisionedThroughput(10L, 10L) - t.InitializeTableAsync(Provisioned throughput) + t.InitializeTableAsync(Throughput.Provisioned throughput) else t.VerifyTableAsync() @@ -1506,5 +1552,6 @@ module Scripting = /// Updates the underlying table with supplied provisioned throughput. /// /// Provisioned throughput to use on table. - member __.UpdateProvisionedThroughput(provisionedThroughput : ProvisionedThroughput) = - __.UpdateProvisionedThroughputAsync(provisionedThroughput) |> Async.RunSynchronously + member t.UpdateProvisionedThroughput(provisionedThroughput : ProvisionedThroughput) = + let spec = Throughput.Provisioned provisionedThroughput + t.UpdateTableAsync(fun req -> Throughput.applyToUpdateRequest req spec) |> Async.RunSynchronously diff --git a/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs b/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs index 5032814..436f08a 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs +++ b/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs @@ -42,6 +42,6 @@ module Utils = member _.CreateContextAndTableIfNotExists<'TRecord>() = let throughput = Model.ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L) - Scripting.TableContext.Initialize<'TRecord>(client, tableName, Provisioned throughput) + Scripting.TableContext.Initialize<'TRecord>(client, tableName, Throughput.Provisioned throughput) interface IDisposable with member _.Dispose() = client.DeleteTableAsync(tableName) |> Async.AwaitTask |> Async.RunSynchronously |> ignore