From 09ede628e5fb9b0ca6db9b8a727624b8a9eb1af1 Mon Sep 17 00:00:00 2001 From: vthiruveedula <78776871+vthiruveedula@users.noreply.github.com> Date: Mon, 7 Mar 2022 21:07:01 +0530 Subject: [PATCH] SDCORE-832: metadataflag control from NF client (#11) --- proto/client/gClient.go | 72 +++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 17 deletions(-) diff --git a/proto/client/gClient.go b/proto/client/gClient.go index fdf4fd6..dc19173 100644 --- a/proto/client/gClient.go +++ b/proto/client/gClient.go @@ -38,32 +38,63 @@ type Nssai struct { } type ConfigClient struct { - Client protos.ConfigServiceClient - Conn *grpc.ClientConn - Version string + Client protos.ConfigServiceClient + Conn *grpc.ClientConn + Version string + MetadataRequested bool +} + +type ConfClient interface { + // channel is created on which subscription is done. + // On Receiving Configuration from ConfigServer, this api publishes + // on created channel and returns the channel + PublishOnConfigChange(bool) chan *protos.NetworkSliceResponse + + //returns grpc connection object + GetConfigClientConn() *grpc.ClientConn + + //Client Subscribing channel to ConfigPod to receive configuration + subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) +} + +//This API is added to control metadata from NF Clients +func ConnectToConfigServer(host string) ConfClient { + confClient := CreateChannel(host, 10000) + if confClient == nil { + logger.GrpcLog.Errorln("create grpc channel to config pod failed") + return nil + } + return confClient +} + +func (confClient *ConfigClient) PublishOnConfigChange(mdataFlag bool) chan *protos.NetworkSliceResponse { + confClient.MetadataRequested = mdataFlag + commChan := make(chan *protos.NetworkSliceResponse) + go confClient.subscribeToConfigPod(commChan) + return commChan } // pass structr which has configChangeUpdate interface func ConfigWatcher() chan *protos.NetworkSliceResponse { //var confClient *gClient.ConfigClient //TODO: use port from configmap. - confClient, err := CreateChannel("webui:9876", 10000) - if err != nil { - logger.GrpcLog.Errorf("create grpc channel to config pod failed. : ", err) + confClient := CreateChannel("webui:9876", 10000) + if confClient == nil { + logger.GrpcLog.Errorf("create grpc channel to config pod failed") return nil } commChan := make(chan *protos.NetworkSliceResponse) - go subscribeToConfigPod(confClient, commChan) + go confClient.subscribeToConfigPod(commChan) return commChan } -func CreateChannel(host string, timeout uint32) (*ConfigClient, error) { +func CreateChannel(host string, timeout uint32) ConfClient { logger.GrpcLog.Infoln("create config client") // Second, check to see if we can reuse the gRPC connection for a new P4RT client - conn, err := GetConnection(host) + conn, err := newClientConnection(host) if err != nil { - logger.GrpcLog.Errorf("grpc connection failed") - return nil, err + logger.GrpcLog.Errorf("grpc connection failed %v", err) + return nil } client := &ConfigClient{ @@ -71,7 +102,7 @@ func CreateChannel(host string, timeout uint32) (*ConfigClient, error) { Conn: conn, } - return client, nil + return client } var kacp = keepalive.ClientParameters{ @@ -92,9 +123,10 @@ var retryPolicy = `{ "RetryableStatusCodes": [ "UNAVAILABLE" ] }}]}` -func GetConnection(host string) (conn *grpc.ClientConn, err error) { +func newClientConnection(host string) (conn *grpc.ClientConn, err error) { /* get connection */ logger.GrpcLog.Infoln("Dial grpc connection - ", host) + bd := 1 * time.Second mltpr := 1.0 jitter := 0.2 @@ -102,7 +134,9 @@ func GetConnection(host string) (conn *grpc.ClientConn, err error) { bc := backoff.Config{BaseDelay: bd, Multiplier: mltpr, Jitter: jitter, MaxDelay: MaxDelay} crt := grpc.ConnectParams{Backoff: bc} - conn, err = grpc.Dial(host, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)) + dialOptions := []grpc.DialOption{grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)} + conn, err = grpc.Dial(host, dialOptions...) + if err != nil { logger.GrpcLog.Errorln("grpc dial err: ", err) return nil, err @@ -111,7 +145,11 @@ func GetConnection(host string) (conn *grpc.ClientConn, err error) { return conn, err } -func subscribeToConfigPod(confClient *ConfigClient, commChan chan *protos.NetworkSliceResponse) { +func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn { + return confClient.Conn +} + +func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) { logger.GrpcLog.Infoln("subscribeToConfigPod ") myid := os.Getenv("HOSTNAME") var stream protos.ConfigService_NetworkSliceSubscribeClient @@ -121,7 +159,7 @@ func subscribeToConfigPod(confClient *ConfigClient, commChan chan *protos.Networ var err error if status == connectivity.Ready { logger.GrpcLog.Infoln("connectivity ready ") - rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: true} + rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested} if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { logger.GrpcLog.Errorf("Failed to subscribe: %v", err) time.Sleep(time.Second * 5) @@ -175,7 +213,7 @@ func readConfigInLoop(confClient *ConfigClient, commChan chan *protos.NetworkSli case <-configReadTimeout.C: status := confClient.Conn.GetState() if status == connectivity.Ready { - rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: true} + rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested} rsp, err := confClient.Client.GetNetworkSlice(context.Background(), rreq) if err != nil { logger.GrpcLog.Errorln("read Network Slice config from webconsole failed : ", err)