From 846043639f457e831a3a4b10cecab89ce4277650 Mon Sep 17 00:00:00 2001 From: "wenhui.zhang" Date: Mon, 15 Jul 2024 17:27:20 +0800 Subject: [PATCH 1/2] test --- core/dbclient/milvus2x.go | 32 +++++++++++++++++++++++-------- storage/milvus2x/milvus2_3_ver.go | 32 +++++++++++++++++++++++-------- 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/core/dbclient/milvus2x.go b/core/dbclient/milvus2x.go index 7b5ef5c..680d68f 100644 --- a/core/dbclient/milvus2x.go +++ b/core/dbclient/milvus2x.go @@ -10,6 +10,7 @@ import ( "github.com/zilliztech/milvus-migration/internal/log" "github.com/zilliztech/milvus-migration/storage/milvus2x" "go.uber.org/zap" + "google.golang.org/grpc" "strconv" "time" ) @@ -37,15 +38,30 @@ func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - if cfg.UserName == "" { - log.Info("[Milvus2x] find username is empty, will use NewDefaultGrpcClient() to new client") - milvus, err = client.NewDefaultGrpcClient(ctx, cfg.Endpoint) - } else { - log.Info("[Milvus2x] find username not empty, will use NewDefaultGrpcClientWithURI() to new client") - milvus, err = client.NewDefaultGrpcClientWithURI(ctx, cfg.Endpoint, cfg.UserName, cfg.Password) - } + //if cfg.UserName == "" { + // log.Info("[Milvus2x] find username is empty, will use NewDefaultGrpcClient() to new client") + // milvus, err = client.NewDefaultGrpcClient(ctx, cfg.Endpoint) + //} else { + // log.Info("[Milvus2x] find username not empty, will use NewDefaultGrpcClientWithURI() to new client") + // milvus, err = client.NewDefaultGrpcClientWithURI(ctx, cfg.Endpoint, cfg.UserName, cfg.Password) + //} + //if err != nil { + // log.Error("[Milvus2x] new milvus client error", zap.Error(err)) + // return nil, err + //} + + milvus, err = client.NewClient(ctx, client.Config{ + Address: cfg.Endpoint, + Username: cfg.UserName, + Password: cfg.Password, + DialOptions: []grpc.DialOption{ + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(67108864), + grpc.MaxCallSendMsgSize(268435456), + ), + }, + }) if err != nil { - log.Error("[Milvus2x] new milvus client error", zap.Error(err)) return nil, err } diff --git a/storage/milvus2x/milvus2_3_ver.go b/storage/milvus2x/milvus2_3_ver.go index 0ed2329..43f834a 100644 --- a/storage/milvus2x/milvus2_3_ver.go +++ b/storage/milvus2x/milvus2_3_ver.go @@ -9,6 +9,7 @@ import ( "github.com/zilliztech/milvus-migration/core/type/milvus2xtype" "github.com/zilliztech/milvus-migration/internal/log" "go.uber.org/zap" + "google.golang.org/grpc" "io" "strconv" "time" @@ -117,15 +118,30 @@ func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, e var err error ctx := context.Background() - if cfg.UserName == "" { - log.Info("[Milvus23x] find username is empty, will use NewDefaultGrpcClient() to new client") - milvus, err = client.NewDefaultGrpcClient(ctx, cfg.Endpoint) - } else { - log.Info("[Milvus23x] find username not empty, will use NewDefaultGrpcClientWithURI() to new client") - milvus, err = client.NewDefaultGrpcClientWithURI(ctx, cfg.Endpoint, cfg.UserName, cfg.Password) - } + //if cfg.UserName == "" { + // log.Info("[Milvus23x] find username is empty, will use NewDefaultGrpcClient() to new client") + // milvus, err = client.NewDefaultGrpcClient(ctx, cfg.Endpoint) + //} else { + // log.Info("[Milvus23x] find username not empty, will use NewDefaultGrpcClientWithURI() to new client") + // milvus, err = client.NewDefaultGrpcClientWithURI(ctx, cfg.Endpoint, cfg.UserName, cfg.Password) + //} + //if err != nil { + // log.Error("[Milvus23x] new milvus client error", zap.Error(err)) + // return nil, err + //} + + milvus, err = client.NewClient(ctx, client.Config{ + Address: cfg.Endpoint, + Username: cfg.UserName, + Password: cfg.Password, + DialOptions: []grpc.DialOption{ + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(67108864), + grpc.MaxCallSendMsgSize(268435456), + ), + }, + }) if err != nil { - log.Error("[Milvus23x] new milvus client error", zap.Error(err)) return nil, err } From 80da0a28976f8c0207d3da50495046499b421407 Mon Sep 17 00:00:00 2001 From: "wenhui.zhang" Date: Mon, 15 Jul 2024 18:05:58 +0800 Subject: [PATCH 2/2] support custom grpcMsgSize param --- core/config/config.go | 3 ++ core/config/resolve.go | 16 +++++---- core/dbclient/milvus2x.go | 52 ++++++++++++++------------- storage/milvus2x/milvus2_3_ver.go | 58 +++++++++++++++++-------------- 4 files changed, 73 insertions(+), 56 deletions(-) diff --git a/core/config/config.go b/core/config/config.go index e8b77da..064540a 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -75,6 +75,9 @@ type Milvus2xConfig struct { UserName string Password string + GrpcMaxRecvMsgSize int + GrpcMaxSendMsgSize int + Version string //internal param hashCache atomic.Uint32 } diff --git a/core/config/resolve.go b/core/config/resolve.go index 662bc35..5519bd0 100644 --- a/core/config/resolve.go +++ b/core/config/resolve.go @@ -216,9 +216,11 @@ func resolveRemoteConfig(prefix string, v *viper.Viper) *RemoteConfig { func resolveTargetMilvus2xConfig(v *viper.Viper) *Milvus2xConfig { return &Milvus2xConfig{ - Endpoint: v.GetString("target.milvus2x.endpoint"), - UserName: v.GetString("target.milvus2x.username"), - Password: v.GetString("target.milvus2x.password"), + Endpoint: v.GetString("target.milvus2x.endpoint"), + UserName: v.GetString("target.milvus2x.username"), + Password: v.GetString("target.milvus2x.password"), + GrpcMaxRecvMsgSize: v.GetInt("target.milvus2x.grpc.maxCallRecvMsgSize"), + GrpcMaxSendMsgSize: v.GetInt("target.milvus2x.grpc.maxCallSendMsgSize"), } } @@ -351,8 +353,10 @@ func resolveMilvus2xDumpWorkConfig(v *viper.Viper, workMode common.DumpMode) (*D func resolveSourceMilvus2xConfig(v *viper.Viper) *Milvus2xConfig { return &Milvus2xConfig{ - Endpoint: v.GetString("source.milvus2x.endpoint"), - UserName: v.GetString("source.milvus2x.username"), - Password: v.GetString("source.milvus2x.password"), + Endpoint: v.GetString("source.milvus2x.endpoint"), + UserName: v.GetString("source.milvus2x.username"), + Password: v.GetString("source.milvus2x.password"), + GrpcMaxRecvMsgSize: v.GetInt("source.milvus2x.grpc.maxCallRecvMsgSize"), + GrpcMaxSendMsgSize: v.GetInt("source.milvus2x.grpc.maxCallSendMsgSize"), } } diff --git a/core/dbclient/milvus2x.go b/core/dbclient/milvus2x.go index 680d68f..cec02de 100644 --- a/core/dbclient/milvus2x.go +++ b/core/dbclient/milvus2x.go @@ -38,36 +38,40 @@ func NewMilvus2xClient(cfg *config.Milvus2xConfig) (*Milvus2x, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - //if cfg.UserName == "" { - // log.Info("[Milvus2x] find username is empty, will use NewDefaultGrpcClient() to new client") - // milvus, err = client.NewDefaultGrpcClient(ctx, cfg.Endpoint) - //} else { - // log.Info("[Milvus2x] find username not empty, will use NewDefaultGrpcClientWithURI() to new client") - // milvus, err = client.NewDefaultGrpcClientWithURI(ctx, cfg.Endpoint, cfg.UserName, cfg.Password) - //} - //if err != nil { - // log.Error("[Milvus2x] new milvus client error", zap.Error(err)) - // return nil, err - //} - - milvus, err = client.NewClient(ctx, client.Config{ - Address: cfg.Endpoint, - Username: cfg.UserName, - Password: cfg.Password, - DialOptions: []grpc.DialOption{ - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(67108864), - grpc.MaxCallSendMsgSize(268435456), - ), - }, - }) + if cfg.GrpcMaxRecvMsgSize <= 0 { + if cfg.UserName == "" { + log.Info("[Milvus2x] find username is empty, will use NewDefaultGrpcClient() to new client") + milvus, err = client.NewDefaultGrpcClient(ctx, cfg.Endpoint) + } else { + log.Info("[Milvus2x] find username not empty, will use NewDefaultGrpcClientWithURI() to new client") + milvus, err = client.NewDefaultGrpcClientWithURI(ctx, cfg.Endpoint, cfg.UserName, cfg.Password) + } + } else { + config := client.Config{ + Address: cfg.Endpoint, + DialOptions: []grpc.DialOption{ + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(cfg.GrpcMaxRecvMsgSize), + grpc.MaxCallSendMsgSize(cfg.GrpcMaxSendMsgSize), + ), + }, + } + if cfg.UserName != "" { + config.Username = cfg.UserName + config.Password = cfg.Password + } + milvus, err = client.NewClient(ctx, config) + } if err != nil { + log.Error("[Milvus2x] new milvus client error", zap.Error(err)) return nil, err } log.Info("[Milvus2x] begin to test connect", zap.String("endpoint", cfg.Endpoint), - zap.String("username", cfg.UserName)) + zap.String("username", cfg.UserName), + zap.Int("GrpcMaxCallRecvMsgSize", cfg.GrpcMaxRecvMsgSize), + zap.Int("GrpcMaxCallSendMsgSize", cfg.GrpcMaxSendMsgSize)) _, err = milvus.HasCollection(ctx, "test") if err != nil { return nil, err diff --git a/storage/milvus2x/milvus2_3_ver.go b/storage/milvus2x/milvus2_3_ver.go index 43f834a..9d67035 100644 --- a/storage/milvus2x/milvus2_3_ver.go +++ b/storage/milvus2x/milvus2_3_ver.go @@ -116,38 +116,44 @@ func _createMilvus23VerClient(cfg *config.Milvus2xConfig) (*Milvus23VerClient, e var milvus client.Client var err error - ctx := context.Background() - - //if cfg.UserName == "" { - // log.Info("[Milvus23x] find username is empty, will use NewDefaultGrpcClient() to new client") - // milvus, err = client.NewDefaultGrpcClient(ctx, cfg.Endpoint) - //} else { - // log.Info("[Milvus23x] find username not empty, will use NewDefaultGrpcClientWithURI() to new client") - // milvus, err = client.NewDefaultGrpcClientWithURI(ctx, cfg.Endpoint, cfg.UserName, cfg.Password) - //} - //if err != nil { - // log.Error("[Milvus23x] new milvus client error", zap.Error(err)) - // return nil, err - //} - - milvus, err = client.NewClient(ctx, client.Config{ - Address: cfg.Endpoint, - Username: cfg.UserName, - Password: cfg.Password, - DialOptions: []grpc.DialOption{ - grpc.WithDefaultCallOptions( - grpc.MaxCallRecvMsgSize(67108864), - grpc.MaxCallSendMsgSize(268435456), - ), - }, - }) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if cfg.GrpcMaxRecvMsgSize <= 0 { + if cfg.UserName == "" { + log.Info("[Milvus23x] find username is empty, will use NewDefaultGrpcClient() to new client") + milvus, err = client.NewDefaultGrpcClient(ctx, cfg.Endpoint) + } else { + log.Info("[Milvus23x] find username not empty, will use NewDefaultGrpcClientWithURI() to new client") + milvus, err = client.NewDefaultGrpcClientWithURI(ctx, cfg.Endpoint, cfg.UserName, cfg.Password) + } + } else { + config := client.Config{ + Address: cfg.Endpoint, + DialOptions: []grpc.DialOption{ + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(cfg.GrpcMaxRecvMsgSize), + grpc.MaxCallSendMsgSize(cfg.GrpcMaxSendMsgSize), + ), + }, + } + if cfg.UserName != "" { + config.Username = cfg.UserName + config.Password = cfg.Password + } + milvus, err = client.NewClient(ctx, config) + } if err != nil { + log.Error("[Milvus23x] new milvus client error", zap.Error(err)) return nil, err } log.Info("[Milvus23x] begin to test connect", zap.String("endpoint", cfg.Endpoint), - zap.String("username", cfg.UserName)) + zap.String("username", cfg.UserName), + zap.Int("GrpcMaxCallRecvMsgSize", cfg.GrpcMaxRecvMsgSize), + zap.Int("GrpcMaxCallSendMsgSize", cfg.GrpcMaxSendMsgSize)) + _, err = milvus.HasCollection(ctx, "test") if err != nil { return nil, err