Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine iterator code #1180

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .attach_pid4051998
Empty file.
3 changes: 2 additions & 1 deletion src/main/java/io/milvus/orm/iterator/IteratorAdapterV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public static QueryIteratorParam convertV2Req(QueryIteratorReq queryIteratorReq)
.withOffset(queryIteratorReq.getOffset())
.withLimit(queryIteratorReq.getLimit())
.withIgnoreGrowing(queryIteratorReq.isIgnoreGrowing())
.withBatchSize(queryIteratorReq.getBatchSize());
.withBatchSize(queryIteratorReq.getBatchSize())
.withReduceStopForBest(queryIteratorReq.isReduceStopForBest());

if (queryIteratorReq.getConsistencyLevel() != null) {
builder.withConsistencyLevel(ConsistencyLevelEnum.valueOf(queryIteratorReq.getConsistencyLevel().name()));
Expand Down
39 changes: 24 additions & 15 deletions src/main/java/io/milvus/orm/iterator/QueryIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

package io.milvus.orm.iterator;

import io.milvus.grpc.DataType;
import io.milvus.grpc.MilvusServiceGrpc;
import io.milvus.grpc.QueryRequest;
import io.milvus.grpc.QueryResults;
import io.milvus.grpc.*;
import io.milvus.param.Constant;
import io.milvus.param.ParamUtils;
import io.milvus.param.collection.FieldType;
import io.milvus.param.dml.QueryIteratorParam;
Expand Down Expand Up @@ -98,7 +96,7 @@ public QueryIterator(QueryIteratorReq queryIteratorReq,
// perform a query to get the first time stamp check point
// the time stamp will be input for the next query to skip something
private void setupTsByRequest() {
QueryResults response = getQueryResultsWrapper(expr, 0L, 1L, 0L);
QueryResults response = executeQuery(expr, 0L, 1L, 0L);
if (response.getSessionTs() <= 0) {
logger.warn("Failed to get mvccTs from milvus server, use client-side ts instead");
// fall back to latest session ts by local time
Expand All @@ -116,7 +114,7 @@ private void seek() {
return;
}

QueryResults response = getQueryResultsWrapper(expr, 0L, offset, this.sessionTs);
QueryResults response = executeQuery(expr, 0L, offset, this.sessionTs);
QueryResultsWrapper queryWrapper = new QueryResultsWrapper(response);
List<QueryResultsWrapper.RowRecord> res = queryWrapper.getRowRecords();
int resultIndex = Math.min(res.size(), (int) offset);
Expand All @@ -135,7 +133,7 @@ public List<QueryResultsWrapper.RowRecord> next() {
iteratorCache.releaseCache(cacheIdInUse);
String currentExpr = setupNextExpr();
logger.debug("Query iterator next expression: " + currentExpr);
QueryResults response = getQueryResultsWrapper(currentExpr, offset, batchSize, this.sessionTs);
QueryResults response = executeQuery(currentExpr, offset, batchSize, this.sessionTs);
QueryResultsWrapper queryWrapper = new QueryResultsWrapper(response);
List<QueryResultsWrapper.RowRecord> res = queryWrapper.getRowRecords();
maybeCache(res);
Expand Down Expand Up @@ -199,7 +197,7 @@ private boolean isResSufficient(List<QueryResultsWrapper.RowRecord> ret) {
return ret != null && ret.size() >= batchSize;
}

private QueryResults getQueryResultsWrapper(String expr, long offset, long limit, long ts) {
private QueryResults executeQuery(String expr, long offset, long limit, long ts) {
QueryParam queryParam = QueryParam.newBuilder()
.withDatabaseName(queryIteratorParam.getDatabaseName())
.withCollectionName(queryIteratorParam.getCollectionName())
Expand All @@ -210,20 +208,31 @@ private QueryResults getQueryResultsWrapper(String expr, long offset, long limit
.withOffset(offset)
.withLimit(limit)
.withIgnoreGrowing(queryIteratorParam.isIgnoreGrowing())
.withReduceStopForBest(queryIteratorParam.isReduceStopForBest())
.withIterator(Boolean.TRUE)
.build();

QueryRequest queryRequest = ParamUtils.convertQueryParam(queryParam);
QueryRequest.Builder builder = queryRequest.toBuilder();
// reduce stop for best
builder.addQueryParams(KeyValuePair.newBuilder()
.setKey(Constant.REDUCE_STOP_FOR_BEST)
.setValue(String.valueOf(queryIteratorParam.isReduceStopForBest()))
.build());

// iterator
builder.addQueryParams(KeyValuePair.newBuilder()
.setKey(Constant.ITERATOR_FIELD)
.setValue(String.valueOf(Boolean.TRUE))
.build());

// pass the session ts to query interface
if (ts > 0) {
queryRequest = queryRequest.toBuilder().setGuaranteeTimestamp(ts).build();
}
QueryResults response = blockingStub.query(queryRequest);
builder.setGuaranteeTimestamp(ts).build();

// set default consistency level
builder.setUseDefaultConsistency(true);

QueryResults response = blockingStub.query(builder.build());
String title = String.format("QueryRequest collectionName:%s", queryIteratorParam.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());

return response;
}
}
29 changes: 18 additions & 11 deletions src/main/java/io/milvus/orm/iterator/SearchIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.milvus.common.utils.JsonUtils;
import io.milvus.exception.ParamException;
import io.milvus.grpc.*;
import io.milvus.param.Constant;
import io.milvus.param.MetricType;
import io.milvus.param.ParamUtils;
import io.milvus.param.collection.FieldType;
Expand Down Expand Up @@ -172,7 +173,7 @@ private void checkRmRangeSearchParameters() {
}

private void initSearchIterator() {
SearchResults response = executeNextSearch(params, expr, false, 0L);
SearchResults response = executeSearch(params, expr, false, 0L);
if (response.getSessionTs() <= 0) {
logger.warn("Failed to get mvccTs from milvus server, use client-side ts instead");
// fall back to latest session ts by local time
Expand Down Expand Up @@ -245,7 +246,7 @@ private void updateFilteredIds(SearchResultsWrapper searchResultsWrapper) {
}
}

private SearchResults executeNextSearch(Map<String, Object> params, String nextExpr, boolean toExtendBatch, long ts) {
private SearchResults executeSearch(Map<String, Object> params, String nextExpr, boolean toExtendBatch, long ts) {
SearchParam.Builder searchParamBuilder = SearchParam.newBuilder()
.withDatabaseName(searchIteratorParam.getDatabaseName())
.withCollectionName(searchIteratorParam.getCollectionName())
Expand All @@ -258,25 +259,31 @@ private SearchResults executeNextSearch(Map<String, Object> params, String nextE
.withRoundDecimal(searchIteratorParam.getRoundDecimal())
.withParams(JsonUtils.toJson(params))
.withMetricType(MetricType.valueOf(searchIteratorParam.getMetricType()))
.withIgnoreGrowing(searchIteratorParam.isIgnoreGrowing())
.withIterator(Boolean.TRUE)
;
.withIgnoreGrowing(searchIteratorParam.isIgnoreGrowing());

if (!StringUtils.isNullOrEmpty(searchIteratorParam.getGroupByFieldName())) {
searchParamBuilder.withGroupByFieldName(searchIteratorParam.getGroupByFieldName());
}
fillVectorsByPlType(searchParamBuilder);

SearchRequest searchRequest = ParamUtils.convertSearchParam(searchParamBuilder.build());
SearchRequest.Builder builder = searchRequest.toBuilder();
// iterator
builder.addSearchParams(
KeyValuePair.newBuilder()
.setKey(Constant.ITERATOR_FIELD)
.setValue(String.valueOf(Boolean.TRUE))
.build());

// pass the session ts to search interface
if (ts > 0) {
searchRequest = searchRequest.toBuilder().setGuaranteeTimestamp(ts).build();
}
SearchResults response = blockingStub.search(searchRequest);
builder.setGuaranteeTimestamp(ts).build();

// set default consistency level
builder.setUseDefaultConsistency(true);

SearchResults response = blockingStub.search(builder.build());
String title = String.format("SearchRequest collectionName:%s", searchIteratorParam.getCollectionName());
rpcUtils.handleResponse(title, response.getStatus());

return response;
}

Expand Down Expand Up @@ -388,7 +395,7 @@ private List<QueryResultsWrapper.RowRecord> trySearchFill() {
while (true) {
Map<String, Object> nextParams = nextParams(coefficient);
String nextExpr = filteredDuplicatedResultExpr(expr);
SearchResults response = executeNextSearch(nextParams, nextExpr, true, this.sessionTs);
SearchResults response = executeSearch(nextParams, nextExpr, true, this.sessionTs);
SearchResultsWrapper searchResultsWrapper = new SearchResultsWrapper(response.getResults());
updateFilteredIds(searchResultsWrapper);
List<QueryResultsWrapper.RowRecord> newPage = searchResultsWrapper.getRowRecords(0);
Expand Down
23 changes: 0 additions & 23 deletions src/main/java/io/milvus/param/ParamUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -803,11 +803,6 @@ public static SearchRequest convertSearchParam(@NonNull SearchParam requestParam
KeyValuePair.newBuilder()
.setKey(Constant.IGNORE_GROWING)
.setValue(String.valueOf(requestParam.isIgnoreGrowing()))
.build())
.addSearchParams(
KeyValuePair.newBuilder()
.setKey(Constant.ITERATOR_FIELD)
.setValue(String.valueOf(requestParam.isIterator()))
.build());

if (!Objects.equals(requestParam.getMetricType(), MetricType.None.name())) {
Expand Down Expand Up @@ -1003,12 +998,6 @@ public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearch
public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) {
boolean useDefaultConsistency = (requestParam.getConsistencyLevel() == null);
long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName());
// special logic for iterator
// don't pass guaranteeTimestamp for iterator, the query() interface might return empty list.
if (requestParam.isIterator()) {
useDefaultConsistency = true;
guaranteeTimestamp = 0L;
}
QueryRequest.Builder builder = QueryRequest.newBuilder()
.setCollectionName(requestParam.getCollectionName())
.addAllPartitionNames(requestParam.getPartitionNames())
Expand Down Expand Up @@ -1052,18 +1041,6 @@ public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) {
.setValue(String.valueOf(requestParam.isIgnoreGrowing()))
.build());

// reduce stop for best
builder.addQueryParams(KeyValuePair.newBuilder()
.setKey(Constant.REDUCE_STOP_FOR_BEST)
.setValue(String.valueOf(requestParam.isReduceStopForBest()))
.build());

// iterator
builder.addQueryParams(KeyValuePair.newBuilder()
.setKey(Constant.ITERATOR_FIELD)
.setValue(String.valueOf(requestParam.isIterator()))
.build());

return builder.build();
}

Expand Down
28 changes: 0 additions & 28 deletions src/main/java/io/milvus/param/dml/QueryParam.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public class QueryParam {
private final long offset;
private final long limit;
private final boolean ignoreGrowing;
private final boolean reduceStopForBest;
private final boolean iterator;

private QueryParam(@NonNull Builder builder) {
this.databaseName = builder.databaseName;
Expand All @@ -65,8 +63,6 @@ private QueryParam(@NonNull Builder builder) {
this.offset = builder.offset;
this.limit = builder.limit;
this.ignoreGrowing = builder.ignoreGrowing;
this.reduceStopForBest = builder.reduceStopForBest;
this.iterator = builder.iterator;
}

public static Builder newBuilder() {
Expand All @@ -89,8 +85,6 @@ public static class Builder {
private Long offset = 0L;
private Long limit = 0L;
private Boolean ignoreGrowing = Boolean.FALSE;
private Boolean reduceStopForBest = Boolean.FALSE;
private Boolean iterator = Boolean.FALSE;

private Builder() {
}
Expand Down Expand Up @@ -224,28 +218,6 @@ public Builder withIgnoreGrowing(@NonNull Boolean ignoreGrowing) {
return this;
}

/**
* Adjust the query using iterators to handle offsets more efficiently during the Reduce step. Default is False.
*
* @param reduceStopForBest <code>Boolean.TRUE</code> ignore, Boolean.FALSE is not
* @return <code>Builder</code>
*/
public Builder withReduceStopForBest(@NonNull Boolean reduceStopForBest) {
this.reduceStopForBest = reduceStopForBest;
return this;
}

/**
* Optimizing specifically for iterators can yield correct data results. Default is False.
*
* @param iterator <code>Boolean.TRUE</code> ignore, Boolean.FALSE is not
* @return <code>Builder</code>
*/
public Builder withIterator(@NonNull Boolean iterator) {
this.iterator = iterator;
return this;
}

/**
* Verifies parameters and creates a new {@link QueryParam} instance.
*
Expand Down
14 changes: 0 additions & 14 deletions src/main/java/io/milvus/param/dml/SearchParam.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class SearchParam {
private final Integer groupSize;
private final Boolean strictGroupSize;
private final PlaceholderType plType;
private final boolean iterator;

private SearchParam(@NonNull Builder builder) {
this.databaseName = builder.databaseName;
Expand All @@ -83,7 +82,6 @@ private SearchParam(@NonNull Builder builder) {
this.groupSize = builder.groupSize;
this.strictGroupSize = builder.strictGroupSize;
this.plType = builder.plType;
this.iterator = builder.iterator;
}

public static Builder newBuilder() {
Expand Down Expand Up @@ -114,7 +112,6 @@ public static class Builder {
private String groupByFieldName;
private Integer groupSize = null;
private Boolean strictGroupSize = null;
private Boolean iterator = Boolean.FALSE;

// plType is used to distinct vector type
// for Float16Vector/BFloat16Vector and BinaryVector, user inputs ByteBuffer
Expand Down Expand Up @@ -406,17 +403,6 @@ public Builder withStrictGroupSize(@NonNull Boolean strictGroupSize) {
return this;
}

/**
* Optimizing specifically for iterators can yield correct data results. Default is False.
*
* @param iterator <code>Boolean.TRUE</code> ignore, Boolean.FALSE is not
* @return <code>Builder</code>
*/
public Builder withIterator(@NonNull Boolean iterator) {
this.iterator = iterator;
return this;
}

/**
* Verifies parameters and creates a new {@link SearchParam} instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public class QueryIteratorReq {
private boolean ignoreGrowing = false;
@Builder.Default
private long batchSize = 1000L;
@Builder.Default
private boolean reduceStopForBest = false;
}
Loading