Skip to content

Commit

Permalink
Refactor InitializationMode to ProvisionTableAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 28, 2022
1 parent 6423028 commit a7ec460
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 86 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ open Amazon.DynamoDBv2
open FSharp.AWS.DynamoDB.Scripting // Expose non-Async methods, e.g. PutItem/GetItem
let client : IAmazonDynamoDB = ``your DynamoDB client instance``
let autoCreate = InitializationMode.CreateIfNotExists (ProvisionedThroughput(10, 10))
let table = TableContext.Initialize<WorkItemInfo>(client, tableName = "workItems", mode = autoCreate)
let throughput = ProvisionedThroughput (readCapacityUnits = 1L, writeCapacityUnits = 10L)
let table = TableContext.Initialize<WorkItemInfo>(client, tableName = "workItems", throughput)
let workItem = { ProcessId = 0L ; WorkItemId = 1L ; Name = "Test" ; UUID = guid() ; Dependencies = set ["mscorlib"] ; Started = None }
Expand Down Expand Up @@ -120,7 +120,8 @@ type Counter private (table : TableContext<CounterEntry>, key : TableKey) =
static member Create(client : IAmazonDynamoDB, tableName : string) = async {
let table = TableContext<CounterEntry>(client, tableName)
do! table.InitializeTableAsync( ProvisionedThroughput(100L, 100L))
let throughput = ProvisionedThroughput(readCapacityUnits = 100L, writeCapacityUnits = 100L)
do! table.InitializeTableAsync throughput
let initialEntry = { Id = Guid.NewGuid() ; Value = 0L }
let! key = table.PutItemAsync(initialEntry)
return Counter(table, key)
Expand Down
3 changes: 2 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
* Added `TableContext.Scripting.Initialize` (replaces `TableContext.Create()`)
* Added `TableContext.VerifyTableAsync` overload that only performs verification but never creates a Table
* Added `TableContext.InitializeTableAsync` (replaces `TableContext.VerifyTableAsync(createIfNotExists = true)`)
* Removed `TableContext.CreateAsync` (replace with `TableContext.VerifyTableAsync` or `TableContext.InitializeTableAsync`)
* Added `TableContext.ProvisionTableAsync` (as per `InitializeTableAsync` but does an `UpdateProvisionedThroughputAsync` if throughput has changed)
* Removed `TableContext.CreateAsync` (replace with `TableContext.VerifyTableAsync` or `InitializeTableAsync`)

### 0.9.3-beta
* Added `RequestMetrics` record type
Expand Down
12 changes: 9 additions & 3 deletions src/FSharp.AWS.DynamoDB/Script.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ type EasyCounters private (table : TableContext<CounterEntry>) =
static member Create(client : IAmazonDynamoDB, tableName : string) : Async<EasyCounters> = async {
let table = TableContext<CounterEntry>(client, tableName)
// Create the table if necessary. Verifies schema is correct if it has already been created
do! table.InitializeTableAsync( ProvisionedThroughput(100L, 100L))
let throughput = ProvisionedThroughput(readCapacityUnits = 100L, writeCapacityUnits = 100L)
do! table.InitializeTableAsync throughput
return EasyCounters(table)
}

Expand All @@ -139,8 +140,8 @@ type SimpleCounters private (table : TableContext<CounterEntry>) =
static member Provision(client : IAmazonDynamoDB, tableName : string, readCapacityUnits, writeCapacityUnits) : Async<unit> =
let table = TableContext<CounterEntry>(client, tableName)
// normally, RCU/WCU provisioning only happens first time the Table is created and is then considered an external concern
// here we use `updateThroughputIfExists = true` to reset it each time we start the app
table.InitializeTableAsync(ProvisionedThroughput (readCapacityUnits, writeCapacityUnits), updateThroughputIfExists = true)
// here we use `ProvisionTableAsync` instead of `InitializeAsync` to reset it each time we start the app
table.ProvisionTableAsync(ProvisionedThroughput (readCapacityUnits, writeCapacityUnits))

/// We only want to do the initialization bit once per instance of our application
/// Similar to EasyCounters.Create in that it ensures the table is provisioned correctly
Expand Down Expand Up @@ -168,10 +169,15 @@ e1.Incr() |> Async.RunSynchronously
e2.Incr() |> Async.RunSynchronously

SimpleCounters.Provision(ddb, "testing-pre-provisioned", 100L, 100L) |> Async.RunSynchronously
// The consuming code can assume the provisioning has been carried out as part of the deploy
// that allows the creation to be synchronous (and not impede application startup)
let s = SimpleCounters.Create(ddb, "testing-pre-provisioned")
let s1 = s.StartCounter() |> Async.RunSynchronously // Would throw if Provision has not been carried out
s1.Incr() |> Async.RunSynchronously

// Alternately, we can have the app do an extra call (and have some asynchronous initialization work) to check the table is ready
let v = SimpleCounters.CreateWithVerify(ddb, "testing-not-present") |> Async.RunSynchronously // Throws, as table not present
let v2 = v.StartCounter() |> Async.RunSynchronously
v2.Incr() |> Async.RunSynchronously

// (TOCONSIDER: Illustrate how to use AsyncCacheCell from https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncCacheCell.fs to make Verify call lazy)
137 changes: 62 additions & 75 deletions src/FSharp.AWS.DynamoDB/TableContext.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,6 @@ type private LimitType = All | Default | Count of int
static member AllOrCount (l : int option) = l |> Option.map Count |> Option.defaultValue All
static member DefaultOrCount (l : int option) = l |> Option.map Count |> Option.defaultValue Default

/// Defines Verification and/or Provisioning steps to be applied when initializing and/or validating a TableContext
[<RequireQualifiedAccess>]
type InitializationMode =
/// Validate the Table exists and has a compatible schema
| VerifyOnly
/// Perform a Validation step as per VerifyOnly, but Create the Table with the specified Throughput if it was not found
| CreateIfNotExists of provisionedThroughput : ProvisionedThroughput
/// Validate and/or Create as per CreateIfNotExists, but also re-apply the specified Throughput in case it has diverged
| CreateOrUpdateThroughput of provisionedThroughput : ProvisionedThroughput

/// DynamoDB client object for performing table operations in the context of given F# record representations
[<Sealed; AutoSerializable(false)>]
type TableContext<'TRecord> internal
Expand Down Expand Up @@ -915,69 +905,61 @@ type TableContext<'TRecord> internal
}


member __.InternalInitializeAsync(mode : InitializationMode) : Async<unit> = async {
member internal _.InternalDescribe() : Async<TableDescription> =
let rec wait () = async {
let! ct = Async.CancellationToken
let! td = client.DescribeTableAsync(tableName, ct) |> Async.AwaitTaskCorrect
if td.Table.TableStatus = TableStatus.ACTIVE then
do! Async.Sleep 2000
// wait indefinitely if table is in transition state
return! wait ()
else

return td.Table
}
wait ()

member internal __.CreateOrValidateTableAsync(createThroughput) : Async<TableDescription> =
let (|Conflict|_|) (e : exn) =
match e with
| :? AmazonDynamoDBException as e when e.StatusCode = HttpStatusCode.Conflict -> Some()
| :? ResourceInUseException -> Some ()
| _ -> None

let rec verify retries = async {
let! ct = Async.CancellationToken
let! response =
client.DescribeTableAsync(tableName, ct)
|> Async.AwaitTaskCorrect
|> Async.Catch

match response with
| Choice1Of2 td ->
if td.Table.TableStatus <> TableStatus.ACTIVE then
do! Async.Sleep 2000
// wait indefinitely if table is in transition state
return! verify retries
else

let existingSchema = TableKeySchemata.OfTableDescription td.Table
let rec checkOrCreate retries = async {
match! __.InternalDescribe() |> Async.Catch with
| Choice1Of2 desc ->
let existingSchema = TableKeySchemata.OfTableDescription desc
if existingSchema <> template.Info.Schemata then
sprintf "table '%s' exists with key schema %A, which is incompatible with record '%O'."
tableName existingSchema typeof<'TRecord>
|> invalidOp
match mode with
| InitializationMode.VerifyOnly | InitializationMode.CreateIfNotExists _ -> ()
| InitializationMode.CreateOrUpdateThroughput t ->
let provisioned = td.Table.ProvisionedThroughput
if t.ReadCapacityUnits <> provisioned.ReadCapacityUnits
|| t.WriteCapacityUnits <> provisioned.WriteCapacityUnits then
do! __.UpdateProvisionedThroughputAsync(t)

| Choice2Of2 (:? ResourceNotFoundException) when mode <> InitializationMode.VerifyOnly ->
let throughput =
match mode with
| InitializationMode.VerifyOnly -> failwith "Unexpected" // the when guard should preclude this
| InitializationMode.CreateIfNotExists t | InitializationMode.CreateOrUpdateThroughput t -> t
let ctr = template.Info.Schemata.CreateCreateTableRequest(tableName, throughput)
return desc

| Choice2Of2 (:? ResourceNotFoundException) when Option.isSome createThroughput ->
let! ct = Async.CancellationToken
let ctr = template.Info.Schemata.CreateCreateTableRequest(tableName, Option.get createThroughput)
let! response =
client.CreateTableAsync(ctr, ct)
|> Async.AwaitTaskCorrect
|> Async.Catch

match response with
| Choice1Of2 _ -> return! verify retries
| Choice1Of2 _ -> return! checkOrCreate retries
| Choice2Of2 Conflict when retries > 0 ->
do! Async.Sleep 2000
return! verify (retries - 1)
return! checkOrCreate (retries - 1)

| Choice2Of2 e -> do! Async.Raise e
| Choice2Of2 e -> return! Async.Raise e

| Choice2Of2 Conflict when retries > 0 ->
do! Async.Sleep 2000
return! verify (retries - 1)
return! checkOrCreate (retries - 1)

| Choice2Of2 e -> do! Async.Raise e
| Choice2Of2 e -> return! Async.Raise e
}

do! verify 9 // up to 9 retries, i.e. 10 attempts before we let exception propagate
}
checkOrCreate 9 // up to 9 retries, i.e. 10 attempts before we let exception propagate

/// <summary>
/// Asynchronously verify that the table exists and is compatible with record key schema.
Expand All @@ -986,33 +968,41 @@ type TableContext<'TRecord> internal
/// <param name="provisionedThroughput">Provisioned throughput for the table if newly created. Defaults to (10,10).</param>
[<System.Obsolete("Please replace with either 1. VerifyTableAsync or 2. InitializeTableAsync")>]
member __.VerifyTableAsync(?createIfNotExists : bool, ?provisionedThroughput : ProvisionedThroughput) : Async<unit> =
let mode =
let throughputIfCreate =
if createIfNotExists = Some true then
let throughput = match provisionedThroughput with Some p -> p | None -> ProvisionedThroughput(10L, 10L)
InitializationMode.CreateIfNotExists throughput
else InitializationMode.VerifyOnly
__.InternalInitializeAsync(mode)
Some throughput
else None
__.CreateOrValidateTableAsync(throughputIfCreate) |> Async.Ignore

/// <summary>
/// Asynchronously verify that the table exists and is compatible with record key schema, or throw.<br/>
/// See also <c>InitializeTableAsync</c>, which performs the same check, but can create or re-provision the Table if required.
/// </summary>
member __.VerifyTableAsync() : Async<unit> =
__.InternalInitializeAsync(InitializationMode.VerifyOnly)
__.CreateOrValidateTableAsync(None) |> Async.Ignore

/// <summary>
/// Asynchronously verifies that the table exists and is compatible with record key schema, throwing if it is incompatible.<br/>
/// If the table is not present, it is provisioned, with the specified <c>throughput</c>.<br/>
/// Optionally can reapply the cited <c>throughput</c> level if the Table has already been created.<br/>
/// See also <c>VerifyTableAsync</c>, which only verifies the Table is present and correct.
/// </summary>
/// <param name="throughput">Provisioned throughput to use for the table.</param>
/// <param name="updateThroughputIfExists">Trigger re-provisioning of the Table's throughput if it was already present. Default: <c>false</c></param>
member __.InitializeTableAsync(throughput : ProvisionedThroughput, ?updateThroughputIfExists) : Async<unit> =
let mode =
if updateThroughputIfExists = Some true then InitializationMode.CreateOrUpdateThroughput
else InitializationMode.CreateIfNotExists
__.InternalInitializeAsync(mode throughput)
member __.InitializeTableAsync(throughput : ProvisionedThroughput) : Async<unit> =
__.CreateOrValidateTableAsync(Some throughput) |> Async.Ignore

/// <summary>
/// Asynchronously verifies that the table exists and is compatible with record key schema, throwing if it is incompatible.<br/>
/// If the table is not present, it is provisioned, with the specified <c>throughput</c>.<br/>
/// If it is present, and the throughput is not as specified, uses <c>UpdateProvisionedThroughputAsync</c> to update it. <br/>
/// </summary>
/// <param name="throughput">Provisioned throughput to use for the table.</param>
member __.ProvisionTableAsync(throughput : ProvisionedThroughput) : Async<unit> = async {
let! desc = __.CreateOrValidateTableAsync(Some throughput)
let provisioned = desc.ProvisionedThroughput
if throughput.ReadCapacityUnits <> provisioned.ReadCapacityUnits
|| throughput.WriteCapacityUnits <> provisioned.WriteCapacityUnits then
do! __.UpdateProvisionedThroughputAsync(throughput) }

// Deprecated factory method, to be removed. Replaced with
// 1. TableContext<'T> ctor (synchronous)
Expand All @@ -1034,17 +1024,14 @@ type TableContext internal () =
static member Create<'TRecord>
( client : IAmazonDynamoDB, tableName : string, ?verifyTable : bool,
?createIfNotExists : bool, ?provisionedThroughput : ProvisionedThroughput,
?metricsCollector : RequestMetrics -> unit) =
?metricsCollector : RequestMetrics -> unit) = async {
let context = TableContext<'TRecord>(client, tableName, ?metricsCollector = metricsCollector)
let verifyTable, createIfNotExists = verifyTable <> Some false, createIfNotExists = Some true
if verifyTable || createIfNotExists then
let mode =
if createIfNotExists then
let throughput = match provisionedThroughput with Some p -> p | None -> ProvisionedThroughput(10L, 10L)
InitializationMode.CreateIfNotExists throughput
else InitializationMode.VerifyOnly
context.InternalInitializeAsync(mode) |> Async.RunSynchronously
context
if createIfNotExists = Some true then
let throughput = match provisionedThroughput with Some p -> p | None -> ProvisionedThroughput(10L, 10L)
do! context.InitializeTableAsync throughput
elif verifyTable <> Some false then
do! context.VerifyTableAsync()
return context }

/// <summary>
/// Sync-over-Async helpers that can be opted-into when working in scripting scenarios.
Expand All @@ -1059,15 +1046,15 @@ module Scripting =
/// <summary>Creates a DynamoDB client instance for the specified F# record type, client and table name.</summary>
/// <param name="client">DynamoDB client instance.</param>
/// <param name="tableName">Table name to target.</param>
/// <param name="mode">Allows one to define auto-creation or re-provisioning rules via <see cref="InitializationMode" />.</param>
/// <param name="throughput">Optional throughput to configure if the Table does not yet exist.</param>
/// <param name="metricsCollector">Function to receive request metrics.</param>
static member Initialize<'TRecord>
( client : IAmazonDynamoDB, tableName : string,
?mode : InitializationMode,
( client : IAmazonDynamoDB, tableName : string, ?throughput,
?metricsCollector : RequestMetrics -> unit) : TableContext<'TRecord> =
let context = TableContext<'TRecord>(client, tableName, ?metricsCollector = metricsCollector)
let mode = defaultArg mode InitializationMode.VerifyOnly
context.InternalInitializeAsync(mode) |> Async.RunSynchronously
match throughput with
| None -> context.VerifyTableAsync() |> Async.RunSynchronously
| Some t -> context.InitializeTableAsync(t) |> Async.RunSynchronously
context

type TableContext<'TRecord> with
Expand Down
6 changes: 2 additions & 4 deletions tests/FSharp.AWS.DynamoDB.Tests/Utils.fs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ module Utils =
type TableFixture() =
let client = getDynamoDBAccount()
let tableName = getRandomTableName()
member __.Client = client
member __.TableName = tableName

member __.CreateContextAndTableIfNotExists<'TRecord>() =
let autoCreate = InitializationMode.CreateIfNotExists (ProvisionedThroughput(10L, 10L))
Scripting.TableContext.Initialize<'TRecord>(__.Client, __.TableName, mode = autoCreate)
let createThroughput = ProvisionedThroughput(10L, 10L)
Scripting.TableContext.Initialize<'TRecord>(client, tableName, createThroughput)

interface IDisposable with
member __.Dispose() =
Expand Down

0 comments on commit a7ec460

Please sign in to comment.