diff --git a/examples/main/java/io/milvus/v1/ConsistencyLevelExample.java b/examples/main/java/io/milvus/v1/ConsistencyLevelExample.java new file mode 100644 index 000000000..36fcc1599 --- /dev/null +++ b/examples/main/java/io/milvus/v1/ConsistencyLevelExample.java @@ -0,0 +1,236 @@ +package io.milvus.v1; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import io.milvus.client.MilvusClient; +import io.milvus.client.MilvusServiceClient; +import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.grpc.DataType; +import io.milvus.grpc.DescribeCollectionResponse; +import io.milvus.grpc.MutationResult; +import io.milvus.grpc.SearchResults; +import io.milvus.param.*; +import io.milvus.param.collection.*; +import io.milvus.param.dml.InsertParam; +import io.milvus.param.dml.SearchParam; +import io.milvus.param.index.CreateIndexParam; +import io.milvus.pool.MilvusClientV1Pool; +import io.milvus.pool.PoolConfig; +import io.milvus.response.DescCollResponseWrapper; +import io.milvus.response.SearchResultsWrapper; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class ConsistencyLevelExample { + private static final MilvusClient milvusClient; + + static { + ConnectParam connectParam = ConnectParam.newBuilder() + .withHost("localhost") + .withPort(19530) + .build(); + milvusClient = new MilvusServiceClient(connectParam); + } + + private static final String COLLECTION_NAME_PREFIX = "java_sdk_example_clevel_v1_"; + private static final Integer VECTOR_DIM = 512; + + private static String createCollection(ConsistencyLevelEnum level) { + String collectionName = COLLECTION_NAME_PREFIX + level.getName(); + + // Drop collection if exists + milvusClient.dropCollection(DropCollectionParam.newBuilder() + .withCollectionName(collectionName) + .build()); + + // Quickly create a collection with "id" field and "vector" field + List fieldsSchema = Arrays.asList( + FieldType.newBuilder() + .withName("id") + .withDataType(DataType.Int64) + .withPrimaryKey(true) + .withAutoID(false) + .build(), + FieldType.newBuilder() + .withName("vector") + .withDataType(DataType.FloatVector) + .withDimension(VECTOR_DIM) + .build() + ); + + // Create the collection with 3 fields + R response = milvusClient.createCollection(CreateCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withFieldTypes(fieldsSchema) + .withConsistencyLevel(level) + .build()); + CommonUtils.handleResponseStatus(response); + System.out.printf("Collection '%s' created\n", collectionName); + + response = milvusClient.createIndex(CreateIndexParam.newBuilder() + .withCollectionName(collectionName) + .withFieldName("vector") + .withIndexType(IndexType.FLAT) + .withMetricType(MetricType.L2) + .build()); + CommonUtils.handleResponseStatus(response); + + milvusClient.loadCollection(LoadCollectionParam.newBuilder() + .withCollectionName(collectionName) + .build()); + + return collectionName; + } + + private static void showCollectionLevel(String collectionName) { + R response = milvusClient.describeCollection(DescribeCollectionParam.newBuilder() + .withCollectionName(collectionName) + .build()); + CommonUtils.handleResponseStatus(response); + DescCollResponseWrapper wrapper = new DescCollResponseWrapper(response.getData()); + System.out.printf("Default consistency level: %s\n", wrapper.getConsistencyLevel().getName()); + } + + private static int insertData(String collectionName) { + Gson gson = new Gson(); + int rowCount = 1000; + for (int i = 0; i < rowCount; i++) { + JsonObject row = new JsonObject(); + row.addProperty("id", i); + row.add("vector", gson.toJsonTree(CommonUtils.generateFloatVector(VECTOR_DIM))); + + R response = milvusClient.insert(InsertParam.newBuilder() + .withCollectionName(collectionName) + .withRows(Collections.singletonList(row)) + .build()); + CommonUtils.handleResponseStatus(response); + } + + System.out.printf("%d rows inserted\n", rowCount); + return rowCount; + } + + private static List search(String collectionName, int topK) { + R searchR = milvusClient.search(SearchParam.newBuilder() + .withCollectionName(collectionName) + .withVectorFieldName("vector") + .withFloatVectors(Collections.singletonList(CommonUtils.generateFloatVector(VECTOR_DIM))) + .withTopK(topK) + .withMetricType(MetricType.L2) + .build()); + CommonUtils.handleResponseStatus(searchR); + + SearchResultsWrapper resultsWrapper = new SearchResultsWrapper(searchR.getData().getResults()); + List scores = resultsWrapper.getIDScore(0); + return scores; + } + + private static void testStrongLevel() { + String collectionName = createCollection(ConsistencyLevelEnum.STRONG); + showCollectionLevel(collectionName); + int rowCount = insertData(collectionName); + + // immediately search after insert, for Strong level, all the entities are visible + List scores = search(collectionName, rowCount); + if (scores.size() != rowCount) { + throw new RuntimeException(String.format("All inserted entities should be visible with Strong" + + " consistency level, but only %d returned", scores.size())); + } + System.out.printf("Strong level is working fine, %d results returned\n", scores.size()); + } + + private static void testSessionLevel() throws ClassNotFoundException, NoSuchMethodException { + String collectionName = createCollection(ConsistencyLevelEnum.SESSION); + showCollectionLevel(collectionName); + + ConnectParam connectConfig = ConnectParam.newBuilder() + .withHost("localhost") + .withPort(19530) + .build(); + PoolConfig poolConfig = PoolConfig.builder() + .maxIdlePerKey(10) // max idle clients per key + .maxTotalPerKey(20) // max total(idle + active) clients per key + .maxTotal(100) // max total clients for all keys + .maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available + .minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds + .build(); + MilvusClientV1Pool pool = new MilvusClientV1Pool(poolConfig, connectConfig); + + // The same process, different MilvusClient object, insert and search with Session level. + // The Session level ensure that the newly inserted data instantaneously become searchable. + Gson gson = new Gson(); + for (int i = 0; i < 100; i++) { + List vector = CommonUtils.generateFloatVector(VECTOR_DIM); + JsonObject row = new JsonObject(); + row.addProperty("id", i); + row.add("vector", gson.toJsonTree(vector)); + + // insert by a MilvusClient + String clientName1 = String.format("client_%d", i%10); + MilvusClient client1 = pool.getClient(clientName1); + client1.insert(InsertParam.newBuilder() + .withCollectionName(collectionName) + .withRows(Collections.singletonList(row)) + .build()); + pool.returnClient(clientName1, client1); // don't forget to return the client to pool + System.out.println("insert"); + + // search by another MilvusClient, use the just inserted vector to search + // the returned item is expected to be the just inserted item + String clientName2 = String.format("client_%d", i%10+1); + MilvusClient client2 = pool.getClient(clientName2); + R searchR = client2.search(SearchParam.newBuilder() + .withCollectionName(collectionName) + .withVectorFieldName("vector") + .withFloatVectors(Collections.singletonList(vector)) + .withTopK(1) + .withMetricType(MetricType.L2) + .build()); + pool.returnClient(clientName2, client2); // don't forget to return the client to pool + SearchResultsWrapper resultsWrapper = new SearchResultsWrapper(searchR.getData().getResults()); + List scores = resultsWrapper.getIDScore(0); + if (scores.size() != 1) { + throw new RuntimeException("Search result is empty"); + } + if (i != scores.get(0).getLongID()) { + throw new RuntimeException("The just inserted entity is not found"); + } + System.out.println("search"); + } + + System.out.println("Session level is working fine"); + } + + private static void testBoundedLevel() { + String collectionName = createCollection(ConsistencyLevelEnum.BOUNDED); + showCollectionLevel(collectionName); + int rowCount = insertData(collectionName); + + // immediately search after insert, for Bounded level, not all the entities are visible + List scores = search(collectionName, rowCount); + System.out.printf("Bounded level is working fine, %d results returned\n", scores.size()); + } + + private static void testEventuallyLevel() { + String collectionName = createCollection(ConsistencyLevelEnum.EVENTUALLY); + showCollectionLevel(collectionName); + int rowCount = insertData(collectionName); + + // immediately search after insert, for Bounded level, not all the entities are visible + List scores = search(collectionName, rowCount); + System.out.printf("Eventually level is working fine, %d results returned\n", scores.size()); + } + + public static void main(String[] args) throws Exception { + testStrongLevel(); + System.out.println("=============================================================="); + testSessionLevel(); + System.out.println("=============================================================="); + testBoundedLevel(); + System.out.println("=============================================================="); + testEventuallyLevel(); + } +} diff --git a/examples/main/java/io/milvus/v2/ConsistencyLevelExample.java b/examples/main/java/io/milvus/v2/ConsistencyLevelExample.java new file mode 100644 index 000000000..3f198186d --- /dev/null +++ b/examples/main/java/io/milvus/v2/ConsistencyLevelExample.java @@ -0,0 +1,192 @@ +package io.milvus.v2; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import io.milvus.pool.MilvusClientV2Pool; +import io.milvus.pool.PoolConfig; +import io.milvus.v1.CommonUtils; +import io.milvus.v2.client.ConnectConfig; +import io.milvus.v2.client.MilvusClientV2; +import io.milvus.v2.common.ConsistencyLevel; +import io.milvus.v2.service.collection.request.CreateCollectionReq; +import io.milvus.v2.service.collection.request.DescribeCollectionReq; +import io.milvus.v2.service.collection.request.DropCollectionReq; +import io.milvus.v2.service.collection.response.DescribeCollectionResp; +import io.milvus.v2.service.vector.request.InsertReq; +import io.milvus.v2.service.vector.request.SearchReq; +import io.milvus.v2.service.vector.request.data.FloatVec; +import io.milvus.v2.service.vector.response.SearchResp; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; + +public class ConsistencyLevelExample { + private static final MilvusClientV2 client; + + static { + ConnectConfig config = ConnectConfig.builder() + .uri("http://localhost:19530") + .build(); + client = new MilvusClientV2(config); + } + + private static final String COLLECTION_NAME_PREFIX = "java_sdk_example_clevel_v2_"; + private static final Integer VECTOR_DIM = 512; + + private static String createCollection(ConsistencyLevel level) { + String collectionName = COLLECTION_NAME_PREFIX + level.getName(); + + // Drop collection if exists + client.dropCollection(DropCollectionReq.builder() + .collectionName(collectionName) + .build()); + + // Quickly create a collection with "id" field and "vector" field + client.createCollection(CreateCollectionReq.builder() + .collectionName(collectionName) + .dimension(VECTOR_DIM) + .consistencyLevel(level) + .build()); + System.out.printf("Collection '%s' created\n", collectionName); + return collectionName; + } + + private static void showCollectionLevel(String collectionName) { + DescribeCollectionResp resp = client.describeCollection(DescribeCollectionReq.builder() + .collectionName(collectionName) + .build()); + System.out.printf("Default consistency level: %s\n", resp.getConsistencyLevel().getName()); + } + + private static int insertData(String collectionName) { + Gson gson = new Gson(); + int rowCount = 1000; + for (int i = 0; i < rowCount; i++) { + JsonObject row = new JsonObject(); + row.addProperty("id", i); + row.add("vector", gson.toJsonTree(CommonUtils.generateFloatVector(VECTOR_DIM))); + + client.insert(InsertReq.builder() + .collectionName(collectionName) + .data(Collections.singletonList(row)) + .build()); + } + + System.out.printf("%d rows inserted\n", rowCount); + return rowCount; + } + + private static List search(String collectionName, int topK) { + SearchResp searchR = client.search(SearchReq.builder() + .collectionName(collectionName) + .data(Collections.singletonList(new FloatVec(CommonUtils.generateFloatVector(VECTOR_DIM)))) + .topK(topK) + .build()); + List> searchResults = searchR.getSearchResults(); + return searchResults.get(0); + } + + private static void testStrongLevel() { + String collectionName = createCollection(ConsistencyLevel.STRONG); + showCollectionLevel(collectionName); + int rowCount = insertData(collectionName); + + // immediately search after insert, for Strong level, all the entities are visible + List results = search(collectionName, rowCount); + if (results.size() != rowCount) { + throw new RuntimeException(String.format("All inserted entities should be visible with Strong" + + " consistency level, but only %d returned", results.size())); + } + System.out.printf("Strong level is working fine, %d results returned\n", results.size()); + } + + private static void testSessionLevel() throws Exception { + String collectionName = createCollection(ConsistencyLevel.SESSION); + showCollectionLevel(collectionName); + + ConnectConfig connectConfig = ConnectConfig.builder() + .uri("http://localhost:19530") + .build(); + PoolConfig poolConfig = PoolConfig.builder() + .maxIdlePerKey(10) // max idle clients per key + .maxTotalPerKey(20) // max total(idle + active) clients per key + .maxTotal(100) // max total clients for all keys + .maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available + .minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds + .build(); + MilvusClientV2Pool pool = new MilvusClientV2Pool(poolConfig, connectConfig); + + // The same process, different MilvusClient object, insert and search with Session level. + // The Session level ensure that the newly inserted data instantaneously become searchable. + Gson gson = new Gson(); + for (int i = 0; i < 100; i++) { + List vector = CommonUtils.generateFloatVector(VECTOR_DIM); + JsonObject row = new JsonObject(); + row.addProperty("id", i); + row.add("vector", gson.toJsonTree(vector)); + + // insert by a MilvusClient + String clientName1 = String.format("client_%d", i%10); + MilvusClientV2 client1 = pool.getClient(clientName1); + client1.insert(InsertReq.builder() + .collectionName(collectionName) + .data(Collections.singletonList(row)) + .build()); + pool.returnClient(clientName1, client1); // don't forget to return the client to pool + System.out.println("insert"); + + // search by another MilvusClient, use the just inserted vector to search + // the returned item is expected to be the just inserted item + String clientName2 = String.format("client_%d", i%10+1); + MilvusClientV2 client2 = pool.getClient(clientName2); + SearchResp searchR = client2.search(SearchReq.builder() + .collectionName(collectionName) + .data(Collections.singletonList(new FloatVec(vector))) + .topK(1) + .build()); + pool.returnClient(clientName2, client2); // don't forget to return the client to pool + List> searchResults = searchR.getSearchResults(); + List results = searchResults.get(0); + if (results.size() != 1) { + throw new RuntimeException("Search result is empty"); + } + if (i != (Long)results.get(0).getId()) { + throw new RuntimeException("The just inserted entity is not found"); + } + System.out.println("search"); + } + + System.out.println("Session level is working fine"); + } + + private static void testBoundedLevel() { + String collectionName = createCollection(ConsistencyLevel.BOUNDED); + showCollectionLevel(collectionName); + int rowCount = insertData(collectionName); + + // immediately search after insert, for Bounded level, not all the entities are visible + List results = search(collectionName, rowCount); + System.out.printf("Bounded level is working fine, %d results returned\n", results.size()); + } + + private static void testEventuallyLevel() { + String collectionName = createCollection(ConsistencyLevel.EVENTUALLY); + showCollectionLevel(collectionName); + int rowCount = insertData(collectionName); + + // immediately search after insert, for Bounded level, not all the entities are visible + List results = search(collectionName, rowCount); + System.out.printf("Eventually level is working fine, %d results returned\n", results.size()); + } + + public static void main(String[] args) throws Exception { + testStrongLevel(); + System.out.println("=============================================================="); + testSessionLevel(); + System.out.println("=============================================================="); + testBoundedLevel(); + System.out.println("=============================================================="); + testEventuallyLevel(); + } +} diff --git a/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java b/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java index 90c30fc28..4a5d0c6dc 100644 --- a/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java +++ b/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.*; import io.grpc.StatusRuntimeException; +import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; import io.milvus.common.utils.VectorUtils; import io.milvus.exception.*; @@ -1554,6 +1555,7 @@ public R delete(@NonNull DeleteParam requestParam) { MutationResult response = blockingStub().delete(builder.build()); handleResponse(title, response.getStatus()); + GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp()); return R.success(response); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1581,6 +1583,7 @@ public R insert(@NonNull InsertParam requestParam) { MutationResult response = blockingStub().insert(builderWraper.buildInsertRequest()); cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); handleResponse(title, response.getStatus()); + GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp()); return R.success(response); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1616,6 +1619,7 @@ public void onSuccess(MutationResult result) { cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); if (result.getStatus().getErrorCode() == ErrorCode.Success) { logDebug("{} successfully!", title); + GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp()); } else { logError("{} failed:\n{}", title, result.getStatus().getReason()); } @@ -1658,6 +1662,7 @@ public R upsert(UpsertParam requestParam) { MutationResult response = blockingStub().upsert(builderWraper.buildUpsertRequest()); cleanCacheIfFailed(response.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); handleResponse(title, response.getStatus()); + GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), response.getTimestamp()); return R.success(response); } catch (StatusRuntimeException e) { logError("{} RPC failed! Exception:{}", title, e); @@ -1692,6 +1697,7 @@ public void onSuccess(MutationResult result) { cleanCacheIfFailed(result.getStatus(), requestParam.getDatabaseName(), requestParam.getCollectionName()); if (result.getStatus().getErrorCode() == ErrorCode.Success) { logDebug("{} successfully!", title); + GTsDict.getInstance().updateCollectionTs(requestParam.getCollectionName(), result.getTimestamp()); } else { logError("{} failed:\n{}", title, result.getStatus().getReason()); } diff --git a/src/main/java/io/milvus/common/clientenum/ConsistencyLevelEnum.java b/src/main/java/io/milvus/common/clientenum/ConsistencyLevelEnum.java index 9581494d9..f9508f63f 100644 --- a/src/main/java/io/milvus/common/clientenum/ConsistencyLevelEnum.java +++ b/src/main/java/io/milvus/common/clientenum/ConsistencyLevelEnum.java @@ -24,8 +24,7 @@ public enum ConsistencyLevelEnum { STRONG("Strong", 0), - // Session level is not allowed here because no ORM is implemented -// SESSION("Session", 1), + SESSION("Session", 1), BOUNDED("Bounded", 2), EVENTUALLY("Eventually",3), ; diff --git a/src/main/java/io/milvus/common/utils/GTsDict.java b/src/main/java/io/milvus/common/utils/GTsDict.java new file mode 100644 index 000000000..de0f8adf8 --- /dev/null +++ b/src/main/java/io/milvus/common/utils/GTsDict.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.milvus.common.utils; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class GTsDict { + // GTsDict stores the last write timestamp for ConsistencyLevel.Session + // It is a Map, key is the name of a collection, value is the last write timestamp of the collection. + // It only takes effect when consistency level is Session. + // For each dml action, the GTsDict is updated, the last write timestamp is returned from server-side. + // When search/query/hybridSearch is called, and the consistency level is Session, the ts of the collection will + // be passed to construct a guarantee_ts to the server. + private final static GTsDict TS_DICT = new GTsDict(); + + private GTsDict(){} + + public static GTsDict getInstance() { + return TS_DICT; + } + + private ConcurrentMap tsDict = new ConcurrentHashMap<>(); + + public void updateCollectionTs(String collectionName, long ts) { + // If the collection name exists, use its value to compare to the input ts, + // only when the input ts is larger than the existing value, replace it with the input ts. + // If the collection name doesn't exist, directly set the input value. + tsDict.compute(collectionName, (key, value) -> (value == null) ? ts : ((ts > value) ? ts : value)); + } + + public Long getCollectionTs(String collectionName) { + return tsDict.get(collectionName); + } +} diff --git a/src/main/java/io/milvus/param/ParamUtils.java b/src/main/java/io/milvus/param/ParamUtils.java index a89d24b39..b9e03c556 100644 --- a/src/main/java/io/milvus/param/ParamUtils.java +++ b/src/main/java/io/milvus/param/ParamUtils.java @@ -23,6 +23,7 @@ import com.google.gson.reflect.TypeToken; import com.google.protobuf.ByteString; import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; import io.milvus.exception.ParamException; import io.milvus.grpc.*; @@ -768,9 +769,8 @@ public static SearchRequest convertSearchParam(@NonNull SearchParam requestParam builder.setDsl(requestParam.getExpr()); } - long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), - requestParam.getGuaranteeTimestamp(), requestParam.getGracefulTime()); - builder.setTravelTimestamp(requestParam.getTravelTimestamp()); + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); + builder.setTravelTimestamp(requestParam.getTravelTimestamp()); // deprecated builder.setGuaranteeTimestamp(guaranteeTimestamp); // a new parameter from v2.2.9, if user didn't specify consistency level, set this parameter to true @@ -865,6 +865,9 @@ public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearch requestParam.getOutFields().forEach(builder::addOutputFields); } + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); + builder.setGuaranteeTimestamp(guaranteeTimestamp); + if (requestParam.getConsistencyLevel() == null) { builder.setUseDefaultConsistency(true); } else { @@ -875,8 +878,7 @@ public static HybridSearchRequest convertHybridSearchParam(@NonNull HybridSearch } public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) { - long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), - requestParam.getGuaranteeTimestamp(), requestParam.getGracefulTime()); + long guaranteeTimestamp = getGuaranteeTimestamp(requestParam.getConsistencyLevel(), requestParam.getCollectionName()); QueryRequest.Builder builder = QueryRequest.newBuilder() .setCollectionName(requestParam.getCollectionName()) .addAllPartitionNames(requestParam.getPartitionNames()) @@ -935,23 +937,22 @@ public static QueryRequest convertQueryParam(@NonNull QueryParam requestParam) { return builder.build(); } - private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, - long guaranteeTimestamp, Long gracefulTime){ + private static long getGuaranteeTimestamp(ConsistencyLevelEnum consistencyLevel, String collectionName){ if(consistencyLevel == null){ - return 1L; + Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + return (ts == null) ? 1L : ts; } switch (consistencyLevel){ case STRONG: - guaranteeTimestamp = 0L; - break; + return 0L; + case SESSION: + Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + return (ts == null) ? 1L : ts; case BOUNDED: - guaranteeTimestamp = (new Date()).getTime() - gracefulTime; - break; - case EVENTUALLY: - guaranteeTimestamp = 1L; - break; + return 2L; // let server side to determine the bounded time + default: + return 1L; // EVENTUALLY and others } - return guaranteeTimestamp; } public static boolean isVectorDataType(DataType dataType) { diff --git a/src/main/java/io/milvus/param/dml/QueryIteratorParam.java b/src/main/java/io/milvus/param/dml/QueryIteratorParam.java index 839618fe7..16b189bc5 100644 --- a/src/main/java/io/milvus/param/dml/QueryIteratorParam.java +++ b/src/main/java/io/milvus/param/dml/QueryIteratorParam.java @@ -46,9 +46,9 @@ public class QueryIteratorParam { private final List partitionNames; private final List outFields; private final String expr; - private final long travelTimestamp; - private final long guaranteeTimestamp; - private final long gracefulTime; + private final long travelTimestamp; // deprecated + private final long guaranteeTimestamp; // deprecated + private final long gracefulTime; // deprecated private final ConsistencyLevelEnum consistencyLevel; private final long offset; private final long limit; diff --git a/src/main/java/io/milvus/param/dml/QueryParam.java b/src/main/java/io/milvus/param/dml/QueryParam.java index f0a87fb02..a49f78e76 100644 --- a/src/main/java/io/milvus/param/dml/QueryParam.java +++ b/src/main/java/io/milvus/param/dml/QueryParam.java @@ -82,9 +82,9 @@ public static class Builder { private final List partitionNames = Lists.newArrayList(); private final List outFields = new ArrayList<>(); private String expr = ""; - private Long travelTimestamp = 0L; - private Long gracefulTime = 5000L; - private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS; + private Long travelTimestamp = 0L; // deprecated + private Long gracefulTime = 5000L; // deprecated + private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS; // deprecated private ConsistencyLevelEnum consistencyLevel = null; private Long offset = 0L; private Long limit = 0L; diff --git a/src/main/java/io/milvus/param/dml/SearchIteratorParam.java b/src/main/java/io/milvus/param/dml/SearchIteratorParam.java index 7483e2ab5..580da1b72 100644 --- a/src/main/java/io/milvus/param/dml/SearchIteratorParam.java +++ b/src/main/java/io/milvus/param/dml/SearchIteratorParam.java @@ -55,9 +55,9 @@ public class SearchIteratorParam { private final Long NQ; private final int roundDecimal; private final String params; - private final long travelTimestamp; - private final long guaranteeTimestamp; - private final Long gracefulTime; + private final long travelTimestamp; // deprecated + private final long guaranteeTimestamp; // deprecated + private final Long gracefulTime; // deprecated private final ConsistencyLevelEnum consistencyLevel; private final boolean ignoreGrowing; private final String groupByFieldName; diff --git a/src/main/java/io/milvus/param/dml/SearchParam.java b/src/main/java/io/milvus/param/dml/SearchParam.java index 59644434e..973459947 100644 --- a/src/main/java/io/milvus/param/dml/SearchParam.java +++ b/src/main/java/io/milvus/param/dml/SearchParam.java @@ -102,9 +102,9 @@ public static class Builder { private Long NQ; private Integer roundDecimal = -1; private String params = "{}"; - private Long travelTimestamp = 0L; - private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS; - private Long gracefulTime = 5000L; + private Long travelTimestamp = 0L; // deprecated + private Long guaranteeTimestamp = Constant.GUARANTEE_EVENTUALLY_TS; // deprecated + private Long gracefulTime = 5000L; // deprecated private ConsistencyLevelEnum consistencyLevel = null; private Boolean ignoreGrowing = Boolean.FALSE; private String groupByFieldName; diff --git a/src/main/java/io/milvus/v2/common/ConsistencyLevel.java b/src/main/java/io/milvus/v2/common/ConsistencyLevel.java index a189a0c4a..7de5ef49c 100644 --- a/src/main/java/io/milvus/v2/common/ConsistencyLevel.java +++ b/src/main/java/io/milvus/v2/common/ConsistencyLevel.java @@ -23,6 +23,7 @@ @Getter public enum ConsistencyLevel{ STRONG("Strong", 0), + SESSION("Session", 1), BOUNDED("Bounded", 2), EVENTUALLY("Eventually",3), ; diff --git a/src/main/java/io/milvus/v2/service/vector/VectorService.java b/src/main/java/io/milvus/v2/service/vector/VectorService.java index df156e6cb..0e6cdd139 100644 --- a/src/main/java/io/milvus/v2/service/vector/VectorService.java +++ b/src/main/java/io/milvus/v2/service/vector/VectorService.java @@ -19,6 +19,7 @@ package io.milvus.v2.service.vector; +import io.milvus.common.utils.GTsDict; import io.milvus.exception.ParamException; import io.milvus.grpc.*; import io.milvus.orm.iterator.*; @@ -108,6 +109,7 @@ public InsertResp insert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu MutationResult response = blockingStub.insert(requestBuilder.convertGrpcInsertRequest(request, new DescCollResponseWrapper(descResp))); cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); + GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp()); return InsertResp.builder() .InsertCnt(response.getInsertCnt()) .build(); @@ -122,6 +124,7 @@ public UpsertResp upsert(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu MutationResult response = blockingStub.upsert(requestBuilder.convertGrpcUpsertRequest(request, new DescCollResponseWrapper(descResp))); cleanCacheIfFailed(response.getStatus(), "", request.getCollectionName()); rpcUtils.handleResponse(title, response.getStatus()); + GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp()); return UpsertResp.builder() .upsertCnt(response.getInsertCnt()) .build(); @@ -216,6 +219,7 @@ public DeleteResp delete(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStu .build(); MutationResult response = blockingStub.delete(deleteRequest); rpcUtils.handleResponse(title, response.getStatus()); + GTsDict.getInstance().updateCollectionTs(request.getCollectionName(), response.getTimestamp()); return DeleteResp.builder() .deleteCnt(response.getDeleteCnt()) .build(); diff --git a/src/main/java/io/milvus/v2/service/vector/request/HybridSearchReq.java b/src/main/java/io/milvus/v2/service/vector/request/HybridSearchReq.java index b8b9b596b..cf3b35f24 100644 --- a/src/main/java/io/milvus/v2/service/vector/request/HybridSearchReq.java +++ b/src/main/java/io/milvus/v2/service/vector/request/HybridSearchReq.java @@ -40,5 +40,6 @@ public class HybridSearchReq private List outFields; @Builder.Default private int roundDecimal = -1; - private ConsistencyLevel consistencyLevel; + @Builder.Default + private ConsistencyLevel consistencyLevel = null; } diff --git a/src/main/java/io/milvus/v2/service/vector/request/QueryIteratorReq.java b/src/main/java/io/milvus/v2/service/vector/request/QueryIteratorReq.java index 0d1cc8c98..7f0751231 100644 --- a/src/main/java/io/milvus/v2/service/vector/request/QueryIteratorReq.java +++ b/src/main/java/io/milvus/v2/service/vector/request/QueryIteratorReq.java @@ -20,7 +20,7 @@ public class QueryIteratorReq { @Builder.Default private String expr = ""; @Builder.Default - private ConsistencyLevel consistencyLevel = ConsistencyLevel.BOUNDED; + private ConsistencyLevel consistencyLevel = null; @Builder.Default private long offset = 0; @Builder.Default diff --git a/src/main/java/io/milvus/v2/service/vector/request/QueryReq.java b/src/main/java/io/milvus/v2/service/vector/request/QueryReq.java index eaf621516..ae1aeb988 100644 --- a/src/main/java/io/milvus/v2/service/vector/request/QueryReq.java +++ b/src/main/java/io/milvus/v2/service/vector/request/QueryReq.java @@ -39,7 +39,7 @@ public class QueryReq { private List ids; private String filter; @Builder.Default - private ConsistencyLevel consistencyLevel = ConsistencyLevel.BOUNDED; + private ConsistencyLevel consistencyLevel = null; private long offset; private long limit; } diff --git a/src/main/java/io/milvus/v2/service/vector/request/SearchIteratorReq.java b/src/main/java/io/milvus/v2/service/vector/request/SearchIteratorReq.java index 7fdae8628..28b0bbde7 100644 --- a/src/main/java/io/milvus/v2/service/vector/request/SearchIteratorReq.java +++ b/src/main/java/io/milvus/v2/service/vector/request/SearchIteratorReq.java @@ -33,7 +33,7 @@ public class SearchIteratorReq { @Builder.Default private String params = "{}"; @Builder.Default - private ConsistencyLevel consistencyLevel = ConsistencyLevel.BOUNDED; + private ConsistencyLevel consistencyLevel = null; @Builder.Default private boolean ignoreGrowing = false; @Builder.Default diff --git a/src/main/java/io/milvus/v2/service/vector/request/SearchReq.java b/src/main/java/io/milvus/v2/service/vector/request/SearchReq.java index 2ca898bf2..339adf8a2 100644 --- a/src/main/java/io/milvus/v2/service/vector/request/SearchReq.java +++ b/src/main/java/io/milvus/v2/service/vector/request/SearchReq.java @@ -50,11 +50,11 @@ public class SearchReq { private int roundDecimal = -1; @Builder.Default private Map searchParams = new HashMap<>(); - private long guaranteeTimestamp; + private long guaranteeTimestamp; // deprecated @Builder.Default - private Long gracefulTime = 5000L; + private Long gracefulTime = 5000L; // deprecated @Builder.Default - private ConsistencyLevel consistencyLevel = ConsistencyLevel.BOUNDED; + private ConsistencyLevel consistencyLevel = null; private boolean ignoreGrowing; private String groupByFieldName; } diff --git a/src/main/java/io/milvus/v2/utils/VectorUtils.java b/src/main/java/io/milvus/v2/utils/VectorUtils.java index 59bfa5b83..0f0e86ef1 100644 --- a/src/main/java/io/milvus/v2/utils/VectorUtils.java +++ b/src/main/java/io/milvus/v2/utils/VectorUtils.java @@ -20,6 +20,7 @@ package io.milvus.v2.utils; import com.google.protobuf.ByteString; +import io.milvus.common.utils.GTsDict; import io.milvus.common.utils.JsonUtils; import io.milvus.v2.common.ConsistencyLevel; import io.milvus.exception.ParamException; @@ -79,23 +80,22 @@ public QueryRequest ConvertToGrpcQueryRequest(QueryReq request){ } - private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, - long guaranteeTimestamp, Long gracefulTime){ + private static long getGuaranteeTimestamp(ConsistencyLevel consistencyLevel, String collectionName){ if(consistencyLevel == null){ - return 1L; + Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + return (ts == null) ? 1L : ts; } switch (consistencyLevel){ case STRONG: - guaranteeTimestamp = 0L; - break; + return 0L; + case SESSION: + Long ts = GTsDict.getInstance().getCollectionTs(collectionName); + return (ts == null) ? 1L : ts; case BOUNDED: - guaranteeTimestamp = (new Date()).getTime() - gracefulTime; - break; - case EVENTUALLY: - guaranteeTimestamp = 1L; - break; + return 2L; // let server side to determine the bounded time + default: + return 1L; // EVENTUALLY and others } - return guaranteeTimestamp; } public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) { @@ -182,9 +182,7 @@ public SearchRequest ConvertToGrpcSearchRequest(SearchReq request) { builder.setDsl(request.getFilter()); } - long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), - request.getGuaranteeTimestamp(), request.getGracefulTime()); - //builder.setTravelTimestamp(request.getTravelTimestamp()); + long guaranteeTimestamp = getGuaranteeTimestamp(request.getConsistencyLevel(), request.getCollectionName()); builder.setGuaranteeTimestamp(guaranteeTimestamp); // a new parameter from v2.2.9, if user didn't specify consistency level, set this parameter to true diff --git a/src/test/java/io/milvus/client/MilvusServiceClientTest.java b/src/test/java/io/milvus/client/MilvusServiceClientTest.java index 908dd5005..995443505 100644 --- a/src/test/java/io/milvus/client/MilvusServiceClientTest.java +++ b/src/test/java/io/milvus/client/MilvusServiceClientTest.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.common.utils.GTsDict; import io.milvus.exception.IllegalResponseException; import io.milvus.exception.ParamException; import io.milvus.grpc.*; @@ -40,6 +41,7 @@ import io.milvus.response.*; import io.milvus.server.MockMilvusServer; import io.milvus.server.MockMilvusServerImpl; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.lang.reflect.InvocationTargetException; @@ -3116,4 +3118,24 @@ void testGetBulkInsertStateWrapper() { assertFalse(wrapper.toString().isEmpty()); } + + @Test + void testGTsDict() { + GTsDict dict = GTsDict.getInstance(); + dict.updateCollectionTs("aaa", 0L); + dict.updateCollectionTs("bbb", 999L); + dict.updateCollectionTs("ccc", -10L); + Assertions.assertEquals(0L, dict.getCollectionTs("aaa")); + Assertions.assertEquals(999L, dict.getCollectionTs("bbb")); + Assertions.assertEquals(-10L, dict.getCollectionTs("ccc")); + + dict.updateCollectionTs("aaa", 20L); + Assertions.assertEquals(20L, dict.getCollectionTs("aaa")); + dict.updateCollectionTs("bbb", 200L); + Assertions.assertEquals(999L, dict.getCollectionTs("bbb")); + dict.updateCollectionTs("ccc", -50L); + Assertions.assertEquals(-10L, dict.getCollectionTs("ccc")); + dict.updateCollectionTs("ccc", 50L); + Assertions.assertEquals(50L, dict.getCollectionTs("ccc")); + } } \ No newline at end of file