Skip to content

Commit

Permalink
AsyncSeq implementation for DynamoDB
Browse files Browse the repository at this point in the history
  • Loading branch information
rtkelly13 committed Jan 12, 2019
1 parent 2d4c921 commit dda79bf
Show file tree
Hide file tree
Showing 8 changed files with 308 additions and 240 deletions.
175 changes: 141 additions & 34 deletions .paket/Paket.Restore.targets

Large diffs are not rendered by default.

9 changes: 6 additions & 3 deletions paket.dependencies
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
framework: netstandard2.0, netcoreapp2.0
redirects: force
lowest_matching: true
source https://www.nuget.org/api/v2


nuget Unquote ~> 4.0.0
nuget AWSSDK.DynamoDBv2 ~> 3.3.5
nuget FSharp.Core ~> 4.2.3 redirects:force
nuget FSharp.Core ~> 4.3 >= 4.3.2 redirects:force
nuget FSharp.Control.AsyncSeq ~> 2.0 >= 2.0.21
nuget Expecto ~> 5.1.2
nuget Expecto.FsCheck ~> 5.1.2
nuget Expecto.VisualStudio.TestAdapter version_in_path: true
nuget FsChec
nuget FsCheck
github eiriktsarpalis/TypeShape:2.10 src/TypeShape/TypeShape.fs

group Build
source https://www.nuget.org/api/v2

nuget FAKE
nuget SourceLink.Fake
nuget Nuget.CommandLine
Expand Down
208 changes: 51 additions & 157 deletions paket.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/FSharp.AWS.DynamoDB/FSharp.AWS.DynamoDB.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
<Compile Include="Expression\ExpressionContainers.fs" />
<Compile Include="RecordTemplate.fs" />
<Compile Include="TableContext.fs" />
<Compile Include="TableContextAsyncSeq.fs" />
<Compile Include="Extensions.fs" />
<None Include="Script.fsx" />
<None Include="paket.references" />
Expand Down
91 changes: 91 additions & 0 deletions src/FSharp.AWS.DynamoDB/TableContextAsyncSeq.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
namespace FSharp.AWS.DynamoDB

open Amazon.DynamoDBv2.Model
//open FSharp.AWS.DynamoDB
open FSharp.AWS.DynamoDB.ExprCommon
open FSharp.Control
open Microsoft.FSharp.Quotations
open System.Net

module AsyncSeq =
let private scanAsyncSeq (tableContext : TableContext<'a>)
(filterCondition : ConditionalExpr.ConditionalExpression option)
(projectionExpr : ProjectionExpr.ProjectionExpr option) (consistentRead : bool option) =
asyncSeq {
let rec aux last =
asyncSeq {
let request = new ScanRequest(tableContext.TableName)
let writer =
new AttributeWriter(request.ExpressionAttributeNames, request.ExpressionAttributeValues)
match filterCondition with
| None -> ()
| Some fc -> request.FilterExpression <- fc.Write writer
match projectionExpr with
| None -> ()
| Some pe -> request.ProjectionExpression <- pe.Write writer
consistentRead |> Option.iter (fun cr -> request.ConsistentRead <- cr)
last |> Option.iter (fun l -> request.ExclusiveStartKey <- l)
let! ct = Async.CancellationToken
let! response = tableContext.Client.ScanAsync(request, ct) |> Async.AwaitTaskCorrect
if response.HttpStatusCode <> HttpStatusCode.OK then
failwithf "Query request returned error %O" response.HttpStatusCode
for item in response.Items do
yield item
if response.LastEvaluatedKey.Count > 0 then yield! aux (Some response.LastEvaluatedKey)
}
yield! aux None
}

type TableContext<'a> with

/// <summary>
/// Asynchronously scans table with given condition expressions.
/// </summary>
/// <param name="filterCondition">Filter condition expression.</param>
/// <param name="limit">Maximum number of items to evaluate.</param>
/// <param name="consistentRead">Specify whether to perform consistent read operation.</param>
member __.ScanAsyncSeq(?filterCondition : ConditionExpression<'TRecord>, ?consistentRead : bool) : AsyncSeq<'a> =
let filterCondition = filterCondition |> Option.map (fun fc -> fc.Conditional)
scanAsyncSeq __ filterCondition None consistentRead
|> AsyncSeq.map __.Template.OfAttributeValues

/// <summary>
/// Asynchronously scans table with given condition expressions.
/// </summary>
/// <param name="filterCondition">Filter condition expression.</param>
/// <param name="limit">Maximum number of items to evaluate.</param>
/// <param name="consistentRead">Specify whether to perform consistent read operation.</param>
member __.ScanAsyncSeq(filterExpr : Expr<'a -> bool>, ?consistentRead : bool) : AsyncSeq<'a> =
let cond = __.Template.PrecomputeConditionalExpr filterExpr
__.ScanAsyncSeq(cond, ?consistentRead = consistentRead)

/// <summary>
/// Asynchronously scans table with given condition expressions.
/// Uses supplied projection expression to narrow downloaded attributes.
/// Projection type must be a tuple of zero or more non-conflicting properties.
/// </summary>
/// <param name="projection">Projection expression.</param>
/// <param name="filterCondition">Filter condition expression.</param>
/// <param name="limit">Maximum number of items to evaluate.</param>
/// <param name="consistentRead">Specify whether to perform consistent read operation.</param>
member __.ScanProjectedAsyncSeq(projection : ProjectionExpression<'TRecord, 'TProjection>,
?filterCondition : ConditionExpression<'a>, ?consistentRead : bool) : AsyncSeq<'TProjection> =
let filterCondition = filterCondition |> Option.map (fun fc -> fc.Conditional)
scanAsyncSeq __ filterCondition (Some projection.ProjectionExpr) consistentRead |> AsyncSeq.map projection.UnPickle

/// <summary>
/// Asynchronously scans table with given condition expressions.
/// Uses supplied projection expression to narrow downloaded attributes.
/// Projection type must be a tuple of zero or more non-conflicting properties.
/// </summary>
/// <param name="projection">Projection expression.</param>
/// <param name="filterCondition">Filter condition expression.</param>
/// <param name="limit">Maximum number of items to evaluate.</param>
/// <param name="consistentRead">Specify whether to perform consistent read operation.</param>
member __.ScanProjectedAsyncSeq<'TProjection>(projection : Expr<'a -> 'TProjection>,
?filterCondition : Expr<'a -> bool>, ?consistentRead : bool) : AsyncSeq<'TProjection> =
let filterCondition =
filterCondition |> Option.map (fun fc -> __.Template.PrecomputeConditionalExpr fc)
__.ScanProjectedAsyncSeq
(__.Template.PrecomputeProjectionExpr projection, ?filterCondition = filterCondition,
?consistentRead = consistentRead)
1 change: 1 addition & 0 deletions src/FSharp.AWS.DynamoDB/paket.references
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Unquote
AWSSDK.DynamoDBv2
FSharp.Control.AsyncSeq
File: TypeShape.fs TypeShape
62 changes: 16 additions & 46 deletions tests/FSharp.AWS.DynamoDB.Tests/App.config
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@
<assemblyIdentity name="xunit.extensions" publicKeyToken="8d05b1bb7a6fdb6c" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="1.9.0.1566" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Linq.Queryable" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Threading.Tasks.Parallel" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Threading.ThreadPool" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.11.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="AWSSDK.Core" publicKeyToken="885c28607f98e604" culture="neutral" />
Expand All @@ -44,7 +59,7 @@
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="FSharp.Core" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.4.1.0" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.4.3.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
Expand Down Expand Up @@ -96,41 +111,11 @@
<assemblyIdentity name="System.Linq" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.1.1.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Linq.Expressions" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.1.1.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Linq.Queryable" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Net.WebHeaderCollection" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.ObjectModel" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.13.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Reflection.Emit" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Reflection.Emit.ILGeneration" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Reflection.Emit.Lightweight" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Reflection.TypeExtensions" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
Expand Down Expand Up @@ -161,30 +146,15 @@
<assemblyIdentity name="System.Security.Cryptography.ProtectedData" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Text.RegularExpressions" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.1.1.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Threading" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.12.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Threading.Tasks.Parallel" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.2.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Threading.Thread" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.1.0" />
</dependentAssembly>
<dependentAssembly>
<Paket>True</Paket>
<assemblyIdentity name="System.Threading.ThreadPool" publicKeyToken="b03f5f7f11d50a3a" culture="neutral" />
<bindingRedirect oldVersion="0.0.0.0-65535.65535.65535.65535" newVersion="4.0.11.0" />
</dependentAssembly>
</assemblyBinding></runtime>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
<OutputType>Exe</OutputType>
<OutputPath>..\..\bin\</OutputPath>
<IsPackable>false</IsPackable>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
</PropertyGroup>
<ItemGroup>
<Compile Include="Utils.fs" />
Expand Down

0 comments on commit dda79bf

Please sign in to comment.