From fc258a42fd6d73e905054f1a1ed68f3d6d9b6630 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 13 Feb 2024 06:55:40 +0800 Subject: [PATCH 1/7] wip Streams support --- src/FSharp.AWS.DynamoDB/TableContext.fs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/FSharp.AWS.DynamoDB/TableContext.fs b/src/FSharp.AWS.DynamoDB/TableContext.fs index e18a896..74cae4a 100644 --- a/src/FSharp.AWS.DynamoDB/TableContext.fs +++ b/src/FSharp.AWS.DynamoDB/TableContext.fs @@ -212,6 +212,11 @@ type RequestMetrics = ConsumedCapacity: ConsumedCapacity list ItemCount: int } +type StreamRecord<'TRecord> = + | Insert of 'TRecord + | Modify of {| Old: 'TRecord; New : 'TRecord |} + | Remove of 'TRecord + /// Scan/query limit type (internal only) type private LimitType = | All @@ -2096,3 +2101,14 @@ module Scripting = member t.UpdateProvisionedThroughput(provisionedThroughput: ProvisionedThroughput) : unit = let spec = Throughput.Provisioned provisionedThroughput t.UpdateTableIfRequiredAsync(spec) |> Async.Ignore |> Async.RunSynchronously + + + member t.ParseStreamRecord(record : Record) : StreamRecord<'TRecord> = + match record.EventName with + | n when n = OperationType.INSERT -> + Insert (t.Template.OfAttributeValues record.Dynamodb.NewImage) + | n when n = OperationType.MODIFY -> + Modify {| New = t.Template.OfAttributeValues record.Dynamodb.NewImage; Old = t.Template.OfAttributeValues record.Dynamodb.NewImage |} + | n when n = OperationType.REMOVE -> + Remove (t.Template.OfAttributeValues record.Dynamodb.OldImage) + | n -> failwithf "Unexpected OperationType %s" n.Value From 7d73b823c95b1772afd50105833cae0b0485861b Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Thu, 20 Jun 2024 17:25:03 +0800 Subject: [PATCH 2/7] Split out StreamContext, implement more shard handling --- paket.dependencies | 2 +- paket.lock | 57 ++- .../FSharp.AWS.DynamoDB.fsproj | 4 +- src/FSharp.AWS.DynamoDB/StreamContext.fs | 347 ++++++++++++++++++ src/FSharp.AWS.DynamoDB/TableContext.fs | 39 +- .../FSharp.AWS.DynamoDB.Tests.fsproj | 7 +- .../StreamingTests.fs | 47 +++ tests/FSharp.AWS.DynamoDB.Tests/Utils.fs | 11 + 8 files changed, 460 insertions(+), 54 deletions(-) create mode 100644 src/FSharp.AWS.DynamoDB/StreamContext.fs create mode 100644 tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs diff --git a/paket.dependencies b/paket.dependencies index 99c724f..311bf71 100644 --- a/paket.dependencies +++ b/paket.dependencies @@ -4,7 +4,7 @@ storage: none framework: net80, netstandard20, netstandard21 nuget Unquote ~> 6.1.0 -nuget FSharp.Core >= 4.7.2 lowest_matching: true +nuget FSharp.Core >= 6.0.7 lowest_matching: true nuget AWSSDK.DynamoDBv2 ~> 3.7.5 nuget DotNet.ReproducibleBuilds diff --git a/paket.lock b/paket.lock index 2e93481..2d430ae 100644 --- a/paket.lock +++ b/paket.lock @@ -2,16 +2,16 @@ STORAGE: NONE RESTRICTION: || (== net8.0) (== netstandard2.0) (== netstandard2.1) NUGET remote: https://api.nuget.org/v3/index.json - AWSSDK.Core (3.7.300.11) + AWSSDK.Core (3.7.303.5) Microsoft.Bcl.AsyncInterfaces (>= 1.1) - restriction: || (&& (== net8.0) (< netcoreapp3.1)) (== netstandard2.0) (== netstandard2.1) - AWSSDK.DynamoDBv2 (3.7.300.11) - AWSSDK.Core (>= 3.7.300.11 < 4.0) + AWSSDK.DynamoDBv2 (3.7.302.6) + AWSSDK.Core (>= 3.7.303.5 < 4.0) DotNet.ReproducibleBuilds (1.1.1) Microsoft.SourceLink.AzureRepos.Git (>= 1.1.1) Microsoft.SourceLink.Bitbucket.Git (>= 1.1.1) Microsoft.SourceLink.GitHub (>= 1.1.1) Microsoft.SourceLink.GitLab (>= 1.1.1) - FSharp.Core (4.7.2) + FSharp.Core (6.0.7) Microsoft.Bcl.AsyncInterfaces (8.0) - restriction: || (&& (== net8.0) (< netcoreapp3.1)) (== netstandard2.0) (== netstandard2.1) System.Threading.Tasks.Extensions (>= 4.5.4) - restriction: || (&& (== net8.0) (>= net462)) (&& (== net8.0) (< netstandard2.1)) (== netstandard2.0) (&& (== netstandard2.1) (>= net462)) Microsoft.Build.Tasks.Git (8.0) @@ -42,39 +42,32 @@ NUGET remote: https://api.nuget.org/v3/index.json FsCheck (2.16.6) FSharp.Core (>= 4.2.3) - FSharp.Core (8.0.100) - Microsoft.CodeCoverage (17.8) - Microsoft.NET.Test.Sdk (17.8) - Microsoft.CodeCoverage (>= 17.8) - Microsoft.TestPlatform.TestHost (>= 17.8) - Microsoft.NETCore.Platforms (7.0.4) - Microsoft.TestPlatform.ObjectModel (17.8) - NuGet.Frameworks (>= 6.5) + FSharp.Core (8.0.200) + Microsoft.CodeCoverage (17.9) + Microsoft.NET.Test.Sdk (17.9) + Microsoft.CodeCoverage (>= 17.9) + Microsoft.TestPlatform.TestHost (>= 17.9) + Microsoft.TestPlatform.ObjectModel (17.9) System.Reflection.Metadata (>= 1.6) - Microsoft.TestPlatform.TestHost (17.8) - Microsoft.TestPlatform.ObjectModel (>= 17.8) + Microsoft.TestPlatform.TestHost (17.9) + Microsoft.TestPlatform.ObjectModel (>= 17.9) Newtonsoft.Json (>= 13.0.1) - NETStandard.Library (2.0.3) - Microsoft.NETCore.Platforms (>= 1.1) Newtonsoft.Json (13.0.3) - NuGet.Frameworks (6.8) System.Collections.Immutable (8.0) System.Reflection.Metadata (8.0) System.Collections.Immutable (>= 8.0) - xunit (2.6.2) - xunit.analyzers (>= 1.6) - xunit.assert (>= 2.6.2) - xunit.core (2.6.2) + xunit (2.7) + xunit.analyzers (>= 1.11) + xunit.assert (>= 2.7) + xunit.core (2.7) xunit.abstractions (2.0.3) - xunit.analyzers (1.6) - xunit.assert (2.6.2) - xunit.core (2.6.2) - xunit.extensibility.core (2.6.2) - xunit.extensibility.execution (2.6.2) - xunit.extensibility.core (2.6.2) - NETStandard.Library (>= 1.6.1) + xunit.analyzers (1.11) + xunit.assert (2.7) + xunit.core (2.7) + xunit.extensibility.core (2.7) + xunit.extensibility.execution (2.7) + xunit.extensibility.core (2.7) xunit.abstractions (>= 2.0.3) - xunit.extensibility.execution (2.6.2) - NETStandard.Library (>= 1.6.1) - xunit.extensibility.core (2.6.2) - xunit.runner.visualstudio (2.5.4) + xunit.extensibility.execution (2.7) + xunit.extensibility.core (2.7) + xunit.runner.visualstudio (2.5.7) diff --git a/src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj b/src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj index 80059bf..41c9952 100644 --- a/src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj +++ b/src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj @@ -1,4 +1,3 @@ - netstandard2.0 @@ -37,9 +36,10 @@ + - + \ No newline at end of file diff --git a/src/FSharp.AWS.DynamoDB/StreamContext.fs b/src/FSharp.AWS.DynamoDB/StreamContext.fs new file mode 100644 index 0000000..5f9ca72 --- /dev/null +++ b/src/FSharp.AWS.DynamoDB/StreamContext.fs @@ -0,0 +1,347 @@ +namespace FSharp.AWS.DynamoDB + +open System +open System.Collections.Concurrent +open System.Threading +open System.Threading.Tasks + +open Amazon.DynamoDBv2 +open Amazon.DynamoDBv2.Model +open System.Collections.Generic + +type StreamOperation = + | Insert + | Modify + | Remove + +type StreamRecord<'TRecord> = + { TableKey: TableKey + Operation: StreamOperation + ApproximateCreationDateTime: DateTimeOffset + New: 'TRecord option + Old: 'TRecord option } + +type StreamPosition = { ShardId: string; SequenceNumber: string option } + +type StreamReadFrom = + | Oldest + | Newest + | Position of StreamPosition + +type private Enumerator<'TRecord>(initialIterator : string, fetchNext: string -> Task<('TRecord [] * string option)>) = + let mutable iterator = initialIterator + let mutable records: 'TRecord [] = Array.empty + let mutable index = 0 + + interface IAsyncEnumerator<'TRecord> with + member _.Current = records[index] + + member _.MoveNextAsync() = + if index < records.Length - 1 then + index <- index + 1 + ValueTask(true) + else + let nextTask = + (fetchNext iterator) + .ContinueWith((fun (task: Task<('TRecord [] * string option)>) -> + match task.Result with + | (_, None) -> + records <- Array.empty + index <- 0 + false + | (recs, Some nextIterator) -> + records <- recs + iterator <- nextIterator + index <- 0 + true + ), TaskContinuationOptions.OnlyOnRanToCompletion) + ValueTask(nextTask) + + member _.DisposeAsync() = ValueTask() + +type private ShardTree = + | Leaf of Shard + | Node of Shard * ShardTree list + +module private ShardTree = + let shard (tree : ShardTree) = + match tree with + | Leaf shard | Node (shard, _) -> shard + + let find (shardId : string) (tree : ShardTree) = + let rec findInner t = + match t with + | Leaf s | Node (s, _) when s.ShardId = shardId -> Some(t) + | Node (_, children) -> children |> Seq.choose (findInner) |> Seq.tryHead + | Leaf _ -> None + + findInner tree + + let nextShards (shardId : string) (trees : ShardTree list) = + match trees |> List.choose (find shardId) |> List.tryHead with + | Some (Node (_, children)) -> children + | _ -> [] + +type private ShardMsg = + | UpdatedShardPosition of StreamPosition + | EndOfShard of string + | ShardsUpdated of ShardTree list + +type private StreamState = { Trees: ShardTree list; Workers: Map * StreamPosition> } + + +/// DynamoDB client object for intrepreting Dynamo streams sharing the same F# record representations as the `TableContext` +[] +type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName: string, template: RecordTemplate<'TRecord>) = + + [] + let idleTimeBetweenReadsMilliseconds = 1000 + [] + let maxRecords = 1000 + [] + let listShardsCacheAgeMilliseconds = 10000 + + let listStreams (ct: CancellationToken) : Task = + task { + let req = ListStreamsRequest(TableName = tableName, Limit = 100) + let! response = client.ListStreamsAsync(req, ct) + return response.Streams |> Seq.map _.StreamArn |> Seq.toList + } + + let describeStream (streamArn: string) (ct: CancellationToken) = + task { + let! response = client.DescribeStreamAsync(DescribeStreamRequest(StreamArn = streamArn), ct) + return response.StreamDescription + } + + /// + /// Returns the 'oldest' shard in a stream, according to the logic: + /// First shard which has no parent ID + /// First shard which has a parent ID not returned in the description + /// First shard in the list + /// + /// `StreamDescription` returned for the stream ARN + let oldestShard (stream: StreamDescription) = + stream.Shards + |> Seq.tryFind (fun s -> String.IsNullOrEmpty(s.ParentShardId)) + |> Option.orElseWith (fun () -> + stream.Shards + |> Seq.tryFind (fun s1 -> not (stream.Shards |> Seq.exists (fun s2 -> s1.ParentShardId = s2.ShardId)))) + |> Option.orElseWith (fun () -> stream.Shards |> Seq.tryHead) + + /// + /// Returns the 'newest' shard in a stream, according to the logic: + /// First shard which has no child IDs + /// First shard which has an empty `EndingSequenceNumber` (is still being actively written to) + /// First shard in the list + /// + /// `StreamDescription` returned for the stream ARN + // TODO: Should this ONLY return active streams? + let newestShard (stream: StreamDescription) = + stream.Shards + |> Seq.tryFind (fun s1 -> not (stream.Shards |> Seq.exists (fun s2 -> s1.ShardId = s2.ParentShardId))) + |> Option.orElseWith (fun () -> + stream.Shards + |> Seq.tryFind (fun s -> String.IsNullOrEmpty(s.SequenceNumberRange.EndingSequenceNumber))) + |> Option.orElseWith (fun () -> stream.Shards |> Seq.tryHead) + + /// + /// Returns all of the currently open shards in the stream + /// + /// `StreamDescription` returned for the stream ARN + let openShards (stream: StreamDescription) = + stream.Shards |> Seq.filter (fun s -> String.IsNullOrEmpty(s.ParentShardId)) + + /// + /// Returns all of the root shards (with no parent) in the stream + /// + /// `StreamDescription` returned for the stream ARN + let rootShards (stream : StreamDescription) = + stream.Shards |> Seq.filter (fun s -> + String.IsNullOrEmpty(s.ParentShardId) + || not (stream.Shards |> Seq.exists (fun c -> c.ShardId = s.ParentShardId))) + + /// + /// Build trees of the shard hierarchy in the stream. + /// Nodes will usually have a single child, but can have multiple children if the shard has been split. + /// + /// `StreamDescription` returned for the stream ARN + let shardTrees (stream: StreamDescription): ShardTree seq = + let children = + stream.Shards + |> Seq.filter (fun s -> not (String.IsNullOrEmpty(s.ParentShardId))) + |> Seq.groupBy _.ParentShardId + |> Map.ofSeq + + let rec buildTree (shard: Shard) = + match children |> Map.tryFind shard.ShardId with + | None -> Leaf shard + | Some cs -> Node(shard, cs |> Seq.map buildTree |> Seq.toList) + + rootShards stream |> Seq.map buildTree + + + + // TODO: what does this return if no more records in shard? + let getShardIterator (streamArn: string) (iteratorType: ShardIteratorType) (position: StreamPosition) (ct: CancellationToken) = + task { + let req = GetShardIteratorRequest( + StreamArn = streamArn, + ShardId = position.ShardId, + ShardIteratorType = iteratorType) + position.SequenceNumber |> Option.iter (fun n -> req.SequenceNumber <- n) + let! response = client.GetShardIteratorAsync(req, ct) + return response.ShardIterator + } + + let getRecords (iterator: string) (limit: int) (ct: CancellationToken) = + task { + let! response = client.GetRecordsAsync(GetRecordsRequest(Limit = limit, ShardIterator = iterator), ct) + return response.Records |> Array.ofSeq, if isNull response.NextShardIterator then None else Some response.NextShardIterator + } + + let getRecordsAsync (iterator: string) (ct: CancellationToken) = + let batchSize = 5 + { new IAsyncEnumerable with + member _.GetAsyncEnumerator(ct) = Enumerator(iterator, (fun i -> getRecords i batchSize ct)) :> IAsyncEnumerator } + + let startShardProcessor (outbox: MailboxProcessor) (streamArn : string) (position : StreamPosition) (processRecord: Record -> unit) (ct: CancellationToken) = + MailboxProcessor.Start((fun _ -> + let rec loop iterator = + async { + if not ct.IsCancellationRequested then + // TODO: Handle AWS read failures + let! (recs, nextIterator) = getRecords iterator maxRecords ct |> Async.AwaitTaskCorrect + // TODO: Process Record error handling + recs |> Array.iter processRecord + match nextIterator with + | None -> + outbox.Post(EndOfShard position.ShardId) + // End processing the shard + | Some i -> + outbox.Post(UpdatedShardPosition { ShardId = position.ShardId; SequenceNumber = Some i }) + do! Async.Sleep idleTimeBetweenReadsMilliseconds + do! loop i + } + + async { + let! initialIterator = getShardIterator streamArn ShardIteratorType.TRIM_HORIZON position ct |> Async.AwaitTaskCorrect + do! loop initialIterator + while not ct.IsCancellationRequested do + do! Async.Sleep idleTimeBetweenReadsMilliseconds + } + ), ct) + + let startStreamProcessor (streamArn : string) (processRecord: Record -> unit) (ct: CancellationToken) = + MailboxProcessor.Start(fun inbox -> + let startWorker (shard : Shard) = + let position = { ShardId = shard.ShardId; SequenceNumber = None } + (shard.ShardId, (startShardProcessor inbox streamArn position processRecord ct, position)) + + let rec loop (state: StreamState) = + async { + if not ct.IsCancellationRequested then + let! msg = inbox.Receive() + match msg with + | UpdatedShardPosition position -> + printfn "Recording current shard position: %0A" position + do! loop { state with Workers = state.Workers |> Map.change position.ShardId (Option.map (fun (p, _) -> (p, position))) } + | EndOfShard shardId -> + let newWorkers = ShardTree.nextShards shardId state.Trees |> List.map (ShardTree.shard >> startWorker) + do! loop { state with Workers = state.Workers |> Map.remove shardId |> Map.toList |> List.append newWorkers |> Map.ofList} + | ShardsUpdated trees -> + do! loop { state with Trees = trees } // TODO: Start new workers if necessary? + } + async { + let! stream = describeStream streamArn ct |> Async.AwaitTaskCorrect + let trees = shardTrees stream |> Seq.toList + // Start workers at the root shards. + // TODO: Handle LATEST semantics or starting from a checkpoint + let workers = trees |> Seq.map (ShardTree.shard >> startWorker) |> Map.ofSeq + do! loop { Trees = trees; Workers = workers } + } + ) + + /// DynamoDB streaming client instance used for the table operations + member _.Client = client + /// DynamoDB table name targeted by the context + member _.TableName = tableName + /// Record-induced table template + member _.Template = template + + + /// + /// Creates a DynamoDB client instance for given F# record and table name.
+ /// For creating, provisioning or verification, see VerifyOrCreateTableAsync and VerifyTableAsync. + ///
+ /// DynamoDB streaming client instance. + /// Table name to target. + new(client: IAmazonDynamoDBStreams, tableName: string) = + if not <| isValidTableName tableName then + invalidArg "tableName" "unsupported DynamoDB table name." + StreamContext<'TRecord>(client, tableName, RecordTemplate.Define<'TRecord>()) + + /// + /// Lists all the streams associated with the table. + /// + /// Cancellation token for the operation. + member _.ListStreamsAsync(?ct : CancellationToken) = + listStreams(ct |> Option.defaultValue CancellationToken.None) + + /// + /// Parses a DynamoDB stream record into the corresponding F# type (matching the table definition). + /// Intended to be used directly from Lambda event handlers or Kinesis consumers. + /// + /// DynamoDB stream record to parse. + /// `StreamRecord` with old value, new value, and/or `TableKey`, depending on stream config & operation + member t.ParseStreamRecord(record: Record) : StreamRecord<'TRecord> = + let op = + match record.EventName with + | n when n = OperationType.INSERT -> Insert + | n when n = OperationType.MODIFY -> Modify + | n when n = OperationType.REMOVE -> Remove + | n -> failwithf "Unexpected OperationType %s" n.Value + let key = t.Template.ExtractKey record.Dynamodb.Keys + let newRec = + if record.Dynamodb.NewImage.Count = 0 then + None + else + Some(t.Template.OfAttributeValues record.Dynamodb.NewImage) + let oldRec = + if record.Dynamodb.OldImage.Count = 0 then + None + else + Some(t.Template.OfAttributeValues record.Dynamodb.OldImage) + { Operation = op + TableKey = key + ApproximateCreationDateTime = DateTimeOffset(record.Dynamodb.ApproximateCreationDateTime) + New = newRec + Old = oldRec } + + member t.TempReadAllRecords() = + task { + let! streams = listStreams CancellationToken.None + let! stream = describeStream (streams |> List.head) CancellationToken.None + match oldestShard stream with + | Some shard -> + let! iterator = getShardIterator stream.StreamArn ShardIteratorType.TRIM_HORIZON { ShardId = shard.ShardId; SequenceNumber = None } CancellationToken.None + let! records = getRecords iterator maxRecords CancellationToken.None + return records |> fst |> Array.map t.ParseStreamRecord + | None -> + return [||] + } + + + // member t.StartReadingAsync(streamArn: string, processRecord: StreamRecord -> unit, ?ct: CancellationToken) = + // task { + // let! stream = describeStream streamArn (ct |> Option.defaultValue CancellationToken.None) + // let shard = oldestShard stream |> Option.get + // let! iterator = getShardIterator streamArn ShardIteratorType.TRIM_HORIZON { ShardId = shard.ShardId; SequenceNumber = None } ct + // let rec readLoop (iterator: string) = + // let! (recs, nextIterator) = getRecords iterator 1000 ct + // recs |> Array.iter (fun r -> processRecord (t.ParseStreamRecord r)) + // match nextIterator with + // | None -> () + // | Some i -> readLoop i + // readLoop iterator + // } \ No newline at end of file diff --git a/src/FSharp.AWS.DynamoDB/TableContext.fs b/src/FSharp.AWS.DynamoDB/TableContext.fs index 74cae4a..0102871 100644 --- a/src/FSharp.AWS.DynamoDB/TableContext.fs +++ b/src/FSharp.AWS.DynamoDB/TableContext.fs @@ -2,14 +2,18 @@ open System.Collections.Generic open System.Net +open System.Threading +open System.Threading.Tasks open Microsoft.FSharp.Quotations +open FSharp.Control open Amazon.DynamoDBv2 open Amazon.DynamoDBv2.Model open FSharp.AWS.DynamoDB.ExprCommon + /// Exception raised by DynamoDB in case where write preconditions are not satisfied type ConditionalCheckFailedException = Amazon.DynamoDBv2.Model.ConditionalCheckFailedException @@ -212,11 +216,6 @@ type RequestMetrics = ConsumedCapacity: ConsumedCapacity list ItemCount: int } -type StreamRecord<'TRecord> = - | Insert of 'TRecord - | Modify of {| Old: 'TRecord; New : 'TRecord |} - | Remove of 'TRecord - /// Scan/query limit type (internal only) type private LimitType = | All @@ -306,7 +305,12 @@ module Precondition = [] type TableContext<'TRecord> internal - (client: IAmazonDynamoDB, tableName: string, template: RecordTemplate<'TRecord>, metricsCollector: (RequestMetrics -> unit) option) = + ( + client: IAmazonDynamoDB, + tableName: string, + template: RecordTemplate<'TRecord>, + metricsCollector: (RequestMetrics -> unit) option + ) = let reportMetrics collector (operation: Operation) (consumedCapacity: ConsumedCapacity list) (itemCount: int) = collector @@ -541,6 +545,7 @@ type TableContext<'TRecord> return downloaded } + /// DynamoDB client instance used for the table operations member _.Client = client /// DynamoDB table name targeted by the context @@ -1527,6 +1532,18 @@ module Scripting = let _desc = context.VerifyOrCreateTableAsync(throughput) |> Async.RunSynchronously context + /// Creates a DynamoDB client instance for the specified F# record type, client and table name.
+ /// Either validates the table exists and has the correct schema, or creates a fresh one, as per VerifyOrCreateTableAsync.
+ /// See other overload for VerifyTableAsync semantics. + /// DynamoDB client instance. + /// Table name to target. + /// Throughput to configure if the table does not yet exist. + /// Streaming configuration applied if the table does not yet exist. + static member Initialize<'TRecord>(client: IAmazonDynamoDB, tableName: string, throughput, streaming) : TableContext<'TRecord> = + let context = TableContext<'TRecord>(client, tableName) + let _desc = context.VerifyOrCreateTableAsync(throughput, streaming) |> Async.RunSynchronously + context + type TableContext<'TRecord> with /// @@ -2102,13 +2119,3 @@ module Scripting = let spec = Throughput.Provisioned provisionedThroughput t.UpdateTableIfRequiredAsync(spec) |> Async.Ignore |> Async.RunSynchronously - - member t.ParseStreamRecord(record : Record) : StreamRecord<'TRecord> = - match record.EventName with - | n when n = OperationType.INSERT -> - Insert (t.Template.OfAttributeValues record.Dynamodb.NewImage) - | n when n = OperationType.MODIFY -> - Modify {| New = t.Template.OfAttributeValues record.Dynamodb.NewImage; Old = t.Template.OfAttributeValues record.Dynamodb.NewImage |} - | n when n = OperationType.REMOVE -> - Remove (t.Template.OfAttributeValues record.Dynamodb.OldImage) - | n -> failwithf "Unexpected OperationType %s" n.Value diff --git a/tests/FSharp.AWS.DynamoDB.Tests/FSharp.AWS.DynamoDB.Tests.fsproj b/tests/FSharp.AWS.DynamoDB.Tests/FSharp.AWS.DynamoDB.Tests.fsproj index 3c973f3..bea265b 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/FSharp.AWS.DynamoDB.Tests.fsproj +++ b/tests/FSharp.AWS.DynamoDB.Tests/FSharp.AWS.DynamoDB.Tests.fsproj @@ -1,7 +1,7 @@ - + net8.0 - + @@ -17,6 +17,7 @@ + - + \ No newline at end of file diff --git a/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs b/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs new file mode 100644 index 0000000..55fab7d --- /dev/null +++ b/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs @@ -0,0 +1,47 @@ +namespace FSharp.AWS.DynamoDB.Tests + +open System + +open Swensen.Unquote +open Xunit + +open FSharp.AWS.DynamoDB +open FSharp.AWS.DynamoDB.Scripting +open Amazon.DynamoDBv2 + +[] +module StreamingTests = + + type StreamingRecord = + { [] + HashKey: string + [] + RangeKey: string + StringValue: string + IntValue: int64 } + +type ``Streaming Tests``(fixture: TableFixture) = + + let rand = let r = Random.Shared in fun () -> int64 <| r.Next() + let mkItem () = + { HashKey = guid () + RangeKey = guid () + StringValue = guid () + IntValue = rand () } + + let (table, streams) = fixture.CreateEmptyWithStreaming(StreamViewType.NEW_AND_OLD_IMAGES) + + [] + let ``Parse New Item from stream`` () = + let value = mkItem () + let key = table.PutItem value + let value' = table.GetItem key + let task = streams.TempReadAllRecords() + task.Wait() + let records = task.Result + test <@ 1 = Array.length records @> + test <@ records[0].Operation = Insert @> + test <@ records[0].Old = None @> + test <@ records[0].New = Some value' @> + + interface IClassFixture \ No newline at end of file diff --git a/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs b/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs index a10d59a..1232698 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs +++ b/tests/FSharp.AWS.DynamoDB.Tests/Utils.fs @@ -27,6 +27,11 @@ module Utils = new AmazonDynamoDBClient(credentials, config) :> IAmazonDynamoDB + let getStreamsAccount () = + let credentials = BasicAWSCredentials("Fake", "Fake") + let config = AmazonDynamoDBStreamsConfig(ServiceURL = "http://localhost:8000") + + new AmazonDynamoDBStreamsClient(credentials, config) :> IAmazonDynamoDBStreams type FsCheckGenerators = static member MemoryStream = @@ -40,6 +45,7 @@ module Utils = type TableFixture() = let client = getDynamoDBAccount () + let streamsClient = getStreamsAccount () let tableName = getRandomTableName () member _.Client = client @@ -49,6 +55,11 @@ module Utils = let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L) Scripting.TableContext.Initialize<'TRecord>(client, tableName, Throughput.Provisioned throughput) + member _.CreateEmptyWithStreaming<'TRecord>(streamType: StreamViewType) = + let throughput = ProvisionedThroughput(readCapacityUnits = 10L, writeCapacityUnits = 10L) + (Scripting.TableContext.Initialize<'TRecord>(client, tableName, Throughput.Provisioned throughput, Streaming.Enabled streamType), + StreamContext<'TRecord>(streamsClient, tableName)) + interface IAsyncLifetime with member _.InitializeAsync() = System.Threading.Tasks.Task.CompletedTask member _.DisposeAsync() = client.DeleteTableAsync(tableName) From bd33faa4a4b13d14bd86df6de52fa29508eb63fe Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 25 Jun 2024 10:16:35 +0800 Subject: [PATCH 3/7] wip More Streaming API work --- build.fsx | 2 +- paket.lock | 48 +-- src/FSharp.AWS.DynamoDB/StreamContext.fs | 292 ++++++++++++------ .../StreamingTests.fs | 11 +- 4 files changed, 236 insertions(+), 117 deletions(-) diff --git a/build.fsx b/build.fsx index 22643ee..ec8f81f 100755 --- a/build.fsx +++ b/build.fsx @@ -24,7 +24,7 @@ System.Environment.GetCommandLineArgs() #r "nuget: Fake.IO.FileSystem" #r "nuget: Fake.Api.GitHub" #r "nuget: Octokit" -#r "nuget: MSBuild.StructuredLogger, Version=2.1.820" +#r "nuget: MSBuild.StructuredLogger, Version=2.2.206" open Fake.Core open Fake.DotNet diff --git a/paket.lock b/paket.lock index 2d430ae..a6f13fc 100644 --- a/paket.lock +++ b/paket.lock @@ -2,10 +2,10 @@ STORAGE: NONE RESTRICTION: || (== net8.0) (== netstandard2.0) (== netstandard2.1) NUGET remote: https://api.nuget.org/v3/index.json - AWSSDK.Core (3.7.303.5) + AWSSDK.Core (3.7.304.18) Microsoft.Bcl.AsyncInterfaces (>= 1.1) - restriction: || (&& (== net8.0) (< netcoreapp3.1)) (== netstandard2.0) (== netstandard2.1) - AWSSDK.DynamoDBv2 (3.7.302.6) - AWSSDK.Core (>= 3.7.303.5 < 4.0) + AWSSDK.DynamoDBv2 (3.7.303.24) + AWSSDK.Core (>= 3.7.304.18 < 4.0) DotNet.ReproducibleBuilds (1.1.1) Microsoft.SourceLink.AzureRepos.Git (>= 1.1.1) Microsoft.SourceLink.Bitbucket.Git (>= 1.1.1) @@ -42,32 +42,32 @@ NUGET remote: https://api.nuget.org/v3/index.json FsCheck (2.16.6) FSharp.Core (>= 4.2.3) - FSharp.Core (8.0.200) - Microsoft.CodeCoverage (17.9) - Microsoft.NET.Test.Sdk (17.9) - Microsoft.CodeCoverage (>= 17.9) - Microsoft.TestPlatform.TestHost (>= 17.9) - Microsoft.TestPlatform.ObjectModel (17.9) + FSharp.Core (8.0.300) + Microsoft.CodeCoverage (17.10) + Microsoft.NET.Test.Sdk (17.10) + Microsoft.CodeCoverage (>= 17.10) + Microsoft.TestPlatform.TestHost (>= 17.10) + Microsoft.TestPlatform.ObjectModel (17.10) System.Reflection.Metadata (>= 1.6) - Microsoft.TestPlatform.TestHost (17.9) - Microsoft.TestPlatform.ObjectModel (>= 17.9) + Microsoft.TestPlatform.TestHost (17.10) + Microsoft.TestPlatform.ObjectModel (>= 17.10) Newtonsoft.Json (>= 13.0.1) Newtonsoft.Json (13.0.3) System.Collections.Immutable (8.0) System.Reflection.Metadata (8.0) System.Collections.Immutable (>= 8.0) - xunit (2.7) - xunit.analyzers (>= 1.11) - xunit.assert (>= 2.7) - xunit.core (2.7) + xunit (2.8.1) + xunit.analyzers (>= 1.14) + xunit.assert (>= 2.8.1) + xunit.core (2.8.1) xunit.abstractions (2.0.3) - xunit.analyzers (1.11) - xunit.assert (2.7) - xunit.core (2.7) - xunit.extensibility.core (2.7) - xunit.extensibility.execution (2.7) - xunit.extensibility.core (2.7) + xunit.analyzers (1.14) + xunit.assert (2.8.1) + xunit.core (2.8.1) + xunit.extensibility.core (2.8.1) + xunit.extensibility.execution (2.8.1) + xunit.extensibility.core (2.8.1) xunit.abstractions (>= 2.0.3) - xunit.extensibility.execution (2.7) - xunit.extensibility.core (2.7) - xunit.runner.visualstudio (2.5.7) + xunit.extensibility.execution (2.8.1) + xunit.extensibility.core (2.8.1) + xunit.runner.visualstudio (2.8.1) diff --git a/src/FSharp.AWS.DynamoDB/StreamContext.fs b/src/FSharp.AWS.DynamoDB/StreamContext.fs index 5f9ca72..5f6ec1b 100644 --- a/src/FSharp.AWS.DynamoDB/StreamContext.fs +++ b/src/FSharp.AWS.DynamoDB/StreamContext.fs @@ -23,10 +23,10 @@ type StreamRecord<'TRecord> = type StreamPosition = { ShardId: string; SequenceNumber: string option } -type StreamReadFrom = +type ReadStreamFrom = | Oldest | Newest - | Position of StreamPosition + | Positions of StreamPosition list type private Enumerator<'TRecord>(initialIterator : string, fetchNext: string -> Task<('TRecord [] * string option)>) = let mutable iterator = initialIterator @@ -82,52 +82,21 @@ module private ShardTree = | Some (Node (_, children)) -> children | _ -> [] -type private ShardMsg = - | UpdatedShardPosition of StreamPosition - | EndOfShard of string - | ShardsUpdated of ShardTree list - -type private StreamState = { Trees: ShardTree list; Workers: Map * StreamPosition> } - - -/// DynamoDB client object for intrepreting Dynamo streams sharing the same F# record representations as the `TableContext` -[] -type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName: string, template: RecordTemplate<'TRecord>) = - - [] - let idleTimeBetweenReadsMilliseconds = 1000 - [] - let maxRecords = 1000 - [] - let listShardsCacheAgeMilliseconds = 10000 - - let listStreams (ct: CancellationToken) : Task = - task { - let req = ListStreamsRequest(TableName = tableName, Limit = 100) - let! response = client.ListStreamsAsync(req, ct) - return response.Streams |> Seq.map _.StreamArn |> Seq.toList - } - - let describeStream (streamArn: string) (ct: CancellationToken) = - task { - let! response = client.DescribeStreamAsync(DescribeStreamRequest(StreamArn = streamArn), ct) - return response.StreamDescription - } - +module private Shard = /// /// Returns the 'oldest' shard in a stream, according to the logic: /// First shard which has no parent ID /// First shard which has a parent ID not returned in the description /// First shard in the list /// - /// `StreamDescription` returned for the stream ARN - let oldestShard (stream: StreamDescription) = - stream.Shards + /// sequence of `Shard` objects from the `StreamDescription` + let oldest (shards : Shard seq) = + shards |> Seq.tryFind (fun s -> String.IsNullOrEmpty(s.ParentShardId)) |> Option.orElseWith (fun () -> - stream.Shards - |> Seq.tryFind (fun s1 -> not (stream.Shards |> Seq.exists (fun s2 -> s1.ParentShardId = s2.ShardId)))) - |> Option.orElseWith (fun () -> stream.Shards |> Seq.tryHead) + shards + |> Seq.tryFind (fun s1 -> not (shards |> Seq.exists (fun s2 -> s1.ParentShardId = s2.ShardId)))) + |> Option.orElseWith (fun () -> shards |> Seq.tryHead) /// /// Returns the 'newest' shard in a stream, according to the logic: @@ -135,40 +104,51 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName /// First shard which has an empty `EndingSequenceNumber` (is still being actively written to) /// First shard in the list /// - /// `StreamDescription` returned for the stream ARN + /// sequence of `Shard` objects from the `StreamDescription` // TODO: Should this ONLY return active streams? - let newestShard (stream: StreamDescription) = - stream.Shards - |> Seq.tryFind (fun s1 -> not (stream.Shards |> Seq.exists (fun s2 -> s1.ShardId = s2.ParentShardId))) + let newest (shards : Shard seq) = + shards + |> Seq.tryFind (fun s1 -> not (shards |> Seq.exists (fun s2 -> s1.ShardId = s2.ParentShardId))) |> Option.orElseWith (fun () -> - stream.Shards + shards |> Seq.tryFind (fun s -> String.IsNullOrEmpty(s.SequenceNumberRange.EndingSequenceNumber))) - |> Option.orElseWith (fun () -> stream.Shards |> Seq.tryHead) - - /// - /// Returns all of the currently open shards in the stream - /// - /// `StreamDescription` returned for the stream ARN - let openShards (stream: StreamDescription) = - stream.Shards |> Seq.filter (fun s -> String.IsNullOrEmpty(s.ParentShardId)) + |> Option.orElseWith (fun () -> shards |> Seq.tryHead) /// /// Returns all of the root shards (with no parent) in the stream /// /// `StreamDescription` returned for the stream ARN - let rootShards (stream : StreamDescription) = - stream.Shards |> Seq.filter (fun s -> + let roots (shards : Shard seq) = + shards |> Seq.filter (fun s -> String.IsNullOrEmpty(s.ParentShardId) - || not (stream.Shards |> Seq.exists (fun c -> c.ShardId = s.ParentShardId))) + || not (shards |> Seq.exists (fun c -> c.ShardId = s.ParentShardId))) + + /// + /// Returns all of the leaf shards (with no children) in the stream + /// + /// `StreamDescription` returned for the stream ARN + let leaves (shards : Shard seq) = + shards |> Seq.filter (fun s -> not (shards |> Seq.exists (fun c -> c.ParentShardId = s.ShardId))) + + let ancestors (shardId : string) (shards : Shard seq) = + let rec ancestorsInner (shardId : string) = + match shards |> Seq.tryFind (fun s -> s.ShardId = shardId) with + | None -> [] + | Some s -> + match s.ParentShardId with + | null | "" -> [] + | parent -> parent :: ancestorsInner parent + + ancestorsInner shardId /// /// Build trees of the shard hierarchy in the stream. /// Nodes will usually have a single child, but can have multiple children if the shard has been split. /// - /// `StreamDescription` returned for the stream ARN - let shardTrees (stream: StreamDescription): ShardTree seq = + /// sequence of `Shard` objects from the `StreamDescription` + let toTrees (shards : Shard seq): ShardTree list = let children = - stream.Shards + shards |> Seq.filter (fun s -> not (String.IsNullOrEmpty(s.ParentShardId))) |> Seq.groupBy _.ParentShardId |> Map.ofSeq @@ -178,9 +158,133 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName | None -> Leaf shard | Some cs -> Node(shard, cs |> Seq.map buildTree |> Seq.toList) - rootShards stream |> Seq.map buildTree + roots shards |> Seq.map buildTree |> Seq.toList +type private ShardMsg = + | UpdatedShardPosition of StreamPosition + | EndOfShard of string + | ShardsUpdated of Shard seq + +type private ShardProcessingState = + | NotStarted + | Processing of StreamPosition + | Completed + +type private StreamState = { Shards: Shard list; StreamProgress: Map; ShardWorkers: Map> } + +module private Map = + let union (replaceWith: Map<'K, 'V>) (original: Map<'K, 'V>) = + Map.fold (fun acc key value -> Map.add key value acc) original replaceWith + +module private StreamState = + let empty = { Shards = []; StreamProgress = Map.empty; ShardWorkers = Map.empty } + + /// + /// Initializes the stream state with the initial list of shards. Supports the following start positions: + /// - Oldest: Start processing from the start of the oldest shards available (note 24h retention) + /// - Newest: Start processing from the start of the currently open shards + /// - Positions: Start processing from the given positions (not currently supported) + /// + /// List of shards in the stream + /// `ReadStreamFrom` start position + let init (shards: Shard seq) (start : ReadStreamFrom) = + let shards = shards |> Seq.toList + let progress = + match start with + | Oldest -> shards |> Seq.map (fun s -> (s.ShardId, NotStarted)) |> Map.ofSeq + | Newest -> + let leaves = Shard.leaves shards |> Seq.map (fun s -> (s.ShardId, NotStarted)) // TODO: Check to see if stream is closed + let oldShards = shards |> Seq.filter (fun s -> not (leaves |> Seq.exists (fun (id, _) -> id = s.ShardId))) |> Seq.map (fun s -> (s.ShardId, Completed)) + leaves |> Seq.append oldShards |> Map.ofSeq + | Positions pos -> + failwithf "Not supported yet" + + { Shards = shards; StreamProgress = progress; ShardWorkers = Map.empty } + + /// + /// Updates (appends to) the list of shards in the stream state + /// + /// New list of shards + /// Current state of the stream + /// Updated state + let updateShards (shards: Shard seq) (state: StreamState) = + let newProgress = shards |> Seq.filter (fun s -> not (state.StreamProgress |> Map.containsKey s.ShardId)) + |> Seq.map (fun s -> (s.ShardId, NotStarted)) + + { state with Shards = shards |> Seq.toList; StreamProgress = state.StreamProgress |> Map.toSeq |> Seq.append newProgress |> Map.ofSeq } + + /// + /// Registers a new shard worker and marks the shard as processing + /// + /// ShardId of the newly started shard + /// Shard worker + /// Current state of the stream + /// Updated state + let startShard (shardId: string) (worker: MailboxProcessor) (state: StreamState) = + { state with StreamProgress = state.StreamProgress |> Map.add shardId (Processing { ShardId = shardId; SequenceNumber = None }); ShardWorkers = state.ShardWorkers |> Map.add shardId worker } + + /// + /// Registers the new shard workers and marks the shards as processing + /// + /// ShardId of the newly started shard + /// Shard worker + /// Current state of the stream + /// Updated state + let startShards (workers: Map>) (state: StreamState) = + let updatedProgress = workers |> Map.map (fun sId _ -> Processing { ShardId = sId; SequenceNumber = None }) + { state with StreamProgress = state.StreamProgress |> Map.union updatedProgress ; ShardWorkers = state.ShardWorkers |> Map.union workers } + + /// + /// Updates the position of a shard in the stream + /// + /// New position of the shard + /// Current state of the stream + /// Updated state + let updatedShardPosition (position: StreamPosition) (state: StreamState) = + { state with StreamProgress = state.StreamProgress |> Map.add position.ShardId (Processing position) } + + let completeShard (shardId: string) (state: StreamState) = + { state with StreamProgress = state.StreamProgress |> Map.add shardId Completed; ShardWorkers = state.ShardWorkers |> Map.remove(shardId) } + + /// + /// Returns all shards ready to be processed - with no parent shard or parent shard completed + /// + /// Current state of the stream + /// List of shards ready to start processing + let nextShards (state : StreamState) = + state.Shards + |> List.filter (fun s -> state.StreamProgress |> Map.tryFind s.ShardId |> Option.exists ((=) NotStarted)) + |> List.filter (fun s -> + // Filter shards with a parent that hasn’t been completed + match state.StreamProgress |> Map.tryFind s.ParentShardId with + | Some Completed | None -> true + | Some NotStarted | Some (Processing _) -> false) + +/// DynamoDB client object for intrepreting Dynamo streams sharing the same F# record representations as the `TableContext` +[] +type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName: string, template: RecordTemplate<'TRecord>) = + + [] + let idleTimeBetweenReadsMilliseconds = 1000 + [] + let maxRecords = 1000 + [] + let listShardsCacheAgeMilliseconds = 10000 + + let listStreams (ct: CancellationToken) : Task = + task { + let req = ListStreamsRequest(TableName = tableName, Limit = 100) + let! response = client.ListStreamsAsync(req, ct) + return response.Streams |> Seq.map _.StreamArn |> Seq.toList + } + + let describeStream (streamArn: string) (ct: CancellationToken) = + task { + let! response = client.DescribeStreamAsync(DescribeStreamRequest(StreamArn = streamArn), ct) + return response.StreamDescription + } + // TODO: what does this return if no more records in shard? let getShardIterator (streamArn: string) (iteratorType: ShardIteratorType) (position: StreamPosition) (ct: CancellationToken) = @@ -231,12 +335,27 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName do! Async.Sleep idleTimeBetweenReadsMilliseconds } ), ct) + + let startShardSyncWorker (outbox: MailboxProcessor) (streamArn : string) (ct : CancellationToken) = + MailboxProcessor.Start((fun _ -> + let rec loop () = + async { + if not ct.IsCancellationRequested then + // TODO: Handle AWS read failures + let! stream = describeStream streamArn ct |> Async.AwaitTaskCorrect + outbox.Post(ShardsUpdated stream.Shards) + do! Async.Sleep listShardsCacheAgeMilliseconds + do! loop () + } + + loop () + ), ct) - let startStreamProcessor (streamArn : string) (processRecord: Record -> unit) (ct: CancellationToken) = + let startStreamProcessor (streamArn : string) (startPosition : ReadStreamFrom) (processRecord: Record -> unit) (ct: CancellationToken) = MailboxProcessor.Start(fun inbox -> - let startWorker (shard : Shard) = - let position = { ShardId = shard.ShardId; SequenceNumber = None } - (shard.ShardId, (startShardProcessor inbox streamArn position processRecord ct, position)) + let startShardWorker (shard : Shard) = + let position = { ShardId = shard.ShardId; SequenceNumber = None } // TODO: Supply sequence number + (shard.ShardId, startShardProcessor inbox streamArn position processRecord ct) let rec loop (state: StreamState) = async { @@ -245,20 +364,25 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName match msg with | UpdatedShardPosition position -> printfn "Recording current shard position: %0A" position - do! loop { state with Workers = state.Workers |> Map.change position.ShardId (Option.map (fun (p, _) -> (p, position))) } + do! loop (state |> StreamState.updatedShardPosition position) | EndOfShard shardId -> - let newWorkers = ShardTree.nextShards shardId state.Trees |> List.map (ShardTree.shard >> startWorker) - do! loop { state with Workers = state.Workers |> Map.remove shardId |> Map.toList |> List.append newWorkers |> Map.ofList} - | ShardsUpdated trees -> - do! loop { state with Trees = trees } // TODO: Start new workers if necessary? + let state = state |> StreamState.completeShard shardId + let newWorkers = state |> StreamState.nextShards |> Seq.map startShardWorker |> Map.ofSeq + newWorkers |> Map.iter (fun sId _ -> printfn "Starting worker for shard %s" sId) + do! loop (state |> StreamState.startShards newWorkers) + | ShardsUpdated shards -> + let state = state |> StreamState.updateShards shards + let newWorkers = state |> StreamState.nextShards |> Seq.map startShardWorker |> Map.ofSeq + newWorkers |> Map.iter (fun sId _ -> printfn "Starting worker for shard %s" sId) + do! loop (state |> StreamState.startShards newWorkers) } async { let! stream = describeStream streamArn ct |> Async.AwaitTaskCorrect - let trees = shardTrees stream |> Seq.toList - // Start workers at the root shards. - // TODO: Handle LATEST semantics or starting from a checkpoint - let workers = trees |> Seq.map (ShardTree.shard >> startWorker) |> Map.ofSeq - do! loop { Trees = trees; Workers = workers } + let state = StreamState.init stream.Shards startPosition + startShardSyncWorker inbox streamArn ct |> ignore // TODO: Store shard sync worker + let workers = StreamState.nextShards state |> Seq.map startShardWorker |> Map.ofSeq + workers |> Map.iter (fun sId _ -> printfn "Starting worker for shard %s" sId) + do! loop (state |> StreamState.startShards workers) } ) @@ -322,7 +446,7 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName task { let! streams = listStreams CancellationToken.None let! stream = describeStream (streams |> List.head) CancellationToken.None - match oldestShard stream with + match Shard.oldest stream.Shards with | Some shard -> let! iterator = getShardIterator stream.StreamArn ShardIteratorType.TRIM_HORIZON { ShardId = shard.ShardId; SequenceNumber = None } CancellationToken.None let! records = getRecords iterator maxRecords CancellationToken.None @@ -332,16 +456,8 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName } - // member t.StartReadingAsync(streamArn: string, processRecord: StreamRecord -> unit, ?ct: CancellationToken) = - // task { - // let! stream = describeStream streamArn (ct |> Option.defaultValue CancellationToken.None) - // let shard = oldestShard stream |> Option.get - // let! iterator = getShardIterator streamArn ShardIteratorType.TRIM_HORIZON { ShardId = shard.ShardId; SequenceNumber = None } ct - // let rec readLoop (iterator: string) = - // let! (recs, nextIterator) = getRecords iterator 1000 ct - // recs |> Array.iter (fun r -> processRecord (t.ParseStreamRecord r)) - // match nextIterator with - // | None -> () - // | Some i -> readLoop i - // readLoop iterator - // } \ No newline at end of file + member t.StartReadingAsync(streamArn: string, processRecord: StreamRecord<'TRecord> -> unit, ?startPosition : ReadStreamFrom, ?ct: CancellationToken) = + task { + let processor = startStreamProcessor streamArn (defaultArg startPosition Newest) (fun r -> processRecord (t.ParseStreamRecord r)) (ct |> Option.defaultValue CancellationToken.None) + return () + } diff --git a/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs b/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs index 55fab7d..79d9765 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs +++ b/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs @@ -8,6 +8,7 @@ open Xunit open FSharp.AWS.DynamoDB open FSharp.AWS.DynamoDB.Scripting open Amazon.DynamoDBv2 +open System.Collections.Concurrent [] module StreamingTests = @@ -33,15 +34,17 @@ type ``Streaming Tests``(fixture: TableFixture) = [] let ``Parse New Item from stream`` () = + let recordQueue = ConcurrentQueue>() + let streamArn = streams.ListStreamsAsync() |> Async.AwaitTask |> Async.RunSynchronously |> Seq.head let value = mkItem () let key = table.PutItem value let value' = table.GetItem key - let task = streams.TempReadAllRecords() - task.Wait() - let records = task.Result + let task = streams.StartReadingAsync(streamArn, fun record -> recordQueue.Enqueue record) |> Async.AwaitTask |> Async.RunSynchronously + Async.Sleep 1000 |> Async.RunSynchronously + let records = recordQueue.ToArray() test <@ 1 = Array.length records @> test <@ records[0].Operation = Insert @> test <@ records[0].Old = None @> test <@ records[0].New = Some value' @> - interface IClassFixture \ No newline at end of file + interface IClassFixture From 527a64be97cf802824bca733f50309dcd22e178e Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 4 Sep 2024 10:32:44 +0800 Subject: [PATCH 4/7] Modify streams API, flesh out more logic in shard processing --- src/FSharp.AWS.DynamoDB/StreamContext.fs | 530 ++++++++++-------- .../StreamingTests.fs | 35 +- 2 files changed, 302 insertions(+), 263 deletions(-) diff --git a/src/FSharp.AWS.DynamoDB/StreamContext.fs b/src/FSharp.AWS.DynamoDB/StreamContext.fs index 5f6ec1b..943a6f6 100644 --- a/src/FSharp.AWS.DynamoDB/StreamContext.fs +++ b/src/FSharp.AWS.DynamoDB/StreamContext.fs @@ -1,7 +1,6 @@ namespace FSharp.AWS.DynamoDB open System -open System.Collections.Concurrent open System.Threading open System.Threading.Tasks @@ -28,9 +27,9 @@ type ReadStreamFrom = | Newest | Positions of StreamPosition list -type private Enumerator<'TRecord>(initialIterator : string, fetchNext: string -> Task<('TRecord [] * string option)>) = +type private Enumerator<'TRecord>(initialIterator: string, fetchNext: string -> Task<('TRecord[] * string option)>) = let mutable iterator = initialIterator - let mutable records: 'TRecord [] = Array.empty + let mutable records: 'TRecord[] = Array.empty let mutable index = 0 interface IAsyncEnumerator<'TRecord> with @@ -41,20 +40,22 @@ type private Enumerator<'TRecord>(initialIterator : string, fetchNext: string -> index <- index + 1 ValueTask(true) else - let nextTask = + let nextTask = (fetchNext iterator) - .ContinueWith((fun (task: Task<('TRecord [] * string option)>) -> - match task.Result with - | (_, None) -> - records <- Array.empty - index <- 0 - false - | (recs, Some nextIterator) -> - records <- recs - iterator <- nextIterator - index <- 0 - true - ), TaskContinuationOptions.OnlyOnRanToCompletion) + .ContinueWith( + (fun (task: Task<('TRecord[] * string option)>) -> + match task.Result with + | (_, None) -> + records <- Array.empty + index <- 0 + false + | (recs, Some nextIterator) -> + records <- recs + iterator <- nextIterator + index <- 0 + true), + TaskContinuationOptions.OnlyOnRanToCompletion + ) ValueTask(nextTask) member _.DisposeAsync() = ValueTask() @@ -64,118 +65,92 @@ type private ShardTree = | Node of Shard * ShardTree list module private ShardTree = - let shard (tree : ShardTree) = + let shard (tree: ShardTree) = match tree with - | Leaf shard | Node (shard, _) -> shard + | Leaf shard + | Node(shard, _) -> shard - let find (shardId : string) (tree : ShardTree) = + let find (shardId: string) (tree: ShardTree) = let rec findInner t = match t with - | Leaf s | Node (s, _) when s.ShardId = shardId -> Some(t) - | Node (_, children) -> children |> Seq.choose (findInner) |> Seq.tryHead + | Leaf s + | Node(s, _) when s.ShardId = shardId -> Some(t) + | Node(_, children) -> children |> Seq.choose (findInner) |> Seq.tryHead | Leaf _ -> None - + findInner tree - let nextShards (shardId : string) (trees : ShardTree list) = + let nextShards (shardId: string) (trees: ShardTree list) = match trees |> List.choose (find shardId) |> List.tryHead with - | Some (Node (_, children)) -> children + | Some(Node(_, children)) -> children | _ -> [] module private Shard = + /// - /// Returns the 'oldest' shard in a stream, according to the logic: - /// First shard which has no parent ID - /// First shard which has a parent ID not returned in the description - /// First shard in the list + /// Returns all of the root shards (with no parent) in the stream /// /// sequence of `Shard` objects from the `StreamDescription` - let oldest (shards : Shard seq) = + let roots (shards: Shard seq) = shards - |> Seq.tryFind (fun s -> String.IsNullOrEmpty(s.ParentShardId)) - |> Option.orElseWith (fun () -> - shards - |> Seq.tryFind (fun s1 -> not (shards |> Seq.exists (fun s2 -> s1.ParentShardId = s2.ShardId)))) - |> Option.orElseWith (fun () -> shards |> Seq.tryHead) + |> Seq.filter (fun s -> + String.IsNullOrEmpty(s.ParentShardId) + || not (shards |> Seq.exists (fun c -> c.ShardId = s.ParentShardId))) /// - /// Returns the 'newest' shard in a stream, according to the logic: - /// First shard which has no child IDs - /// First shard which has an empty `EndingSequenceNumber` (is still being actively written to) - /// First shard in the list + /// Returns all of the leaf shards (with no children) in the stream /// /// sequence of `Shard` objects from the `StreamDescription` - // TODO: Should this ONLY return active streams? - let newest (shards : Shard seq) = - shards - |> Seq.tryFind (fun s1 -> not (shards |> Seq.exists (fun s2 -> s1.ShardId = s2.ParentShardId))) - |> Option.orElseWith (fun () -> - shards - |> Seq.tryFind (fun s -> String.IsNullOrEmpty(s.SequenceNumberRange.EndingSequenceNumber))) - |> Option.orElseWith (fun () -> shards |> Seq.tryHead) - + let leaves (shards: Shard seq) = shards |> Seq.filter (fun s -> not (shards |> Seq.exists (fun c -> c.ParentShardId = s.ShardId))) + /// - /// Returns all of the root shards (with no parent) in the stream + /// Returns all of the open shards (with no ending sequence number) in the stream /// - /// `StreamDescription` returned for the stream ARN - let roots (shards : Shard seq) = - shards |> Seq.filter (fun s -> - String.IsNullOrEmpty(s.ParentShardId) - || not (shards |> Seq.exists (fun c -> c.ShardId = s.ParentShardId))) + /// sequence of `Shard` objects from the `StreamDescription` + let openShards (shards: Shard seq) = shards |> Seq.filter (fun s -> String.IsNullOrEmpty(s.SequenceNumberRange.EndingSequenceNumber)) /// - /// Returns all of the leaf shards (with no children) in the stream + /// Returns the ancestor shard ids of a shard in the stream /// - /// `StreamDescription` returned for the stream ARN - let leaves (shards : Shard seq) = - shards |> Seq.filter (fun s -> not (shards |> Seq.exists (fun c -> c.ParentShardId = s.ShardId))) - - let ancestors (shardId : string) (shards : Shard seq) = - let rec ancestorsInner (shardId : string) = + /// shard id of the shard to find ancestors for + /// sequence of `Shard` objects from the `StreamDescription` + let ancestors (shards: Shard seq) (shardId: string) = + let rec ancestorsInner (shardId: string) = match shards |> Seq.tryFind (fun s -> s.ShardId = shardId) with | None -> [] - | Some s -> + | Some s -> match s.ParentShardId with - | null | "" -> [] + | null + | "" -> [] | parent -> parent :: ancestorsInner parent - ancestorsInner shardId + ancestorsInner shardId |> Seq.distinct /// - /// Build trees of the shard hierarchy in the stream. - /// Nodes will usually have a single child, but can have multiple children if the shard has been split. + /// Returns true if the shard id is in the list of shards /// /// sequence of `Shard` objects from the `StreamDescription` - let toTrees (shards : Shard seq): ShardTree list = - let children = - shards - |> Seq.filter (fun s -> not (String.IsNullOrEmpty(s.ParentShardId))) - |> Seq.groupBy _.ParentShardId - |> Map.ofSeq - - let rec buildTree (shard: Shard) = - match children |> Map.tryFind shard.ShardId with - | None -> Leaf shard - | Some cs -> Node(shard, cs |> Seq.map buildTree |> Seq.toList) - - roots shards |> Seq.map buildTree |> Seq.toList - - -type private ShardMsg = - | UpdatedShardPosition of StreamPosition - | EndOfShard of string + /// shard id to check for + let containsShardId (shards: Shard seq) (shardId: string) = shards |> Seq.exists (fun s -> s.ShardId = shardId) + +type private ShardMsg = + | UpdatedShardPosition of StreamPosition + | EndOfShard of string | ShardsUpdated of Shard seq -type private ShardProcessingState = +type private ShardProcessingState = | NotStarted | Processing of StreamPosition | Completed -type private StreamState = { Shards: Shard list; StreamProgress: Map; ShardWorkers: Map> } +type private StreamState = + { Shards: Shard list + StreamProgress: Map + ShardWorkers: Map> } module private Map = let union (replaceWith: Map<'K, 'V>) (original: Map<'K, 'V>) = - Map.fold (fun acc key value -> Map.add key value acc) original replaceWith + Map.fold (fun acc key value -> Map.add key value acc) original replaceWith module private StreamState = let empty = { Shards = []; StreamProgress = Map.empty; ShardWorkers = Map.empty } @@ -183,22 +158,51 @@ module private StreamState = /// /// Initializes the stream state with the initial list of shards. Supports the following start positions: /// - Oldest: Start processing from the start of the oldest shards available (note 24h retention) - /// - Newest: Start processing from the start of the currently open shards - /// - Positions: Start processing from the given positions (not currently supported) + /// - Newest: Start processing from the end of the currently open shards + /// - Positions: Start processing from the given positions /// /// List of shards in the stream /// `ReadStreamFrom` start position - let init (shards: Shard seq) (start : ReadStreamFrom) = + let init (shards: Shard seq) (start: ReadStreamFrom) = let shards = shards |> Seq.toList - let progress = + let progress = match start with | Oldest -> shards |> Seq.map (fun s -> (s.ShardId, NotStarted)) |> Map.ofSeq - | Newest -> - let leaves = Shard.leaves shards |> Seq.map (fun s -> (s.ShardId, NotStarted)) // TODO: Check to see if stream is closed - let oldShards = shards |> Seq.filter (fun s -> not (leaves |> Seq.exists (fun (id, _) -> id = s.ShardId))) |> Seq.map (fun s -> (s.ShardId, Completed)) - leaves |> Seq.append oldShards |> Map.ofSeq + + | Newest -> + let openShards = + Shard.openShards shards + |> Seq.map (fun s -> + (s.ShardId, + Processing + { ShardId = s.ShardId + SequenceNumber = Some s.SequenceNumberRange.EndingSequenceNumber })) + |> Map.ofSeq + let closedShards = + shards + |> Seq.filter (fun s -> not (openShards |> Map.containsKey s.ShardId)) + |> Seq.map (fun s -> (s.ShardId, Completed)) + |> Map.ofSeq + closedShards |> Map.union openShards + | Positions pos -> - failwithf "Not supported yet" + let inProgress = + pos + |> Seq.filter (fun p -> Shard.containsShardId shards p.ShardId) + |> Seq.map (fun p -> (p.ShardId, Processing p)) + |> Map.ofSeq + let completed = + inProgress + |> Map.keys + |> Seq.collect (Shard.ancestors shards) + |> Seq.map (fun s -> (s, Completed)) + |> Map.ofSeq + let notStarted = + shards + |> Seq.filter (fun s -> not (inProgress |> Map.containsKey s.ShardId) && not (completed |> Map.containsKey s.ShardId)) + |> Seq.map (fun s -> (s.ShardId, NotStarted)) + |> Map.ofSeq + inProgress |> Map.union completed |> Map.union notStarted { Shards = shards; StreamProgress = progress; ShardWorkers = Map.empty } @@ -209,31 +213,35 @@ module private StreamState = /// Current state of the stream /// Updated state let updateShards (shards: Shard seq) (state: StreamState) = - let newProgress = shards |> Seq.filter (fun s -> not (state.StreamProgress |> Map.containsKey s.ShardId)) - |> Seq.map (fun s -> (s.ShardId, NotStarted)) + // TODO: Prune old ShardIds from StreamProgress. What should happen if the shard has disappeard but not been processed? + let newProgress = + shards + |> Seq.filter (fun s -> not (state.StreamProgress |> Map.containsKey s.ShardId)) + |> Seq.map (fun s -> (s.ShardId, NotStarted)) - { state with Shards = shards |> Seq.toList; StreamProgress = state.StreamProgress |> Map.toSeq |> Seq.append newProgress |> Map.ofSeq } + { state with + Shards = shards |> Seq.toList + StreamProgress = state.StreamProgress |> Map.toSeq |> Seq.append newProgress |> Map.ofSeq } /// - /// Registers a new shard worker and marks the shard as processing - /// - /// ShardId of the newly started shard - /// Shard worker - /// Current state of the stream - /// Updated state - let startShard (shardId: string) (worker: MailboxProcessor) (state: StreamState) = - { state with StreamProgress = state.StreamProgress |> Map.add shardId (Processing { ShardId = shardId; SequenceNumber = None }); ShardWorkers = state.ShardWorkers |> Map.add shardId worker } - - /// - /// Registers the new shard workers and marks the shards as processing + /// Registers the new shard workers and marks the shards as processing if necessary /// /// ShardId of the newly started shard /// Shard worker /// Current state of the stream /// Updated state let startShards (workers: Map>) (state: StreamState) = - let updatedProgress = workers |> Map.map (fun sId _ -> Processing { ShardId = sId; SequenceNumber = None }) - { state with StreamProgress = state.StreamProgress |> Map.union updatedProgress ; ShardWorkers = state.ShardWorkers |> Map.union workers } + let updatedProgress = + workers + |> Map.map (fun s _ -> + match state.StreamProgress |> Map.tryFind s with + | Some NotStarted + | None -> Processing { ShardId = s; SequenceNumber = None } + | Some p -> p) // TODO: Log some asserts here (eg if completed) + + { state with + StreamProgress = state.StreamProgress |> Map.union updatedProgress + ShardWorkers = state.ShardWorkers |> Map.union workers } /// /// Updates the position of a shard in the stream @@ -242,140 +250,165 @@ module private StreamState = /// Current state of the stream /// Updated state let updatedShardPosition (position: StreamPosition) (state: StreamState) = - { state with StreamProgress = state.StreamProgress |> Map.add position.ShardId (Processing position) } + { state with + StreamProgress = state.StreamProgress |> Map.add position.ShardId (Processing position) } + /// + /// Marks a shard as completed and removes the worker + /// + /// ShardId of the completed shard + /// Current state of the stream + /// Updated state let completeShard (shardId: string) (state: StreamState) = - { state with StreamProgress = state.StreamProgress |> Map.add shardId Completed; ShardWorkers = state.ShardWorkers |> Map.remove(shardId) } + { state with + StreamProgress = state.StreamProgress |> Map.add shardId Completed + ShardWorkers = state.ShardWorkers |> Map.remove (shardId) } /// - /// Returns all shards ready to be processed - with no parent shard or parent shard completed + /// Returns all shards ready to be processed: + /// - `NotStarted` with no parent shard or parent shard completed + /// - `Processing` with no worker /// /// Current state of the stream - /// List of shards ready to start processing - let nextShards (state : StreamState) = - state.Shards - |> List.filter (fun s -> state.StreamProgress |> Map.tryFind s.ShardId |> Option.exists ((=) NotStarted)) - |> List.filter (fun s -> - // Filter shards with a parent that hasn’t been completed - match state.StreamProgress |> Map.tryFind s.ParentShardId with - | Some Completed | None -> true - | Some NotStarted | Some (Processing _) -> false) - -/// DynamoDB client object for intrepreting Dynamo streams sharing the same F# record representations as the `TableContext` + /// List of shards ready to start processing, with the sequence number to start from + let nextShards (state: StreamState) = + state.Shards + |> List.choose (fun s -> + match state.StreamProgress |> Map.tryFind s.ShardId with + | Some NotStarted -> + match state.StreamProgress |> Map.tryFind s.ParentShardId with + | Some Completed + | None -> Some(s, None) + | Some NotStarted + | Some(Processing _) -> None + | Some(Processing pos) -> + match state.ShardWorkers |> Map.tryFind s.ShardId with + | Some _ -> None + | None -> Some(s, pos.SequenceNumber) + | _ -> None) + + +/// DynamoDB client object for interpreting Dynamo streams sharing the same F# record representations as the `TableContext` [] type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName: string, template: RecordTemplate<'TRecord>) = - [] + [] let idleTimeBetweenReadsMilliseconds = 1000 - [] + [] let maxRecords = 1000 [] let listShardsCacheAgeMilliseconds = 10000 - let listStreams (ct: CancellationToken) : Task = - task { - let req = ListStreamsRequest(TableName = tableName, Limit = 100) - let! response = client.ListStreamsAsync(req, ct) - return response.Streams |> Seq.map _.StreamArn |> Seq.toList - } - - let describeStream (streamArn: string) (ct: CancellationToken) = - task { - let! response = client.DescribeStreamAsync(DescribeStreamRequest(StreamArn = streamArn), ct) - return response.StreamDescription - } + let listStreams (ct: CancellationToken) : Task = task { + let req = ListStreamsRequest(TableName = tableName, Limit = 100) + let! response = client.ListStreamsAsync(req, ct) + return response.Streams |> Seq.map _.StreamArn |> Seq.toList + } + let describeStream (streamArn: string) (ct: CancellationToken) = task { + let! response = client.DescribeStreamAsync(DescribeStreamRequest(StreamArn = streamArn), ct) + return response.StreamDescription + } // TODO: what does this return if no more records in shard? - let getShardIterator (streamArn: string) (iteratorType: ShardIteratorType) (position: StreamPosition) (ct: CancellationToken) = - task { - let req = GetShardIteratorRequest( - StreamArn = streamArn, - ShardId = position.ShardId, - ShardIteratorType = iteratorType) - position.SequenceNumber |> Option.iter (fun n -> req.SequenceNumber <- n) - let! response = client.GetShardIteratorAsync(req, ct) - return response.ShardIterator - } - - let getRecords (iterator: string) (limit: int) (ct: CancellationToken) = - task { - let! response = client.GetRecordsAsync(GetRecordsRequest(Limit = limit, ShardIterator = iterator), ct) - return response.Records |> Array.ofSeq, if isNull response.NextShardIterator then None else Some response.NextShardIterator - } + let getShardIterator (streamArn: string) (iteratorType: ShardIteratorType) (position: StreamPosition) (ct: CancellationToken) = task { + let req = GetShardIteratorRequest(StreamArn = streamArn, ShardId = position.ShardId, ShardIteratorType = iteratorType) + position.SequenceNumber |> Option.iter (fun n -> req.SequenceNumber <- n) + let! response = client.GetShardIteratorAsync(req, ct) + return response.ShardIterator + } + + let getRecords (iterator: string) (limit: int) (ct: CancellationToken) = task { + let! response = client.GetRecordsAsync(GetRecordsRequest(Limit = limit, ShardIterator = iterator), ct) + return + response.Records |> Array.ofSeq, + if isNull response.NextShardIterator then + None + else + Some response.NextShardIterator + } let getRecordsAsync (iterator: string) (ct: CancellationToken) = let batchSize = 5 { new IAsyncEnumerable with member _.GetAsyncEnumerator(ct) = Enumerator(iterator, (fun i -> getRecords i batchSize ct)) :> IAsyncEnumerator } - let startShardProcessor (outbox: MailboxProcessor) (streamArn : string) (position : StreamPosition) (processRecord: Record -> unit) (ct: CancellationToken) = - MailboxProcessor.Start((fun _ -> - let rec loop iterator = - async { - if not ct.IsCancellationRequested then - // TODO: Handle AWS read failures - let! (recs, nextIterator) = getRecords iterator maxRecords ct |> Async.AwaitTaskCorrect - // TODO: Process Record error handling - recs |> Array.iter processRecord - match nextIterator with - | None -> - outbox.Post(EndOfShard position.ShardId) - // End processing the shard - | Some i -> - outbox.Post(UpdatedShardPosition { ShardId = position.ShardId; SequenceNumber = Some i }) - do! Async.Sleep idleTimeBetweenReadsMilliseconds - do! loop i - } - - async { - let! initialIterator = getShardIterator streamArn ShardIteratorType.TRIM_HORIZON position ct |> Async.AwaitTaskCorrect - do! loop initialIterator - while not ct.IsCancellationRequested do - do! Async.Sleep idleTimeBetweenReadsMilliseconds - } - ), ct) - - let startShardSyncWorker (outbox: MailboxProcessor) (streamArn : string) (ct : CancellationToken) = - MailboxProcessor.Start((fun _ -> - let rec loop () = - async { - if not ct.IsCancellationRequested then - // TODO: Handle AWS read failures - let! stream = describeStream streamArn ct |> Async.AwaitTaskCorrect - outbox.Post(ShardsUpdated stream.Shards) - do! Async.Sleep listShardsCacheAgeMilliseconds - do! loop () - } - - loop () - ), ct) - - let startStreamProcessor (streamArn : string) (startPosition : ReadStreamFrom) (processRecord: Record -> unit) (ct: CancellationToken) = + let startShardProcessor + (outbox: MailboxProcessor) + (streamArn: string) + (position: StreamPosition) + (processRecord: Record -> Task) + (ct: CancellationToken) + = + MailboxProcessor + .Start( + (fun _ -> + let rec loop iterator = async { + if not ct.IsCancellationRequested then + // TODO: Handle AWS read failures + let! (recs, nextIterator) = getRecords iterator maxRecords ct |> Async.AwaitTaskCorrect + // TODO: Process Record error handling + for r in recs do + do! processRecord r |> Async.AwaitTaskCorrect + + match nextIterator with + | None -> outbox.Post(EndOfShard position.ShardId) + // End the loop - done processing this shard + | Some i -> + outbox.Post(UpdatedShardPosition { ShardId = position.ShardId; SequenceNumber = Some i }) + do! Async.Sleep idleTimeBetweenReadsMilliseconds + do! loop i + } + + async { + let! initialIterator = + getShardIterator streamArn ShardIteratorType.TRIM_HORIZON position ct |> Async.AwaitTaskCorrect + do! loop initialIterator + }), + ct + ) + + let startShardSyncWorker (outbox: MailboxProcessor) (streamArn: string) (ct: CancellationToken) = + MailboxProcessor + .Start( + (fun _ -> + let rec loop () = async { + if not ct.IsCancellationRequested then + // TODO: Handle AWS read failures + let! stream = describeStream streamArn ct |> Async.AwaitTaskCorrect + outbox.Post(ShardsUpdated stream.Shards) + do! Async.Sleep listShardsCacheAgeMilliseconds + do! loop () + } + + loop ()), + ct + ) + + let startStreamProcessor (streamArn: string) (startPosition: ReadStreamFrom) (processRecord: Record -> Task) (ct: CancellationToken) = MailboxProcessor.Start(fun inbox -> - let startShardWorker (shard : Shard) = - let position = { ShardId = shard.ShardId; SequenceNumber = None } // TODO: Supply sequence number + let startShardWorker (shard: Shard, sequenceNumber: string option) = + let position = { ShardId = shard.ShardId; SequenceNumber = sequenceNumber } (shard.ShardId, startShardProcessor inbox streamArn position processRecord ct) - let rec loop (state: StreamState) = - async { - if not ct.IsCancellationRequested then - let! msg = inbox.Receive() - match msg with - | UpdatedShardPosition position -> - printfn "Recording current shard position: %0A" position - do! loop (state |> StreamState.updatedShardPosition position) - | EndOfShard shardId -> - let state = state |> StreamState.completeShard shardId - let newWorkers = state |> StreamState.nextShards |> Seq.map startShardWorker |> Map.ofSeq - newWorkers |> Map.iter (fun sId _ -> printfn "Starting worker for shard %s" sId) - do! loop (state |> StreamState.startShards newWorkers) - | ShardsUpdated shards -> - let state = state |> StreamState.updateShards shards - let newWorkers = state |> StreamState.nextShards |> Seq.map startShardWorker |> Map.ofSeq - newWorkers |> Map.iter (fun sId _ -> printfn "Starting worker for shard %s" sId) - do! loop (state |> StreamState.startShards newWorkers) - } + let rec loop (state: StreamState) = async { + if not ct.IsCancellationRequested then + let! msg = inbox.Receive() + match msg with + | UpdatedShardPosition position -> + printfn "Recording current shard position: %0A" position + do! loop (state |> StreamState.updatedShardPosition position) + | EndOfShard shardId -> + let state = state |> StreamState.completeShard shardId + let newWorkers = state |> StreamState.nextShards |> Seq.map startShardWorker |> Map.ofSeq + newWorkers |> Map.iter (fun sId _ -> printfn "Starting worker for shard %s" sId) + do! loop (state |> StreamState.startShards newWorkers) + | ShardsUpdated shards -> + let state = state |> StreamState.updateShards shards + let newWorkers = state |> StreamState.nextShards |> Seq.map startShardWorker |> Map.ofSeq + newWorkers |> Map.iter (fun sId _ -> printfn "Starting worker for shard %s" sId) + do! loop (state |> StreamState.startShards newWorkers) + } async { let! stream = describeStream streamArn ct |> Async.AwaitTaskCorrect let state = StreamState.init stream.Shards startPosition @@ -383,9 +416,8 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName let workers = StreamState.nextShards state |> Seq.map startShardWorker |> Map.ofSeq workers |> Map.iter (fun sId _ -> printfn "Starting worker for shard %s" sId) do! loop (state |> StreamState.startShards workers) - } - ) - + }) + /// DynamoDB streaming client instance used for the table operations member _.Client = client /// DynamoDB table name targeted by the context @@ -395,8 +427,7 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName /// - /// Creates a DynamoDB client instance for given F# record and table name.
- /// For creating, provisioning or verification, see VerifyOrCreateTableAsync and VerifyTableAsync. + /// Creates a DynamoDB stream processing instance for given F# record and table name.
///
/// DynamoDB streaming client instance. /// Table name to target. @@ -409,16 +440,15 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName /// Lists all the streams associated with the table. ///
/// Cancellation token for the operation. - member _.ListStreamsAsync(?ct : CancellationToken) = - listStreams(ct |> Option.defaultValue CancellationToken.None) + member _.ListStreamsAsync(?ct: CancellationToken) = listStreams (ct |> Option.defaultValue CancellationToken.None) /// - /// Parses a DynamoDB stream record into the corresponding F# type (matching the table definition). - /// Intended to be used directly from Lambda event handlers or Kinesis consumers. + /// Deserializes a DynamoDB stream record into the corresponding F# type (matching the table definition). + /// Intended to be used directly from Lambda event handlers or KCL consumers. /// - /// DynamoDB stream record to parse. + /// DynamoDB stream record to deserialize. /// `StreamRecord` with old value, new value, and/or `TableKey`, depending on stream config & operation - member t.ParseStreamRecord(record: Record) : StreamRecord<'TRecord> = + member t.DeserializeStreamRecord(record: Record) : StreamRecord<'TRecord> = let op = match record.EventName with | n when n = OperationType.INSERT -> Insert @@ -442,22 +472,24 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName New = newRec Old = oldRec } - member t.TempReadAllRecords() = - task { - let! streams = listStreams CancellationToken.None - let! stream = describeStream (streams |> List.head) CancellationToken.None - match Shard.oldest stream.Shards with - | Some shard -> - let! iterator = getShardIterator stream.StreamArn ShardIteratorType.TRIM_HORIZON { ShardId = shard.ShardId; SequenceNumber = None } CancellationToken.None - let! records = getRecords iterator maxRecords CancellationToken.None - return records |> fst |> Array.map t.ParseStreamRecord - | None -> - return [||] - } - - - member t.StartReadingAsync(streamArn: string, processRecord: StreamRecord<'TRecord> -> unit, ?startPosition : ReadStreamFrom, ?ct: CancellationToken) = - task { - let processor = startStreamProcessor streamArn (defaultArg startPosition Newest) (fun r -> processRecord (t.ParseStreamRecord r)) (ct |> Option.defaultValue CancellationToken.None) - return () - } + // TODO: Return/save checkpoint updates somehow + /// + /// Start a single-process Dynamo stream reader + /// + /// ARN of the stream to read from + /// Function to process each record + /// Start position of the stream - defaults to `Newest` + /// Cancellation token for the operation - defaults to `CancellationToken.None` + member internal t.StartReadingAsync + ( + streamArn: string, + processRecord: StreamRecord<'TRecord> -> Task, + ?startPosition: ReadStreamFrom, + ?ct: CancellationToken + ) = + startStreamProcessor + streamArn + (defaultArg startPosition Newest) + (fun r -> processRecord (t.DeserializeStreamRecord r)) + (defaultArg ct CancellationToken.None) + :> IDisposable diff --git a/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs b/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs index 79d9765..2e3db12 100644 --- a/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs +++ b/tests/FSharp.AWS.DynamoDB.Tests/StreamingTests.fs @@ -9,6 +9,7 @@ open FSharp.AWS.DynamoDB open FSharp.AWS.DynamoDB.Scripting open Amazon.DynamoDBv2 open System.Collections.Concurrent +open System.Threading.Tasks [] module StreamingTests = @@ -28,23 +29,29 @@ type ``Streaming Tests``(fixture: TableFixture) = { HashKey = guid () RangeKey = guid () StringValue = guid () - IntValue = rand () } + IntValue = rand () } - let (table, streams) = fixture.CreateEmptyWithStreaming(StreamViewType.NEW_AND_OLD_IMAGES) + let (table, stream) = fixture.CreateEmptyWithStreaming(StreamViewType.NEW_AND_OLD_IMAGES) [] let ``Parse New Item from stream`` () = - let recordQueue = ConcurrentQueue>() - let streamArn = streams.ListStreamsAsync() |> Async.AwaitTask |> Async.RunSynchronously |> Seq.head - let value = mkItem () - let key = table.PutItem value - let value' = table.GetItem key - let task = streams.StartReadingAsync(streamArn, fun record -> recordQueue.Enqueue record) |> Async.AwaitTask |> Async.RunSynchronously - Async.Sleep 1000 |> Async.RunSynchronously - let records = recordQueue.ToArray() - test <@ 1 = Array.length records @> - test <@ records[0].Operation = Insert @> - test <@ records[0].Old = None @> - test <@ records[0].New = Some value' @> + task { + let recordQueue = ConcurrentQueue>() + let! streams = stream.ListStreamsAsync() + let streamArn = streams |> Seq.head + let value = mkItem () + let key = table.PutItem value + let value' = table.GetItem key + let processRecord r = + recordQueue.Enqueue r + Task.CompletedTask + use disp = stream.StartReadingAsync(streamArn, processRecord) + do! Async.Sleep 1000 + let records = recordQueue.ToArray() + test <@ 1 = Array.length records @> + test <@ records[0].Operation = Insert @> + test <@ records[0].Old = None @> + test <@ records[0].New = Some value' @> + } interface IClassFixture From a542fd5201f63a7279e7068c1d5f9c5ffd757692 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 4 Sep 2024 18:29:35 +0800 Subject: [PATCH 5/7] Removed some experiments & unused code --- src/FSharp.AWS.DynamoDB/StreamContext.fs | 80 ------------------------ 1 file changed, 80 deletions(-) diff --git a/src/FSharp.AWS.DynamoDB/StreamContext.fs b/src/FSharp.AWS.DynamoDB/StreamContext.fs index 943a6f6..f6fb447 100644 --- a/src/FSharp.AWS.DynamoDB/StreamContext.fs +++ b/src/FSharp.AWS.DynamoDB/StreamContext.fs @@ -6,7 +6,6 @@ open System.Threading.Tasks open Amazon.DynamoDBv2 open Amazon.DynamoDBv2.Model -open System.Collections.Generic type StreamOperation = | Insert @@ -27,82 +26,8 @@ type ReadStreamFrom = | Newest | Positions of StreamPosition list -type private Enumerator<'TRecord>(initialIterator: string, fetchNext: string -> Task<('TRecord[] * string option)>) = - let mutable iterator = initialIterator - let mutable records: 'TRecord[] = Array.empty - let mutable index = 0 - - interface IAsyncEnumerator<'TRecord> with - member _.Current = records[index] - - member _.MoveNextAsync() = - if index < records.Length - 1 then - index <- index + 1 - ValueTask(true) - else - let nextTask = - (fetchNext iterator) - .ContinueWith( - (fun (task: Task<('TRecord[] * string option)>) -> - match task.Result with - | (_, None) -> - records <- Array.empty - index <- 0 - false - | (recs, Some nextIterator) -> - records <- recs - iterator <- nextIterator - index <- 0 - true), - TaskContinuationOptions.OnlyOnRanToCompletion - ) - ValueTask(nextTask) - - member _.DisposeAsync() = ValueTask() - -type private ShardTree = - | Leaf of Shard - | Node of Shard * ShardTree list - -module private ShardTree = - let shard (tree: ShardTree) = - match tree with - | Leaf shard - | Node(shard, _) -> shard - - let find (shardId: string) (tree: ShardTree) = - let rec findInner t = - match t with - | Leaf s - | Node(s, _) when s.ShardId = shardId -> Some(t) - | Node(_, children) -> children |> Seq.choose (findInner) |> Seq.tryHead - | Leaf _ -> None - - findInner tree - - let nextShards (shardId: string) (trees: ShardTree list) = - match trees |> List.choose (find shardId) |> List.tryHead with - | Some(Node(_, children)) -> children - | _ -> [] - module private Shard = - /// - /// Returns all of the root shards (with no parent) in the stream - /// - /// sequence of `Shard` objects from the `StreamDescription` - let roots (shards: Shard seq) = - shards - |> Seq.filter (fun s -> - String.IsNullOrEmpty(s.ParentShardId) - || not (shards |> Seq.exists (fun c -> c.ShardId = s.ParentShardId))) - - /// - /// Returns all of the leaf shards (with no children) in the stream - /// - /// sequence of `Shard` objects from the `StreamDescription` - let leaves (shards: Shard seq) = shards |> Seq.filter (fun s -> not (shards |> Seq.exists (fun c -> c.ParentShardId = s.ShardId))) - /// /// Returns all of the open shards (with no ending sequence number) in the stream /// @@ -328,11 +253,6 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName Some response.NextShardIterator } - let getRecordsAsync (iterator: string) (ct: CancellationToken) = - let batchSize = 5 - { new IAsyncEnumerable with - member _.GetAsyncEnumerator(ct) = Enumerator(iterator, (fun i -> getRecords i batchSize ct)) :> IAsyncEnumerator } - let startShardProcessor (outbox: MailboxProcessor) (streamArn: string) From d2a2b8ff9ab5919c0d539634c754ac86bea4cb18 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 4 Sep 2024 18:43:37 +0800 Subject: [PATCH 6/7] Fixed bug starting shard worker for Newest strategy --- src/FSharp.AWS.DynamoDB/StreamContext.fs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/FSharp.AWS.DynamoDB/StreamContext.fs b/src/FSharp.AWS.DynamoDB/StreamContext.fs index f6fb447..a53f5ed 100644 --- a/src/FSharp.AWS.DynamoDB/StreamContext.fs +++ b/src/FSharp.AWS.DynamoDB/StreamContext.fs @@ -101,13 +101,14 @@ module private StreamState = (s.ShardId, Processing { ShardId = s.ShardId - SequenceNumber = Some s.SequenceNumberRange.EndingSequenceNumber })) + SequenceNumber = None })) |> Map.ofSeq let closedShards = shards |> Seq.filter (fun s -> not (openShards |> Map.containsKey s.ShardId)) |> Seq.map (fun s -> (s.ShardId, Completed)) |> Map.ofSeq + closedShards |> Map.union openShards | Positions pos -> @@ -236,7 +237,8 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName } // TODO: what does this return if no more records in shard? - let getShardIterator (streamArn: string) (iteratorType: ShardIteratorType) (position: StreamPosition) (ct: CancellationToken) = task { + let getShardIterator (streamArn: string) (position: StreamPosition) (ct: CancellationToken) = task { + let iteratorType = if position.SequenceNumber.IsSome then ShardIteratorType.AFTER_SEQUENCE_NUMBER else ShardIteratorType.TRIM_HORIZON let req = GetShardIteratorRequest(StreamArn = streamArn, ShardId = position.ShardId, ShardIteratorType = iteratorType) position.SequenceNumber |> Option.iter (fun n -> req.SequenceNumber <- n) let! response = client.GetShardIteratorAsync(req, ct) @@ -282,7 +284,7 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName async { let! initialIterator = - getShardIterator streamArn ShardIteratorType.TRIM_HORIZON position ct |> Async.AwaitTaskCorrect + getShardIterator streamArn position ct |> Async.AwaitTaskCorrect do! loop initialIterator }), ct From fdcdc93ae928552cb3363f87c1e58533e4ccc38b Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 16 Oct 2024 16:52:57 +0800 Subject: [PATCH 7/7] Swallow deserialisation exceptions on the Old image --- global.json | 2 +- src/FSharp.AWS.DynamoDB/StreamContext.fs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/global.json b/global.json index 0b0870e..34f3aed 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "8.0.302", + "version": "8.0.401", "rollForward": "latestMajor" } } diff --git a/src/FSharp.AWS.DynamoDB/StreamContext.fs b/src/FSharp.AWS.DynamoDB/StreamContext.fs index a53f5ed..d8cce6f 100644 --- a/src/FSharp.AWS.DynamoDB/StreamContext.fs +++ b/src/FSharp.AWS.DynamoDB/StreamContext.fs @@ -370,6 +370,7 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName ///
/// DynamoDB stream record to deserialize. /// `StreamRecord` with old value, new value, and/or `TableKey`, depending on stream config & operation + /// Throws an exception if the New image attributes can’t be deserialized. Old image will be set to `None` if deserialization fails. member t.DeserializeStreamRecord(record: Record) : StreamRecord<'TRecord> = let op = match record.EventName with @@ -387,7 +388,8 @@ type StreamContext<'TRecord> internal (client: IAmazonDynamoDBStreams, tableName if record.Dynamodb.OldImage.Count = 0 then None else - Some(t.Template.OfAttributeValues record.Dynamodb.OldImage) + try Some(t.Template.OfAttributeValues record.Dynamodb.OldImage) + with _ -> None // Swallow exception if old image fails deserialization { Operation = op TableKey = key ApproximateCreationDateTime = DateTimeOffset(record.Dynamodb.ApproximateCreationDateTime)