Skip to content

Commit

Permalink
SDCORE-832: metadataflag control from NF client (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
vthiruveedula authored Mar 7, 2022
1 parent 264ee1e commit 09ede62
Showing 1 changed file with 55 additions and 17 deletions.
72 changes: 55 additions & 17 deletions proto/client/gClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,40 +38,71 @@ type Nssai struct {
}

type ConfigClient struct {
Client protos.ConfigServiceClient
Conn *grpc.ClientConn
Version string
Client protos.ConfigServiceClient
Conn *grpc.ClientConn
Version string
MetadataRequested bool
}

type ConfClient interface {
// channel is created on which subscription is done.
// On Receiving Configuration from ConfigServer, this api publishes
// on created channel and returns the channel
PublishOnConfigChange(bool) chan *protos.NetworkSliceResponse

//returns grpc connection object
GetConfigClientConn() *grpc.ClientConn

//Client Subscribing channel to ConfigPod to receive configuration
subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse)
}

//This API is added to control metadata from NF Clients
func ConnectToConfigServer(host string) ConfClient {
confClient := CreateChannel(host, 10000)
if confClient == nil {
logger.GrpcLog.Errorln("create grpc channel to config pod failed")
return nil
}
return confClient
}

func (confClient *ConfigClient) PublishOnConfigChange(mdataFlag bool) chan *protos.NetworkSliceResponse {
confClient.MetadataRequested = mdataFlag
commChan := make(chan *protos.NetworkSliceResponse)
go confClient.subscribeToConfigPod(commChan)
return commChan
}

// pass structr which has configChangeUpdate interface
func ConfigWatcher() chan *protos.NetworkSliceResponse {
//var confClient *gClient.ConfigClient
//TODO: use port from configmap.
confClient, err := CreateChannel("webui:9876", 10000)
if err != nil {
logger.GrpcLog.Errorf("create grpc channel to config pod failed. : ", err)
confClient := CreateChannel("webui:9876", 10000)
if confClient == nil {
logger.GrpcLog.Errorf("create grpc channel to config pod failed")
return nil
}
commChan := make(chan *protos.NetworkSliceResponse)
go subscribeToConfigPod(confClient, commChan)
go confClient.subscribeToConfigPod(commChan)
return commChan
}

func CreateChannel(host string, timeout uint32) (*ConfigClient, error) {
func CreateChannel(host string, timeout uint32) ConfClient {
logger.GrpcLog.Infoln("create config client")
// Second, check to see if we can reuse the gRPC connection for a new P4RT client
conn, err := GetConnection(host)
conn, err := newClientConnection(host)
if err != nil {
logger.GrpcLog.Errorf("grpc connection failed")
return nil, err
logger.GrpcLog.Errorf("grpc connection failed %v", err)
return nil
}

client := &ConfigClient{
Client: protos.NewConfigServiceClient(conn),
Conn: conn,
}

return client, nil
return client
}

var kacp = keepalive.ClientParameters{
Expand All @@ -92,17 +123,20 @@ var retryPolicy = `{
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}}]}`

func GetConnection(host string) (conn *grpc.ClientConn, err error) {
func newClientConnection(host string) (conn *grpc.ClientConn, err error) {
/* get connection */
logger.GrpcLog.Infoln("Dial grpc connection - ", host)

bd := 1 * time.Second
mltpr := 1.0
jitter := 0.2
MaxDelay := 5 * time.Second
bc := backoff.Config{BaseDelay: bd, Multiplier: mltpr, Jitter: jitter, MaxDelay: MaxDelay}

crt := grpc.ConnectParams{Backoff: bc}
conn, err = grpc.Dial(host, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt))
dialOptions := []grpc.DialOption{grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp), grpc.WithDefaultServiceConfig(retryPolicy), grpc.WithConnectParams(crt)}
conn, err = grpc.Dial(host, dialOptions...)

if err != nil {
logger.GrpcLog.Errorln("grpc dial err: ", err)
return nil, err
Expand All @@ -111,7 +145,11 @@ func GetConnection(host string) (conn *grpc.ClientConn, err error) {
return conn, err
}

func subscribeToConfigPod(confClient *ConfigClient, commChan chan *protos.NetworkSliceResponse) {
func (confClient *ConfigClient) GetConfigClientConn() *grpc.ClientConn {
return confClient.Conn
}

func (confClient *ConfigClient) subscribeToConfigPod(commChan chan *protos.NetworkSliceResponse) {
logger.GrpcLog.Infoln("subscribeToConfigPod ")
myid := os.Getenv("HOSTNAME")
var stream protos.ConfigService_NetworkSliceSubscribeClient
Expand All @@ -121,7 +159,7 @@ func subscribeToConfigPod(confClient *ConfigClient, commChan chan *protos.Networ
var err error
if status == connectivity.Ready {
logger.GrpcLog.Infoln("connectivity ready ")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: true}
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
logger.GrpcLog.Errorf("Failed to subscribe: %v", err)
time.Sleep(time.Second * 5)
Expand Down Expand Up @@ -175,7 +213,7 @@ func readConfigInLoop(confClient *ConfigClient, commChan chan *protos.NetworkSli
case <-configReadTimeout.C:
status := confClient.Conn.GetState()
if status == connectivity.Ready {
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: true}
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid, MetadataRequested: confClient.MetadataRequested}
rsp, err := confClient.Client.GetNetworkSlice(context.Background(), rreq)
if err != nil {
logger.GrpcLog.Errorln("read Network Slice config from webconsole failed : ", err)
Expand Down

0 comments on commit 09ede62

Please sign in to comment.