Skip to content

Commit

Permalink
Added Streaming impl
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Apr 4, 2022
1 parent f9f6ddb commit 00cdea3
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 34 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ open FSharp.AWS.DynamoDB.Scripting // Expose non-Async methods, e.g. PutItem/Get
let client : IAmazonDynamoDB = ``your DynamoDB client instance``
let throughput = ProvisionedThroughput(readCapacityUnits = 1L, writeCapacityUnits = 10L)
let table = TableContext.Initialize<WorkItemInfo>(client, tableName = "workItems", Provisioned throughput)
let table = TableContext.Initialize<WorkItemInfo>(client, tableName = "workItems", Throughput.Provisioned throughput)
let workItem = { ProcessId = 0L ; WorkItemId = 1L ; Name = "Test" ; UUID = guid() ; Dependencies = set ["mscorlib"] ; Started = None }
Expand Down Expand Up @@ -121,7 +121,7 @@ type Counter private (table : TableContext<CounterEntry>, key : TableKey) =
static member Create(client : IAmazonDynamoDB, tableName : string) = async {
let table = TableContext<CounterEntry>(client, tableName)
let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L)
do! table.InitializeTableAsync(Provisioned throughput)
do! table.InitializeTableAsync(Throughput.Provisioned throughput)
let initialEntry = { Id = Guid.NewGuid() ; Value = 0L }
let! key = table.PutItemAsync(initialEntry)
return Counter(table, key)
Expand Down
3 changes: 2 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
* Added `TableContext.Scripting.Initialize` (replaces `TableContext.Create()`)
* Added `TableContext.VerifyTableAsync` overload that only performs verification but never creates a Table
* Added `TableContext.InitializeTableAsync` (replaces `TableContext.VerifyTableAsync(createIfNotExists = true)`)
* Added `TableContext.ProvisionTableAsync` (as per `InitializeTableAsync` but does an `UpdateProvisionedThroughputAsync` if throughput has changed)
* Added `TableContext.ProvisionTableAsync` (as per `InitializeTableAsync` but does an `UpdateTableAsync` if `throughput` or `streaming` has changed)
* Added Support for `Throughput.OnDemand` mode (sets `BillingMode` to `PAY_PER_REQUEST` rather than attempting to configure a `ProvisionedThroughput`)
* Added ability to configure DynamoDB streaming (via `Streaming` DU) to `InitializeTableAsync` and `ProvisionTableAsync`
* Removed `TableContext.CreateAsync` (replace with `TableContext.VerifyTableAsync` or `InitializeTableAsync`)

### 0.9.3-beta
Expand Down
6 changes: 3 additions & 3 deletions src/FSharp.AWS.DynamoDB/Script.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Test =
}

let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L)
let table = TableContext.Initialize<Test>(ddb, "test", Provisioned throughput)
let table = TableContext.Initialize<Test>(ddb, "test", Throughput.Provisioned throughput)

let value = { HashKey = Guid.NewGuid() ; List = [] ; RangeKey = "2" ; Value = 3.1415926 ; Date = DateTimeOffset.Now + TimeSpan.FromDays 2. ; Value2 = None ; Values = [|{ A = "foo" ; B = System.Reflection.BindingFlags.Instance }|] ; Map = Map.ofList [("A1",1)] ; Set = [set [1L];set [2L]] ; Bytes = [|1uy..10uy|]; String = ref "1a" ; Unions = [A 42; B("42",3)]}

Expand Down Expand Up @@ -128,7 +128,7 @@ type EasyCounters private (table : TableContext<CounterEntry>) =
// Create the table if necessary. Verifies schema is correct if it has already been created
// NOTE the hard coded initial throughput provisioning - arguably this belongs outside of your application logic
let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L)
do! table.InitializeTableAsync(Provisioned throughput)
do! table.InitializeTableAsync(Throughput.Provisioned throughput)
return EasyCounters(table)
}

Expand All @@ -143,7 +143,7 @@ type SimpleCounters private (table : TableContext<CounterEntry>) =
// normally, RCU/WCU provisioning only happens first time the Table is created and is then considered an external concern
// here we use `ProvisionTableAsync` instead of `InitializeAsync` to reset it each time we start the app
let provisionedThroughput = ProvisionedThroughput(readCapacityUnits, writeCapacityUnits)
table.ProvisionTableAsync(Provisioned provisionedThroughput)
table.ProvisionTableAsync(Throughput.Provisioned provisionedThroughput)

/// We only want to do the initialization bit once per instance of our application
/// Similar to EasyCounters.Create in that it ensures the table is provisioned correctly
Expand Down
101 changes: 74 additions & 27 deletions src/FSharp.AWS.DynamoDB/TableContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,50 @@ type ResourceNotFoundException = Amazon.DynamoDBv2.Model.ResourceNotFoundExcepti
type ProvisionedThroughput = Amazon.DynamoDBv2.Model.ProvisionedThroughput

/// Represents the throughput configuration for a Table
[<RequireQualifiedAccess>]
type Throughput =
| Provisioned of ProvisionedThroughput
| OnDemand
module internal Throughput =
let applyToCreateRequest (req : CreateTableRequest) = function
| Provisioned t ->
| Throughput.Provisioned t ->
req.ProvisionedThroughput <- t
for gsi in req.GlobalSecondaryIndexes do
gsi.ProvisionedThroughput <- t
| OnDemand ->
| Throughput.OnDemand ->
req.BillingMode <- BillingMode.PAY_PER_REQUEST
let hasChanged (desc : TableDescription) = function
| Throughput.Provisioned t ->
let current = desc.ProvisionedThroughput
desc.BillingModeSummary.BillingMode <> BillingMode.PROVISIONED
|| t.ReadCapacityUnits <> current.ReadCapacityUnits
|| t.WriteCapacityUnits <> current.WriteCapacityUnits
| Throughput.OnDemand ->
desc.BillingModeSummary.BillingMode <> BillingMode.PAY_PER_REQUEST
let applyToUpdateRequest (req : UpdateTableRequest) = function
| Throughput.Provisioned t ->
req.BillingMode <- BillingMode.PROVISIONED
req.ProvisionedThroughput <- t
| Throughput.OnDemand ->
req.BillingMode <- BillingMode.PAY_PER_REQUEST

/// Represents the streaming configuration for a Table
type Streaming =
| Enabled of StreamViewType
| Disabled
module internal Streaming =
let private (|Spec|) = function
| Streaming.Enabled svt -> StreamSpecification(StreamEnabled = true, StreamViewType = svt)
| Streaming.Disabled -> StreamSpecification(StreamEnabled = false)
let applyToCreateRequest (req : CreateTableRequest) (Spec spec) =
req.StreamSpecification <- spec
let hasChanged (desc : TableDescription) = function
| Streaming.Disabled -> desc.StreamSpecification.StreamEnabled
| Streaming.Enabled svt ->
not desc.StreamSpecification.StreamEnabled
|| desc.StreamSpecification.StreamViewType <> svt
let applyToUpdateRequest (req : UpdateTableRequest) (Spec spec) =
req.StreamSpecification <- spec

/// Represents the operation performed on the table, for metrics collection purposes
type Operation = GetItem | PutItem | UpdateItem | DeleteItem | BatchGetItems | BatchWriteItems | Scan | Query
Expand Down Expand Up @@ -921,16 +954,25 @@ type TableContext<'TRecord> internal


/// <summary>
/// Asynchronously updates the underlying table with supplied provisioned throughput.
/// Asynchronously updates the underlying table with supplied configuration.<br />
/// Will throw if <c>customize</c> does not apply any alterations.
/// </summary>
/// <param name="provisionedThroughput">Provisioned throughput to use on table.</param>
member __.UpdateProvisionedThroughputAsync(provisionedThroughput : ProvisionedThroughput) : Async<unit> = async {
let request = UpdateTableRequest(tableName, provisionedThroughput)
/// <param name="customize">Callback to apply any options desired.</param>
member _.UpdateTableAsync(customize) : Async<unit> = async {
let request = UpdateTableRequest(TableName = tableName)
customize request
let! ct = Async.CancellationToken
let! _response = client.UpdateTableAsync(request, ct) |> Async.AwaitTaskCorrect
return ()
let! _response = client.UpdateTableAsync(request, ct) |> Async.AwaitTaskCorrect in ()
}

/// <summary>
/// Asynchronously updates the underlying table with supplied provisioned throughput.
/// </summary>
/// <param name="provisionedThroughput">Provisioned throughput to use on table.</param>
[<System.Obsolete("Please replace with either 1. UpdateTableAsync or 2. ProvisionTableAsync")>]
member t.UpdateProvisionedThroughputAsync(provisionedThroughput : ProvisionedThroughput) : Async<unit> =
t.UpdateTableAsync(fun req -> Throughput.applyToUpdateRequest req (Throughput.Provisioned provisionedThroughput))

member internal _.InternalDescribe() : Async<TableDescription> =
let rec wait () = async {
let! ct = Async.CancellationToken
Expand Down Expand Up @@ -986,13 +1028,14 @@ type TableContext<'TRecord> internal

checkOrCreate 9 // up to 9 retries, i.e. 10 attempts before we let exception propagate

member internal _.InternalCreateCreateTableRequest(throughput) =
member internal _.InternalCreateCreateTableRequest(?throughput, ?streaming) =
let req = template.Info.Schemata.CreateCreateTableRequest(tableName)
Throughput.applyToCreateRequest req throughput
throughput |> Option.iter (Throughput.applyToCreateRequest req)
streaming |> Option.iter (Streaming.applyToCreateRequest req)
req

member internal t.InternalCreateOrValidateTableAsync(throughput) =
t.InternalProvision(fun () -> t.InternalCreateCreateTableRequest throughput)
member internal t.InternalCreateOrValidateTableAsync(?throughput, ?streaming) =
t.InternalProvision(fun () -> t.InternalCreateCreateTableRequest(?throughput = throughput, ?streaming = streaming))

/// <summary>
/// Asynchronously verify that the table exists and is compatible with record key schema, or throw.<br/>
Expand All @@ -1007,23 +1050,26 @@ type TableContext<'TRecord> internal
/// See also <c>VerifyTableAsync</c>, which only verifies the Table is present and correct.
/// </summary>
/// <param name="throughput">Throughput configuration to use for the table.</param>
member t.InitializeTableAsync(throughput : Throughput) : Async<unit> =
t.InternalCreateOrValidateTableAsync(throughput) |> Async.Ignore
/// <param name="streaming">Optional Streaming configuration to use for the table. Default: Disabled.</param>
member t.InitializeTableAsync(throughput : Throughput, ?streaming) : Async<unit> =
t.InternalCreateOrValidateTableAsync(throughput, defaultArg streaming Streaming.Disabled) |> Async.Ignore

/// <summary>
/// Asynchronously verifies that the table exists and is compatible with record key schema, throwing if it is incompatible.<br/>
/// If the table is not present, it is provisioned, with the specified <c>throughput</c>.<br/>
/// If it is present, and the throughput is not as specified, uses <c>UpdateProvisionedThroughputAsync</c> to update it. <br/>
/// If the table is not present, it is provisioned, with the specified <c>throughput</c> and optionally <c>streaming</c>.<br/>
/// If it is present, and the <c>throughput</c> or <c>streaming</c> are not as specified, uses <c>UpdateTableAsync</c> to adjust.<br/>
/// </summary>
/// <param name="throughput">Throughput configuration to use for the table.</param>
member t.ProvisionTableAsync(throughput : Throughput) : Async<unit> = async {
let! tableDescription = t.InternalCreateOrValidateTableAsync(throughput)
match throughput with
| Provisioned p ->
let current = tableDescription.ProvisionedThroughput
if p.ReadCapacityUnits <> current.ReadCapacityUnits || p.WriteCapacityUnits <> current.WriteCapacityUnits then
do! t.UpdateProvisionedThroughputAsync p
| OnDemand -> () }
/// <param name="streaming">Optional streaming configuration to apply for the table. Default (if creating): Disabled. Default: (if existing) do not change.</param>
member t.ProvisionTableAsync(throughput : Throughput, ?streaming) : Async<unit> = async {
let! tableDescription = t.InternalCreateOrValidateTableAsync(throughput, defaultArg streaming Streaming.Disabled)
let tc = throughput |> Throughput.hasChanged tableDescription
let sc = streaming |> Option.exists (Streaming.hasChanged tableDescription)
if tc || sc then
let apply req =
if tc then Throughput.applyToUpdateRequest req throughput
if sc then streaming |> Option.iter (Streaming.applyToUpdateRequest req)
do! t.UpdateTableAsync(apply) }

/// <summary>
/// Asynchronously verify that the table exists and is compatible with record key schema.
Expand All @@ -1034,7 +1080,7 @@ type TableContext<'TRecord> internal
member t.VerifyTableAsync(?createIfNotExists : bool, ?provisionedThroughput : ProvisionedThroughput) : Async<unit> =
if createIfNotExists = Some true then
let throughput = match provisionedThroughput with Some p -> p | None -> ProvisionedThroughput(10L, 10L)
t.InitializeTableAsync(Provisioned throughput)
t.InitializeTableAsync(Throughput.Provisioned throughput)
else
t.VerifyTableAsync()

Expand Down Expand Up @@ -1506,5 +1552,6 @@ module Scripting =
/// Updates the underlying table with supplied provisioned throughput.
/// </summary>
/// <param name="provisionedThroughput">Provisioned throughput to use on table.</param>
member __.UpdateProvisionedThroughput(provisionedThroughput : ProvisionedThroughput) =
__.UpdateProvisionedThroughputAsync(provisionedThroughput) |> Async.RunSynchronously
member t.UpdateProvisionedThroughput(provisionedThroughput : ProvisionedThroughput) =
let spec = Throughput.Provisioned provisionedThroughput
t.UpdateTableAsync(fun req -> Throughput.applyToUpdateRequest req spec) |> Async.RunSynchronously
2 changes: 1 addition & 1 deletion tests/FSharp.AWS.DynamoDB.Tests/Utils.fs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,6 @@ module Utils =

member _.CreateContextAndTableIfNotExists<'TRecord>() =
let throughput = Model.ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L)
Scripting.TableContext.Initialize<'TRecord>(client, tableName, Provisioned throughput)
Scripting.TableContext.Initialize<'TRecord>(client, tableName, Throughput.Provisioned throughput)

interface IDisposable with member _.Dispose() = client.DeleteTableAsync(tableName) |> Async.AwaitTask |> Async.RunSynchronously |> ignore

0 comments on commit 00cdea3

Please sign in to comment.