Skip to content

Commit

Permalink
Add OnDemand mode
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 29, 2022
1 parent 1cd3f68 commit 54cbdd6
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 34 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ open Amazon.DynamoDBv2
open FSharp.AWS.DynamoDB.Scripting // Expose non-Async methods, e.g. PutItem/GetItem
let client : IAmazonDynamoDB = ``your DynamoDB client instance``
let throughput = ProvisionedThroughput (readCapacityUnits = 1L, writeCapacityUnits = 10L)
let table = TableContext.Initialize<WorkItemInfo>(client, tableName = "workItems", throughput)
let throughput = ProvisionedThroughput(readCapacityUnits = 1L, writeCapacityUnits = 10L)
let table = TableContext.Initialize<WorkItemInfo>(client, tableName = "workItems", Provisioned throughput)
let workItem = { ProcessId = 0L ; WorkItemId = 1L ; Name = "Test" ; UUID = guid() ; Dependencies = set ["mscorlib"] ; Started = None }
Expand Down Expand Up @@ -120,8 +120,8 @@ type Counter private (table : TableContext<CounterEntry>, key : TableKey) =
static member Create(client : IAmazonDynamoDB, tableName : string) = async {
let table = TableContext<CounterEntry>(client, tableName)
let throughput = ProvisionedThroughput(readCapacityUnits = 100L, writeCapacityUnits = 100L)
do! table.InitializeTableAsync throughput
let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L)
do! table.InitializeTableAsync(Provisioned throughput)
let initialEntry = { Id = Guid.NewGuid() ; Value = 0L }
let! key = table.PutItemAsync(initialEntry)
return Counter(table, key)
Expand Down
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Added `TableContext.VerifyTableAsync` overload that only performs verification but never creates a Table
* Added `TableContext.InitializeTableAsync` (replaces `TableContext.VerifyTableAsync(createIfNotExists = true)`)
* Added `TableContext.ProvisionTableAsync` (as per `InitializeTableAsync` but does an `UpdateProvisionedThroughputAsync` if throughput has changed)
* Added Support for `Throughput.OnDemand` mode (sets `BillingMode` to `PAY_PER_REQUEST` rather than attempting to configure a `ProvisionedThroughput`)
* Removed `TableContext.CreateAsync` (replace with `TableContext.VerifyTableAsync` or `InitializeTableAsync`)

### 0.9.3-beta
Expand Down
5 changes: 1 addition & 4 deletions src/FSharp.AWS.DynamoDB/RecordKeySchema.fs
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,10 @@ type TableKeySchemata with
yield! td.LocalSecondaryIndexes |> Seq.map mkLocalSecondaryIndex |])

/// Create a CreateTableRequest using supplied key schema
member schema.CreateCreateTableRequest (tableName : string, provisionedThroughput : ProvisionedThroughput) =
member schema.CreateCreateTableRequest(tableName : string) =
let ctr = CreateTableRequest(TableName = tableName)
let inline mkKSE n t = KeySchemaElement(n, t)

ctr.ProvisionedThroughput <- provisionedThroughput

let keyAttrs = new Dictionary<string, KeyAttributeSchema>()
for tks in schema.Schemata do
keyAttrs.[tks.HashKey.AttributeName] <- tks.HashKey
Expand All @@ -391,7 +389,6 @@ type TableKeySchemata with
gsi.KeySchema.Add <| mkKSE tks.HashKey.AttributeName KeyType.HASH
tks.RangeKey |> Option.iter (fun rk -> gsi.KeySchema.Add <| mkKSE rk.AttributeName KeyType.RANGE)
gsi.Projection <- Projection(ProjectionType = ProjectionType.ALL)
gsi.ProvisionedThroughput <- provisionedThroughput
ctr.GlobalSecondaryIndexes.Add gsi

| LocalSecondaryIndex name ->
Expand Down
16 changes: 9 additions & 7 deletions src/FSharp.AWS.DynamoDB/Script.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ type Test =
Bytes : byte[]
}

let autoCreate = InitializationMode.CreateIfNotExists (ProvisionedThroughput (100L, 100L))
let table = TableContext.Initialize<Test>(ddb, "test", mode = autoCreate)
let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L)
let table = TableContext.Initialize<Test>(ddb, "test", Provisioned throughput)

let value = { HashKey = Guid.NewGuid() ; List = [] ; RangeKey = "2" ; Value = 3.1415926 ; Date = DateTimeOffset.Now + TimeSpan.FromDays 2. ; Value2 = None ; Values = [|{ A = "foo" ; B = System.Reflection.BindingFlags.Instance }|] ; Map = Map.ofList [("A1",1)] ; Set = [set [1L];set [2L]] ; Bytes = [|1uy..10uy|]; String = ref "1a" ; Unions = [A 42; B("42",3)]}

Expand Down Expand Up @@ -126,8 +126,9 @@ type EasyCounters private (table : TableContext<CounterEntry>) =
static member Create(client : IAmazonDynamoDB, tableName : string) : Async<EasyCounters> = async {
let table = TableContext<CounterEntry>(client, tableName)
// Create the table if necessary. Verifies schema is correct if it has already been created
let throughput = ProvisionedThroughput(readCapacityUnits = 100L, writeCapacityUnits = 100L)
do! table.InitializeTableAsync throughput
// NOTE the hard coded initial throughput provisioning - arguably this belongs outside of your application logic
let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L)
do! table.InitializeTableAsync(Provisioned throughput)
return EasyCounters(table)
}

Expand All @@ -141,7 +142,8 @@ type SimpleCounters private (table : TableContext<CounterEntry>) =
let table = TableContext<CounterEntry>(client, tableName)
// normally, RCU/WCU provisioning only happens first time the Table is created and is then considered an external concern
// here we use `ProvisionTableAsync` instead of `InitializeAsync` to reset it each time we start the app
table.ProvisionTableAsync(ProvisionedThroughput (readCapacityUnits, writeCapacityUnits))
let provisionedThroughput = ProvisionedThroughput(readCapacityUnits, writeCapacityUnits)
table.ProvisionTableAsync(Provisioned provisionedThroughput)

/// We only want to do the initialization bit once per instance of our application
/// Similar to EasyCounters.Create in that it ensures the table is provisioned correctly
Expand All @@ -168,11 +170,11 @@ let e2 = e.StartCounter() |> Async.RunSynchronously
e1.Incr() |> Async.RunSynchronously
e2.Incr() |> Async.RunSynchronously

SimpleCounters.Provision(ddb, "testing-pre-provisioned", 100L, 100L) |> Async.RunSynchronously
SimpleCounters.Provision(ddb, "testing-pre-provisioned", readCapacityUnits = 10L, writeCapacityUnits = 10L) |> Async.RunSynchronously
// The consuming code can assume the provisioning has been carried out as part of the deploy
// that allows the creation to be synchronous (and not impede application startup)
let s = SimpleCounters.Create(ddb, "testing-pre-provisioned")
let s1 = s.StartCounter() |> Async.RunSynchronously // Would throw if Provision has not been carried out
let s1 = s.StartCounter() |> Async.RunSynchronously // Throws if Provision step has not been executed
s1.Incr() |> Async.RunSynchronously

// Alternately, we can have the app do an extra call (and have some asynchronous initialization work) to check the table is ready
Expand Down
45 changes: 32 additions & 13 deletions src/FSharp.AWS.DynamoDB/TableContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ type ResourceNotFoundException = Amazon.DynamoDBv2.Model.ResourceNotFoundExcepti
/// Represents the provisioned throughput for given table or index
type ProvisionedThroughput = Amazon.DynamoDBv2.Model.ProvisionedThroughput

/// Represents the throughput configuration for a Table
type Throughput =
| Provisioned of ProvisionedThroughput
| OnDemand
module internal Throughput =
let applyToCreateRequest (req : CreateTableRequest) = function
| Provisioned t ->
req.ProvisionedThroughput <- t
for gsi in req.GlobalSecondaryIndexes do
gsi.ProvisionedThroughput <- t
| OnDemand ->
req.BillingMode <- BillingMode.PAY_PER_REQUEST

/// Represents the operation performed on the table, for metrics collection purposes
type Operation = GetItem | PutItem | UpdateItem | DeleteItem | BatchGetItems | BatchWriteItems | Scan | Query

Expand Down Expand Up @@ -973,10 +986,13 @@ type TableContext<'TRecord> internal

checkOrCreate 9 // up to 9 retries, i.e. 10 attempts before we let exception propagate

member internal _.InternalCreateTableRequest(throughput) =
template.Info.Schemata.CreateCreateTableRequest(tableName, throughput)
member internal t.InternalCreateOrValidateTableAsync(createThroughput) =
t.InternalProvision(fun () -> t.InternalCreateTableRequest(createThroughput))
member internal _.InternalCreateCreateTableRequest(applyThroughput) =
let req = template.Info.Schemata.CreateCreateTableRequest(tableName)
applyThroughput req
req

member internal t.InternalCreateOrValidateTableAsync(throughput) =
t.InternalProvision(fun () -> t.InternalCreateTableRequest(fun r -> Throughput.applyToCreateRequest r throughput))

/// <summary>
/// Asynchronously verify that the table exists and is compatible with record key schema, or throw.<br/>
Expand All @@ -990,21 +1006,24 @@ type TableContext<'TRecord> internal
/// If the table is not present, it is provisioned, with the specified <c>throughput</c>.<br/>
/// See also <c>VerifyTableAsync</c>, which only verifies the Table is present and correct.
/// </summary>
/// <param name="throughput">Provisioned throughput to use for the table.</param>
member t.InitializeTableAsync(throughput : ProvisionedThroughput) : Async<unit> =
/// <param name="throughput">Throughput configuration to use for the table.</param>
member t.InitializeTableAsync(throughput : Throughput) : Async<unit> =
t.InternalCreateOrValidateTableAsync(throughput) |> Async.Ignore

/// <summary>
/// Asynchronously verifies that the table exists and is compatible with record key schema, throwing if it is incompatible.<br/>
/// If the table is not present, it is provisioned, with the specified <c>throughput</c>.<br/>
/// If it is present, and the throughput is not as specified, uses <c>UpdateProvisionedThroughputAsync</c> to update it. <br/>
/// </summary>
/// <param name="throughput">Provisioned throughput to use for the table.</param>
member t.ProvisionTableAsync(throughput : ProvisionedThroughput) : Async<unit> = async {
/// <param name="throughput">Throughput configuration to use for the table.</param>
member t.ProvisionTableAsync(throughput : Throughput) : Async<unit> = async {
let! tableDescription = t.InternalCreateOrValidateTableAsync(throughput)
let provisioned = tableDescription.ProvisionedThroughput
if throughput.ReadCapacityUnits <> provisioned.ReadCapacityUnits || throughput.WriteCapacityUnits <> provisioned.WriteCapacityUnits then
do! t.UpdateProvisionedThroughputAsync(throughput) }
match throughput with
| Provisioned p ->
let current = tableDescription.ProvisionedThroughput
if p.ReadCapacityUnits <> current.ReadCapacityUnits || p.WriteCapacityUnits <> current.WriteCapacityUnits then
do! t.UpdateProvisionedThroughputAsync p
| OnDemand -> () }

/// <summary>
/// Asynchronously verify that the table exists and is compatible with record key schema.
Expand All @@ -1015,7 +1034,7 @@ type TableContext<'TRecord> internal
member t.VerifyTableAsync(?createIfNotExists : bool, ?provisionedThroughput : ProvisionedThroughput) : Async<unit> =
if createIfNotExists = Some true then
let throughput = match provisionedThroughput with Some p -> p | None -> ProvisionedThroughput(10L, 10L)
t.InitializeTableAsync(throughput)
t.InitializeTableAsync(Provisioned throughput)
else
t.VerifyTableAsync()

Expand Down Expand Up @@ -1043,7 +1062,7 @@ type TableContext internal () =
let context = TableContext<'TRecord>(client, tableName, ?metricsCollector = metricsCollector)
if createIfNotExists = Some true then
let throughput = match provisionedThroughput with Some p -> p | None -> ProvisionedThroughput(10L, 10L)
do! context.InitializeTableAsync throughput
do! context.InitializeTableAsync(Throughput.Provisioned throughput)
elif verifyTable <> Some false then
do! context.VerifyTableAsync()
return context }
Expand Down
10 changes: 4 additions & 6 deletions tests/FSharp.AWS.DynamoDB.Tests/Utils.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ module Utils =
let client = getDynamoDBAccount()
let tableName = getRandomTableName()

member __.CreateContextAndTableIfNotExists<'TRecord>() =
let createThroughput = ProvisionedThroughput(10L, 10L)
Scripting.TableContext.Initialize<'TRecord>(client, tableName, createThroughput)
member _.CreateContextAndTableIfNotExists<'TRecord>() =
let throughput = Model.ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L)
Scripting.TableContext.Initialize<'TRecord>(client, tableName, Provisioned throughput)

interface IDisposable with
member __.Dispose() =
client.DeleteTableAsync(tableName) |> Async.AwaitTask |> Async.RunSynchronously |> ignore
interface IDisposable with member _.Dispose() = client.DeleteTableAsync(tableName) |> Async.AwaitTask |> Async.RunSynchronously |> ignore

0 comments on commit 54cbdd6

Please sign in to comment.