Skip to content

Commit

Permalink
save resume value as CosmosElement instead of defining a separate der…
Browse files Browse the repository at this point in the history
…ived objects.

update cosmosundefinedquerytests. other code review comments.
  • Loading branch information
Bala Perumalswamy committed Aug 24, 2023
1 parent b041d42 commit 86dd496
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy
/// 2. All partitions, that have greater Range.Min than that of the target partition, have exhausted all values less than or equal to X
/// </para>
/// <para>
/// Given this background, below is an example of order by continuation token. The class members below explains the different
/// Given this background, below is an example of orderby continuation token. The class members below explains the different
/// component/states of the continuation token.
/// </para>
/// </summary>
/// <example>
/// Order by continuation token example.
/// OrderBy continuation token example.
/// <![CDATA[
/// {"compositeToken":{"token":"+RID:OpY0AN-mFAACAAAAAAAABA==#RT:1#TRC:1#RTD:qdTAEA==","range":{"min":"05C1D9CD673398","max":"05C1E399CD6732"}},"orderByItems"[{"item":2}],"rid":"OpY0AN-mFAACAAAAAAAABA==","skipCount":0,"filter":"r.key > 1"}
/// ]]>
Expand All @@ -71,8 +71,8 @@ private static class PropertyNames
/// Initializes a new instance of the OrderByContinuationToken struct.
/// </summary>
/// <param name="compositeContinuationToken">The composite continuation token (refer to property documentation).</param>
/// <param name="orderByItems">The order by items (refer to property documentation).</param>
/// <param name="resumeValues">Order By property values the query needs to resume from in the next round trip</param>
/// <param name="orderByItems">The orderby items (refer to property documentation).</param>
/// <param name="resumeValues">OrderBy property values the query needs to resume from in the next round trip</param>
/// <param name="rid">The rid (refer to property documentation).</param>
/// <param name="skipCount">The skip count (refer to property documentation).</param>
/// <param name="filter">The filter (refer to property documentation).</param>
Expand All @@ -97,21 +97,28 @@ public OrderByContinuationToken(
//// filter is allowed to be null.
this.ParallelContinuationToken = compositeContinuationToken ?? throw new ArgumentNullException(nameof(compositeContinuationToken));

if ((resumeValues != null) && (resumeValues.Count > 0))
if (resumeValues != null)
{
if (resumeValues.Count == 0)
{
throw new ArgumentException($"{nameof(resumeValues)} can not be empty.");
}

this.ResumeValues = resumeValues;
this.OrderByItems = null;
}
else
else if (orderByItems != null)
{
if ((orderByItems == null) || (orderByItems.Count == 0))
if (orderByItems.Count == 0)
{
throw new ArgumentException($"{nameof(orderByItems)} can not be empty.");
}

this.ResumeValues = null;
this.OrderByItems = orderByItems;
}
else
{
throw new ArgumentException($"Either {nameof(orderByItems)} or {nameof(resumeValues)} needs to be provided.");
}

this.Rid = rid;
this.SkipCount = skipCount;
Expand Down Expand Up @@ -144,7 +151,7 @@ public ParallelContinuationToken ParallelContinuationToken
/// ]]>
/// </example>
/// <remarks>
/// This is an array to support multi item order by.
/// This is an array to support multi item orderby.
/// </remarks>>
[JsonProperty(PropertyNames.OrderByItems)]
public IReadOnlyList<OrderByItem> OrderByItems
Expand All @@ -157,13 +164,13 @@ public IReadOnlyList<OrderByItem> OrderByItems
/// resumeValues is used for filtering when we resume.
/// </summary>
/// <example>
/// The below array has one value "2" which was the last order by value returned.
/// The below array has one value "2" which was the last orderby value returned.
/// <![CDATA[
/// "resumeValues":[2]
/// ]]>
/// </example>
/// <remarks>
/// This is an array to support multi item order by.
/// This is an array to support multi item orderby.
/// </remarks>>
[JsonProperty(PropertyNames.ResumeValues)]
public IReadOnlyList<SqlQueryResumeValue> ResumeValues
Expand Down Expand Up @@ -201,7 +208,7 @@ public string Rid
/// online to understand this better.
/// </para>
/// <para>
/// This behavior has implications on how pagination work for CosmosDB queries, especially for order by queries across
/// This behavior has implications on how pagination work for CosmosDB queries, especially for orderby queries across
/// multiple partition.
/// </para>
/// <para>
Expand Down Expand Up @@ -247,27 +254,33 @@ public string Filter
public static CosmosElement ToCosmosElement(OrderByContinuationToken orderByContinuationToken)
{
CosmosElement compositeContinuationToken = ParallelContinuationToken.ToCosmosElement(orderByContinuationToken.ParallelContinuationToken);
List<CosmosElement> orderByItemsRaw = new List<CosmosElement>();

List<CosmosElement> orderByItemsRaw;
List<CosmosElement> resumeValuesRaw;
if (orderByContinuationToken.OrderByItems != null)
{
orderByItemsRaw = new List<CosmosElement>(orderByContinuationToken.OrderByItems.Count);
foreach (OrderByItem orderByItem in orderByContinuationToken.OrderByItems)
{
orderByItemsRaw.Add(OrderByItem.ToCosmosElement(orderByItem));
}
}

List<CosmosElement> resumeValuesRaw;
if (orderByContinuationToken.ResumeValues != null)
resumeValuesRaw = null;
}
else
{
if (orderByContinuationToken.ResumeValues == null)
{
throw new ArgumentException($"Either {nameof(orderByContinuationToken.ResumeValues)} or {nameof(orderByContinuationToken.OrderByItems)} needs to be specified.");
}

resumeValuesRaw = new List<CosmosElement>(orderByContinuationToken.ResumeValues.Count);
foreach (SqlQueryResumeValue resumeValue in orderByContinuationToken.ResumeValues)
{
resumeValuesRaw.Add(SqlQueryResumeValue.ToCosmosElement(resumeValue));
}
}
else
{
resumeValuesRaw = null;

orderByItemsRaw = null;
}

CosmosElement filter = orderByContinuationToken.Filter == null ? CosmosNull.Create() : CosmosString.Create(orderByContinuationToken.Filter);
Expand Down Expand Up @@ -315,7 +328,7 @@ public static TryCatch<OrderByContinuationToken> TryCreateFromCosmosElement(Cosm

ParallelContinuationToken compositeContinuationToken = tryCompositeContinuation.Result;

// Try to get ResumeValues first, if it is not present then try to get order by items
// Try to get ResumeValues first, if it is not present then try to get orderby items
List<SqlQueryResumeValue> resumeValues;
if (cosmosObject.TryGetValue(PropertyNames.ResumeValues, out CosmosArray resumeValuesRaw))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ private ValueTask<bool> MoveNextAsync_DrainPageAsync(ITrace trace)
this.uninitializedEnumeratorsAndTokens.Enqueue((currentEnumerator, (OrderByContinuationToken)null));

// Use the token for the next page, since we fully drained the enumerator.
OrderByContinuationToken orderByContinuationToken = CreateContinuationToken(
OrderByContinuationToken orderByContinuationToken = CreateOrderByContinuationToken(
new ParallelContinuationToken(
token: ((CosmosString)currentEnumerator.FeedRangeState.State.Value).Value,
range: ((FeedRangeEpk)currentEnumerator.FeedRangeState.FeedRange).Range),
Expand Down Expand Up @@ -455,7 +455,7 @@ private ValueTask<bool> MoveNextAsync_DrainPageAsync(ITrace trace)
}
else
{
OrderByContinuationToken orderByContinuationToken = CreateContinuationToken(
OrderByContinuationToken orderByContinuationToken = CreateOrderByContinuationToken(
new ParallelContinuationToken(
token: currentEnumerator.StartOfPageState != null ? ((CosmosString)currentEnumerator.StartOfPageState.Value).Value : null,
range: ((FeedRangeEpk)currentEnumerator.FeedRangeState.FeedRange).Range),
Expand Down Expand Up @@ -877,9 +877,7 @@ private static TryCatch<List<OrderByContinuationToken>> MonadicExtractOrderByTok

foreach (OrderByContinuationToken suppliedOrderByContinuationToken in orderByContinuationTokens)
{
int orderByCount = suppliedOrderByContinuationToken.ResumeValues != null ?
suppliedOrderByContinuationToken.ResumeValues.Count : suppliedOrderByContinuationToken.OrderByItems.Count;

int orderByCount = GetOrderByItemCount(suppliedOrderByContinuationToken);
if (orderByCount != numOrderByColumns)
{
return TryCatch<List<OrderByContinuationToken>>.FromException(
Expand All @@ -891,6 +889,12 @@ private static TryCatch<List<OrderByContinuationToken>> MonadicExtractOrderByTok
return TryCatch<List<OrderByContinuationToken>>.FromResult(orderByContinuationTokens);
}

private static int GetOrderByItemCount(OrderByContinuationToken orderByContinuationToken)
{
return orderByContinuationToken.ResumeValues != null ?
orderByContinuationToken.ResumeValues.Count : orderByContinuationToken.OrderByItems.Count;
}

private static void AppendToBuilders((StringBuilder leftFilter, StringBuilder targetFilter, StringBuilder rightFilter) builders, object str)
{
OrderByCrossPartitionQueryPipelineStage.AppendToBuilders(builders, str, str, str);
Expand Down Expand Up @@ -1143,7 +1147,7 @@ private static (string leftFilter, string targetFilter, string rightFilter) GetF
return (left.ToString(), target.ToString(), right.ToString());
}

private static OrderByContinuationToken CreateContinuationToken(
private static OrderByContinuationToken CreateOrderByContinuationToken(
ParallelContinuationToken parallelToken,
OrderByQueryResult orderByQueryResult,
int skipCount,
Expand Down
Loading

0 comments on commit 86dd496

Please sign in to comment.