Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Internal] Query: Adds support for continuation using SqlQuerySpec #3774

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
60f7f09
initial change to support sending continuation as resume value in que…
Mar 16, 2023
7aba983
serialize resume value in continuation token.
Mar 18, 2023
732cb05
fix unit test that was failing. other minor refactors
Mar 21, 2023
55dbb82
additional refactoring
Mar 22, 2023
39e83c1
Merge branch 'master' into users/balaperu/query_spec_continuation
Jun 22, 2023
1976690
Merge branch 'master' into users/balaperu/query_spec_continuation
Jul 14, 2023
5eb8726
few minor changes
Jul 14, 2023
13797cb
additional tests
Jul 19, 2023
079196c
Merge branch 'master' into users/balaperu/query_spec_continuation
Jul 19, 2023
09b0704
code refactoring
Aug 2, 2023
d228875
Create separate top level object to represent SqlQueryResumeValue. Di…
Aug 8, 2023
07eb3ac
handle different continuation token for target partition
Aug 9, 2023
b041d42
update test for continuation from array and object values. other mino…
Aug 11, 2023
86dd496
save resume value as CosmosElement instead of defining a separate der…
Aug 24, 2023
05c1a89
support on CosmosNumber64. Other number types are not supported for r…
Aug 25, 2023
c63665f
update cosmosundefinedquerytests to include arrays and object in orde…
Aug 26, 2023
315314d
revert change to have separate SqlQueryResumeValue objects as it is b…
Aug 28, 2023
5144b4e
group resume value into 3 types: Undefined, Primitive and Complex
Aug 29, 2023
b5b9159
minor fix
Sep 5, 2023
b1b3c78
Merge branch 'master' into users/balaperu/query_spec_continuation
Sep 5, 2023
d57b084
Merge branch 'master' into users/balaperu/query_spec_continuation
Jan 26, 2024
6b6655c
Merge branch 'master' into users/balaperu/query_spec_continuation
adityasa Feb 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions Microsoft.Azure.Cosmos/src/CosmosSqlQuerySpecJsonConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ internal CosmosSqlQuerySpecJsonConverter(CosmosSerializer userSerializer)

public override bool CanConvert(Type objectType)
{
return typeof(SqlParameter) == objectType;
return typeof(SqlParameter) == objectType
|| typeof(SqlQueryResumeValue).IsAssignableFrom(objectType);
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
Expand All @@ -34,32 +35,37 @@ public override object ReadJson(JsonReader reader, Type objectType, object exist

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
SqlParameter sqlParameter = (SqlParameter)value;

writer.WriteStartObject();
writer.WritePropertyName("name");
serializer.Serialize(writer, sqlParameter.Name);
writer.WritePropertyName("value");

// if the SqlParameter has stream value we dont pass it through the custom serializer.
if (sqlParameter.Value is SerializedParameterValue serializedEncryptedData)
{
writer.WriteRawValue(serializedEncryptedData.rawSerializedJsonValue);
}
else
if (value is SqlParameter sqlParameter)
{
// Use the user serializer for the parameter values so custom conversions are correctly handled
using (Stream str = this.UserSerializer.ToStream(sqlParameter.Value))
writer.WriteStartObject();
writer.WritePropertyName("name");
serializer.Serialize(writer, sqlParameter.Name);
writer.WritePropertyName("value");

// if the SqlParameter has stream value we dont pass it through the custom serializer.
if (sqlParameter.Value is SerializedParameterValue serializedEncryptedData)
{
using (StreamReader streamReader = new StreamReader(str))
writer.WriteRawValue(serializedEncryptedData.rawSerializedJsonValue);
}
else
{
// Use the user serializer for the parameter values so custom conversions are correctly handled
using (Stream str = this.UserSerializer.ToStream(sqlParameter.Value))
{
string parameterValue = streamReader.ReadToEnd();
writer.WriteRawValue(parameterValue);
using (StreamReader streamReader = new StreamReader(str))
{
string parameterValue = streamReader.ReadToEnd();
writer.WriteRawValue(parameterValue);
}
}
}
}

writer.WriteEndObject();
writer.WriteEndObject();
}
else if (value is SqlQueryResumeValue resumeValue)
{
SqlQueryResumeValue.Serialize(writer, resumeValue, serializer);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
using Microsoft.Azure.Cosmos.CosmosElements.Numbers;
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel;
using Microsoft.Azure.Documents.Routing;
using Newtonsoft.Json;
Expand Down Expand Up @@ -46,12 +45,12 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
/// 2. All partitions, that have greater Range.Min than that of the target partition, have exhausted all values less than or equal to X
/// </para>
/// <para>
/// Given this background, below is an example of order by continuation token. The class members below explains the different
/// Given this background, below is an example of orderby continuation token. The class members below explains the different
/// component/states of the continuation token.
/// </para>
/// </summary>
/// <example>
/// Order by continuation token example.
/// OrderBy continuation token example.
/// <![CDATA[
/// {"compositeToken":{"token":"+RID:OpY0AN-mFAACAAAAAAAABA==#RT:1#TRC:1#RTD:qdTAEA==","range":{"min":"05C1D9CD673398","max":"05C1E399CD6732"}},"orderByItems"[{"item":2}],"rid":"OpY0AN-mFAACAAAAAAAABA==","skipCount":0,"filter":"r.key > 1"}
/// ]]>
Expand All @@ -62,6 +61,7 @@ private static class PropertyNames
{
public const string CompositeToken = "compositeToken";
public const string OrderByItems = "orderByItems";
public const string ResumeValues = "resumeValues";
public const string Rid = "rid";
public const string SkipCount = "skipCount";
public const string Filter = "filter";
Expand All @@ -71,22 +71,19 @@ private static class PropertyNames
/// Initializes a new instance of the OrderByContinuationToken struct.
/// </summary>
/// <param name="compositeContinuationToken">The composite continuation token (refer to property documentation).</param>
/// <param name="orderByItems">The order by items (refer to property documentation).</param>
/// <param name="orderByItems">The orderby items (refer to property documentation).</param>
/// <param name="resumeValues">OrderBy property values the query needs to resume from in the next round trip</param>
/// <param name="rid">The rid (refer to property documentation).</param>
/// <param name="skipCount">The skip count (refer to property documentation).</param>
/// <param name="filter">The filter (refer to property documentation).</param>
public OrderByContinuationToken(
ParallelContinuationToken compositeContinuationToken,
IReadOnlyList<OrderByItem> orderByItems,
IReadOnlyList<SqlQueryResumeValue> resumeValues,
string rid,
int skipCount,
string filter)
{
if (orderByItems.Count == 0)
{
throw new ArgumentException($"{nameof(orderByItems)} can not be empty.");
}

if (string.IsNullOrWhiteSpace(rid))
{
throw new ArgumentNullException($"{nameof(rid)} can not be null or empty or whitespace.");
Expand All @@ -99,7 +96,30 @@ public OrderByContinuationToken(

//// filter is allowed to be null.
this.ParallelContinuationToken = compositeContinuationToken ?? throw new ArgumentNullException(nameof(compositeContinuationToken));
this.OrderByItems = orderByItems ?? throw new ArgumentNullException(nameof(orderByItems));

if (resumeValues != null)
{
if (resumeValues.Count == 0)
{
throw new ArgumentException($"{nameof(resumeValues)} can not be empty.");
}

this.ResumeValues = resumeValues;
}
adityasa marked this conversation as resolved.
Show resolved Hide resolved
else if (orderByItems != null)
{
if (orderByItems.Count == 0)
{
throw new ArgumentException($"{nameof(orderByItems)} can not be empty.");
}
adityasa marked this conversation as resolved.
Show resolved Hide resolved

this.OrderByItems = orderByItems;
}
else
{
throw new ArgumentException($"Either {nameof(orderByItems)} or {nameof(resumeValues)} needs to be provided.");
}

this.Rid = rid;
this.SkipCount = skipCount;
this.Filter = filter;
Expand Down Expand Up @@ -131,14 +151,33 @@ public ParallelContinuationToken ParallelContinuationToken
/// ]]>
/// </example>
/// <remarks>
/// Right now, we don't support orderBy by multiple fields, so orderByItems is an array of one element.
/// This is an array to support multi item orderby.
/// </remarks>>
[JsonProperty(PropertyNames.OrderByItems)]
public IReadOnlyList<OrderByItem> OrderByItems
{
get;
}

/// <summary>
/// Gets: Values in the top most OrderByQueryResult from the target partition.
/// resumeValues is used for filtering when we resume.
/// </summary>
/// <example>
/// The below array has one value "2" which was the last orderby value returned.
/// <![CDATA[
/// "resumeValues":[2]
/// ]]>
/// </example>
/// <remarks>
/// This is an array to support multi item orderby.
/// </remarks>>
[JsonProperty(PropertyNames.ResumeValues)]
public IReadOnlyList<SqlQueryResumeValue> ResumeValues
{
get;
}

/// <summary>
/// Gets: Rid in the top most OrderByQueryResult from the target partition.
/// Rid is used for filtering after we resume, when orderByItems have the same value.
Expand Down Expand Up @@ -169,7 +208,7 @@ public string Rid
/// online to understand this better.
/// </para>
/// <para>
/// This behavior has implications on how pagination work for CosmosDB queries, especially for order by queries across
/// This behavior has implications on how pagination work for CosmosDB queries, especially for orderby queries across
/// multiple partition.
/// </para>
/// <para>
Expand Down Expand Up @@ -215,26 +254,52 @@ public string Filter
public static CosmosElement ToCosmosElement(OrderByContinuationToken orderByContinuationToken)
{
CosmosElement compositeContinuationToken = ParallelContinuationToken.ToCosmosElement(orderByContinuationToken.ParallelContinuationToken);
List<CosmosElement> orderByItemsRaw = new List<CosmosElement>();
foreach (OrderByItem orderByItem in orderByContinuationToken.OrderByItems)

List<CosmosElement> orderByItemsRaw;
List<CosmosElement> resumeValuesRaw;
if (orderByContinuationToken.OrderByItems != null)
adityasa marked this conversation as resolved.
Show resolved Hide resolved
{
orderByItemsRaw.Add(OrderByItem.ToCosmosElement(orderByItem));
resumeValuesRaw = null;
orderByItemsRaw = new List<CosmosElement>(orderByContinuationToken.OrderByItems.Count);
foreach (OrderByItem orderByItem in orderByContinuationToken.OrderByItems)
{
orderByItemsRaw.Add(OrderByItem.ToCosmosElement(orderByItem));
}
}

CosmosArray orderByItems = CosmosArray.Create(orderByItemsRaw);

CosmosElement filter = orderByContinuationToken.Filter == null ? CosmosNull.Create() : (CosmosElement)CosmosString.Create(orderByContinuationToken.Filter);

CosmosObject cosmosObject = CosmosObject.Create(
new Dictionary<string, CosmosElement>()
else if (orderByContinuationToken.ResumeValues != null)
{
orderByItemsRaw = null;
resumeValuesRaw = new List<CosmosElement>(orderByContinuationToken.ResumeValues.Count);
foreach (SqlQueryResumeValue resumeValue in orderByContinuationToken.ResumeValues)
{
{ PropertyNames.CompositeToken, compositeContinuationToken },
{ PropertyNames.OrderByItems, orderByItems },
{ PropertyNames.Rid, CosmosString.Create(orderByContinuationToken.Rid) },
{ PropertyNames.SkipCount, CosmosNumber64.Create(orderByContinuationToken.SkipCount) },
{ PropertyNames.Filter, filter },
});
resumeValuesRaw.Add(SqlQueryResumeValue.ToCosmosElement(resumeValue));
}
}
else
{
throw new ArgumentException($"Either {nameof(orderByContinuationToken.ResumeValues)} or {nameof(orderByContinuationToken.OrderByItems)} needs to be specified.");
}

CosmosElement filter = orderByContinuationToken.Filter == null ? CosmosNull.Create() : CosmosString.Create(orderByContinuationToken.Filter);

CosmosObject cosmosObject = (resumeValuesRaw == null)
? CosmosObject.Create(
new Dictionary<string, CosmosElement>()
{
{ PropertyNames.CompositeToken, compositeContinuationToken },
{ PropertyNames.OrderByItems, CosmosArray.Create(orderByItemsRaw) },
{ PropertyNames.Rid, CosmosString.Create(orderByContinuationToken.Rid) },
{ PropertyNames.SkipCount, CosmosNumber64.Create(orderByContinuationToken.SkipCount) },
{ PropertyNames.Filter, filter },
})
: CosmosObject.Create(
new Dictionary<string, CosmosElement>()
{
{ PropertyNames.CompositeToken, compositeContinuationToken },
{ PropertyNames.ResumeValues, CosmosArray.Create(resumeValuesRaw) },
{ PropertyNames.Rid, CosmosString.Create(orderByContinuationToken.Rid) },
{ PropertyNames.SkipCount, CosmosNumber64.Create(orderByContinuationToken.SkipCount) },
});
return cosmosObject;
}

Expand All @@ -260,14 +325,29 @@ public static TryCatch<OrderByContinuationToken> TryCreateFromCosmosElement(Cosm

ParallelContinuationToken compositeContinuationToken = tryCompositeContinuation.Result;

if (!cosmosObject.TryGetValue(PropertyNames.OrderByItems, out CosmosArray orderByItemsRaw))
// Try to get ResumeValues first, if it is not present then try to get orderby items
List<SqlQueryResumeValue> resumeValues;
List<OrderByItem> orderByItems;
if (cosmosObject.TryGetValue(PropertyNames.ResumeValues, out CosmosArray resumeValuesRaw))
{
orderByItems = null;
resumeValues = new List<SqlQueryResumeValue>(resumeValuesRaw.Count);
foreach (CosmosElement resumeValue in resumeValuesRaw)
{
resumeValues.Add(SqlQueryResumeValue.FromCosmosElement(resumeValue));
}
}
else if (cosmosObject.TryGetValue(PropertyNames.OrderByItems, out CosmosArray orderByItemsRaw))
{
resumeValues = null;
orderByItems = orderByItemsRaw.Select(x => OrderByItem.FromCosmosElement(x)).ToList();
}
else
{
return TryCatch<OrderByContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.OrderByItems}': {cosmosElement}"));
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.OrderByItems}': {cosmosElement}"));
}

List<OrderByItem> orderByItems = orderByItemsRaw.Select(x => OrderByItem.FromCosmosElement(x)).ToList();

if (!cosmosObject.TryGetValue(PropertyNames.Rid, out CosmosString ridRaw))
{
return TryCatch<OrderByContinuationToken>.FromException(
Expand All @@ -284,16 +364,24 @@ public static TryCatch<OrderByContinuationToken> TryCreateFromCosmosElement(Cosm

int skipCount = (int)Number64.ToLong(skipCountRaw.GetValue());

if (!cosmosObject.TryGetValue(PropertyNames.Filter, out CosmosElement filterRaw))
{
return TryCatch<OrderByContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.Filter}': {cosmosElement}"));
}

// filter will be present only when orderByItems is present. This property is not used for resumeValue base continuation
string filter;
if (filterRaw is CosmosString filterStringRaw)
adityasa marked this conversation as resolved.
Show resolved Hide resolved
if (orderByItems != null)
{
filter = filterStringRaw.Value;
if (!cosmosObject.TryGetValue(PropertyNames.Filter, out CosmosElement filterRaw))
{
return TryCatch<OrderByContinuationToken>.FromException(
new MalformedContinuationTokenException($"{nameof(OrderByContinuationToken)} is missing field: '{PropertyNames.Filter}': {cosmosElement}"));
}

if (filterRaw is CosmosString filterStringRaw)
{
filter = filterStringRaw.Value;
}
else
{
filter = null;
}
}
else
{
Expand All @@ -303,6 +391,7 @@ public static TryCatch<OrderByContinuationToken> TryCreateFromCosmosElement(Cosm
OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken(
compositeContinuationToken,
orderByItems,
resumeValues,
rid,
skipCount,
filter);
Expand Down
Loading
Loading