Skip to content

Commit

Permalink
Add flush/compact/getCompactionState interface for V2
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <[email protected]>
  • Loading branch information
yhmo committed Oct 25, 2024
1 parent 19230c7 commit 08ae769
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 33 deletions.
39 changes: 39 additions & 0 deletions src/main/java/io/milvus/v2/client/MilvusClientV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,45 @@ public DescribeAliasResp describeAlias(DescribeAliasReq request) {
return retry(()->utilityService.describeAlias(this.getRpcStub(), request));
}

/**
* trigger a flush action in server side
*
* @param request flush request
*/
public void flush(FlushReq request) {
FlushResp response = retry(()->utilityService.flush(this.getRpcStub(), request));

// The BlockingStub.flush() api returns immediately after the datanode set all growing segments to be "sealed".
// The flush state becomes "Completed" after the datanode uploading them to S3 asynchronously.
// Here we wait the flush action to be "Completed".
MilvusServiceGrpc.MilvusServiceBlockingStub tempBlockingStub =
MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
if (request.getWaitFlushedTimeoutMs() > 0L) {
tempBlockingStub = tempBlockingStub.withDeadlineAfter(request.getWaitFlushedTimeoutMs(), TimeUnit.MILLISECONDS);
}
utilityService.waitFlush(tempBlockingStub, response.getCollectionSegmentIDs(), response.getCollectionFlushTs());
}

/**
* trigger an asynchronous compaction in server side
*
* @param request compact request
* @return CompactResp
*/
public CompactResp compact(CompactReq request) {
return retry(()->utilityService.compact(this.getRpcStub(), request));
}

/**
* get a compact task state by its ID
*
* @param request get compact state request
* @return GetCompactStateResp
*/
public GetCompactionStateResp getCompactionState(GetCompactionStateReq request) {
return retry(()->utilityService.getCompactionState(this.getRpcStub(), request));
}

/**
* Get server version
*
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/milvus/v2/common/CompactionState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.common;

import lombok.Getter;

@Getter
public enum CompactionState {
UndefiedState(0),
Executing(1),
Completed(2);

private final int code;
CompactionState(int code) {
this.code = code;
}
;
}
19 changes: 19 additions & 0 deletions src/main/java/io/milvus/v2/common/IndexBuildState.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.common;

public enum IndexBuildState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

package io.milvus.v2.service.collection.response;

import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;

import java.util.ArrayList;
import java.util.List;

@Data
@SuperBuilder
public class ListCollectionsResp {
private List<String> collectionNames;
@Builder.Default
private List<String> collectionNames = new ArrayList<>();
}
99 changes: 86 additions & 13 deletions src/main/java/io/milvus/v2/service/utility/UtilityService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,97 @@

package io.milvus.v2.service.utility;

import io.milvus.grpc.FlushResponse;
import io.milvus.grpc.MilvusServiceGrpc;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.grpc.*;
import io.milvus.v2.common.CompactionState;
import io.milvus.v2.service.BaseService;
import io.milvus.v2.service.utility.request.*;
import io.milvus.v2.service.utility.response.DescribeAliasResp;
import io.milvus.v2.service.utility.response.ListAliasResp;
import io.milvus.v2.service.utility.response.*;

import java.util.*;

public class UtilityService extends BaseService {
public R<RpcStatus> flush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, FlushReq request) {
String title = String.format("Flush collection %s", request.getCollectionName());
io.milvus.grpc.FlushRequest flushRequest = io.milvus.grpc.FlushRequest.newBuilder()
.addCollectionNames(request.getCollectionName())
public FlushResp flush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, FlushReq request) {
List<String> collectionNames = request.getCollectionNames();
String title = String.format("Flush collections %s", collectionNames);
if (collectionNames.isEmpty()) {
return null; // maybe do flushAll in future
}

FlushRequest flushRequest = io.milvus.grpc.FlushRequest.newBuilder()
.addAllCollectionNames(collectionNames)
.build();
FlushResponse response = blockingStub.flush(flushRequest);
rpcUtils.handleResponse(title, response.getStatus());

Map<String, io.milvus.grpc.LongArray> rpcCollSegIDs = response.getCollSegIDsMap();
Map<String, List<Long>> collectionSegmentIDs = new HashMap<>();
rpcCollSegIDs.forEach((key, value)->{
collectionSegmentIDs.put(key, value.getDataList());
});
Map<String, Long> collectionFlushTs = response.getCollFlushTsMap();
return FlushResp.builder()
.collectionSegmentIDs(collectionSegmentIDs)
.collectionFlushTs(collectionFlushTs)
.build();
}

// this method is internal use, not expose to user
public Void waitFlush(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
Map<String, List<Long>> collectionSegmentIDs,
Map<String, Long> collectionFlushTs) {
collectionSegmentIDs.forEach((collectionName, segmentIDs)->{
if (collectionFlushTs.containsKey(collectionName)) {
Long flushTs = collectionFlushTs.get(collectionName);
boolean flushed = false;
while (!flushed) {
GetFlushStateResponse flushResponse = blockingStub.getFlushState(GetFlushStateRequest.newBuilder()
.addAllSegmentIDs(segmentIDs)
.setFlushTs(flushTs)
.build());

flushed = flushResponse.getFlushed();
}
}
});

return null;
}

public CompactResp compact(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, CompactReq request) {
String title = String.format("Compact collection %s", request.getCollectionName());

DescribeCollectionResponse descResponse = blockingStub.describeCollection(DescribeCollectionRequest.newBuilder()
.setCollectionName(request.getCollectionName())
.build());
rpcUtils.handleResponse(title, descResponse.getStatus());

io.milvus.grpc.ManualCompactionRequest compactRequest = io.milvus.grpc.ManualCompactionRequest.newBuilder()
.setCollectionID(descResponse.getCollectionID())
.setMajorCompaction(request.getIsClustering())
.build();
io.milvus.grpc.ManualCompactionResponse response = blockingStub.manualCompaction(compactRequest);
rpcUtils.handleResponse(title, response.getStatus());

return CompactResp.builder()
.compactionID(response.getCompactionID())
.build();
}

public GetCompactionStateResp getCompactionState(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
GetCompactionStateReq request) {
String title = "Get compaction state";
io.milvus.grpc.GetCompactionStateRequest getRequest = io.milvus.grpc.GetCompactionStateRequest.newBuilder()
.setCompactionID(request.getCompactionID())
.build();
io.milvus.grpc.GetCompactionStateResponse response = blockingStub.getCompactionState(getRequest);
rpcUtils.handleResponse(title, response.getStatus());

return GetCompactionStateResp.builder()
.state(CompactionState.valueOf(response.getState().name()))
.executingPlanNo(response.getExecutingPlanNo())
.timeoutPlanNo(response.getTimeoutPlanNo())
.completedPlanNo(response.getCompletedPlanNo())
.build();
FlushResponse status = blockingStub.flush(flushRequest);
rpcUtils.handleResponse(title, status.getStatus());
return R.success(new RpcStatus(RpcStatus.SUCCESS_MSG));
}

public Void createAlias(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub, CreateAliasReq request) {
Expand Down
33 changes: 33 additions & 0 deletions src/main/java/io/milvus/v2/service/utility/request/CompactReq.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.utility.request;

import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class CompactReq {
private String collectionName;

@Builder.Default
private Boolean isClustering = Boolean.FALSE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@

package io.milvus.v2.service.utility.request;

import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;

import java.util.ArrayList;
import java.util.List;

@Data
@SuperBuilder
public class FlushReq {
private String collectionName;
@Builder.Default
private List<String> collectionNames = new ArrayList<>();

@Builder.Default
private Long waitFlushedTimeoutMs = 0L; // 0 - waiting util flush task is done
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.utility.request;

import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class GetCompactionStateReq {
private Long compactionID;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.utility.response;

import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
public class CompactResp {
@Builder.Default
private Long compactionID = 0L;
}
36 changes: 36 additions & 0 deletions src/main/java/io/milvus/v2/service/utility/response/FlushResp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.milvus.v2.service.utility.response;

import lombok.Builder;
import lombok.Data;
import lombok.experimental.SuperBuilder;
import org.apache.commons.collections.map.HashedMap;

import java.util.*;

@Data
@SuperBuilder
public class FlushResp {
@Builder.Default
Map<String, List<Long>> collectionSegmentIDs = new HashedMap();
@Builder.Default
Map<String, Long> collectionFlushTs = new HashedMap();
}
Loading

0 comments on commit 08ae769

Please sign in to comment.