Streams Support #80

Draft
wants to merge 9 commits into
base: master

samritchie
Collaborator

Addresses #64. I’ve been sitting on this branch for a while but really should just focus on shipping something.

There are two main components here:

  1. StreamContext<'TRecord>.DeserializeStreamRecord()
    This is a fairly straightforward public interface to allow deserialisation of stream data via the record template so that the same F# type can be used in stream processing. It should add value as-is for Lambda & KCL users, and I’d suggest we release once happy with the interface.
    The main thing I’m unsure about is handling deserialisation exceptions (e.g. on schema change) - TableContext bubbles these to the consumer, so it’s probably a reasonable position to take here too.
  2. StreamContext<'TRecord>.StartReadingAsync()
    This is a single-process stream consumer I added to support testing, but it’s something I would use when it’s ready. It’s currently missing exception handling and checkpoint saving, and I’m fairly vague on how cancellation, IDisposable etc. should be managed with MBPs. As far as I’m aware dynamodb-local does not have multiple shards, so this has never actually been run on a multi-shard stream and is almost certainly full of bugs. At this stage I’m looking for some initial feedback; once DeserializeStreamRecord has been released I will run a copy of this code against some real-world tables and monitor.

Longer term we could look at implementing KCL-compatible shard-leasing logic via Dynamo; the idea of running the KCL just makes me feel dirty.

I’ve bumped FSharp.Core version requirement to 6.0.7 for task support, I’d started implementing manual task continuations and nope. Streams API will be task-based and should set the blueprint for converting the rest.

@samritchie samritchie requested a review from bartelink September 4, 2024 03:15
@samritchie samritchie linked an issue Sep 4, 2024 that may be closed by this pull request
@purkhusid
Contributor

This looks good to me. I think it might be worth splitting this up: ship just the StreamContext changes first and add the stream listening in a follow-up, since the stream context can be used without the subscriptions API being ready.

@samritchie
Collaborator Author

@purkhusid yes, that’s the goal - StartReadingAsync() is internal and only used for testing currently. I think there’s benefit to getting the record deserialisation out.

Probably the only thing I’m undecided about is the StreamRecord exception handling - there may be value in swallowing the exception & returning None when deserialising the old image

@purkhusid
Contributor

I would prefer that a None would be returned or a Result with an explicit error to handle. There can be cases where you are emitting both old and new but you have specific handling for the old one.

It might even make sense to have a separate context for records when using old and new where you provide 2 types. This way you could use different types during e.g. a migration.
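
A minimal sketch of the "Result with an explicit error" option, for comparison with returning None. Every name here (`ImageError`, `tryParseOld`, `parse`) is hypothetical and not part of the PR; `parse` stands in for whatever deserialiser the record template provides.

```fsharp
// Hypothetical error type; the PR does not define one.
type ImageError = DeserializationFailed of exn

// Returns None when the record carries no old image,
// Some (Error ...) when the image is present but cannot be deserialised,
// and Some (Ok ...) otherwise.
let tryParseOld (parse: 'Raw -> 'T) (raw: 'Raw option) : Result<'T, ImageError> option =
    raw
    |> Option.map (fun r ->
        try
            Ok(parse r)
        with e ->
            Error(DeserializationFailed e))
```

With this shape a consumer that ignores the old image pays nothing, while one that compares images can still tell "absent" apart from "present but unreadable".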

@samritchie
Collaborator Author

I’ve changed this to None (for the Old image) - the majority of streams consumers aren’t going to look at the old image, and we don’t have any built-in versioning support for scenarios where comparing images is required AND old versions need to be supported.

However I’ve just noticed the Lambda dotnet SDK has changed a few months back and no longer vends the DynamoDBv2 type. Sigh. I’ll probably need to park this for now & come up with a different approach.

/// <param name="client">DynamoDB client instance.</param>
/// <param name="tableName">Table name to target.</param>
/// <param name="throughput">Throughput to configure if the table does not yet exist.</param>
/// <param name="streaming">Streaming configuration applied if the table does not yet exist.</param>
Member

@bartelink bartelink Oct 22, 2024

How about making this an optional argument on the other one, given that they are doing the same semantic operation? (Yes, doing that is a binary breaking change, but life would go on!)
And comment /// <param name="streaming">Streaming configuration to apply if the table does not yet exist.</param>
Alternately, L1577 needs to refer to 'overloads' ;)

match shards |> Seq.tryFind (fun s -> s.ShardId = shardId) with
| None -> []
| Some s ->
    match s.ParentShardId with
Member

if string.IsNullOrEmpty ?
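
One possible reading of this suggestion, as a sketch only: treat a null or empty `ParentShardId` as "no parent" via a guard instead of a nested match. `ShardInfo` and `ancestors` are hypothetical stand-ins; the real code uses the SDK's `Shard` type and its surrounding logic is not shown in the diff.

```fsharp
open System

// Hypothetical stand-in for the SDK Shard type; only the two fields used here.
type ShardInfo = { ShardId: string; ParentShardId: string }

// Collects the ancestors of `shardId`, nearest parent first.
let rec ancestors (shards: ShardInfo list) (shardId: string) : string list =
    match shards |> List.tryFind (fun s -> s.ShardId = shardId) with
    | None -> []
    // A root shard has a null (or empty) parent id.
    | Some s when String.IsNullOrEmpty s.ParentShardId -> []
    | Some s -> s.ParentShardId :: ancestors shards s.ParentShardId
```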

type private ShardMsg =
    | UpdatedShardPosition of StreamPosition
    | EndOfShard of string
    | ShardsUpdated of Shard seq
Member

@bartelink bartelink Oct 22, 2024

Unless there's a critical benefit to this being lazy, make it a ReadOnlyCollection or array?

let! response = client.GetRecordsAsync(GetRecordsRequest(Limit = limit, ShardIterator = iterator), ct)
return
    response.Records |> Array.ofSeq,
    if isNull response.NextShardIterator then
Member

Option.ofObj
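
For illustration, a sketch of what the suggestion replaces: `Option.ofObj` maps null to None and any other reference to Some, so the explicit `isNull` branch is unnecessary. `nextIterator` is a hypothetical helper and `raw` stands in for `response.NextShardIterator`.

```fsharp
// null (end of shard) becomes None; a live iterator becomes Some iterator.
let nextIterator (raw: string) : string option = Option.ofObj raw
```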

})

/// DynamoDB streaming client instance used for the table operations
member _.Client = client
Member

As these don't mutate, they should be member val
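
A sketch of the difference, assuming a simplified hypothetical `ContextSketch` type (the real context takes an `IAmazonDynamoDBStreams` client): `member val` evaluates its initialiser once at construction and stores it in a backing field, whereas `member _.X = expr` re-evaluates `expr` on every access.

```fsharp
// Hypothetical simplified context, not the type from the PR.
type ContextSketch(client: obj, tableName: string) =
    /// Streams client used for the table operations (stored once).
    member val Client = client
    /// Name of the targeted table (stored once).
    member val TableName = tableName
```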

/// <param name="tableName">Table name to target.</param>
new(client: IAmazonDynamoDBStreams, tableName: string) =
    if not <| isValidTableName tableName then
        invalidArg "tableName" "unsupported DynamoDB table name."
Member

Suggested change
- invalidArg "tableName" "unsupported DynamoDB table name."
+ invalidArg (nameof tableName) "unsupported DynamoDB table name."
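
A small self-contained sketch of the suggested form. `isValidTableNameSketch` and `validate` are hypothetical; the library's actual `isValidTableName` rules are not shown in this diff. `nameof` (F# 5 and later) keeps the reported parameter name in step with any later rename.

```fsharp
open System

// Hypothetical validity check, standing in for the library's isValidTableName.
let isValidTableNameSketch (name: string) = not (String.IsNullOrWhiteSpace name)

let validate (tableName: string) : string =
    if not (isValidTableNameSketch tableName) then
        // Raises ArgumentException with ParamName set to "tableName".
        invalidArg (nameof tableName) "unsupported DynamoDB table name."
    tableName
```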

Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support DynamoDB streams
3 participants