diff --git a/proto/client/gClient.go b/proto/client/gClient.go index f0374a1..b9c01d6 100644 --- a/proto/client/gClient.go +++ b/proto/client/gClient.go @@ -51,7 +51,7 @@ func ConfigWatcher() chan *protos.NetworkSliceResponse { return nil } commChan := make(chan *protos.NetworkSliceResponse) - go readConfigInLoop(confClient, commChan) + go subscribeToConfigPod(confClient, commChan) return commChan } @@ -102,6 +102,62 @@ func GetConnection(host string) (conn *grpc.ClientConn, err error) { return conn, err } +func subscribeToConfigPod(confClient *ConfigClient, commChan chan *protos.NetworkSliceResponse) { + logger.GrpcLog.Infoln("subscribeToConfigPod ") + myid := os.Getenv("HOSTNAME") + var stream protos.ConfigService_NetworkSliceSubscribeClient + for { + if stream == nil { + status := confClient.Conn.GetState() + var err error + if status == connectivity.Ready { + logger.GrpcLog.Infoln("connectivity ready ") + rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid} + if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil { + logger.GrpcLog.Errorf("Failed to subscribe: %v", err) + time.Sleep(time.Second * 5) + // Retry on failure + continue + } + } else { + //logger.GrpcLog.Errorf("Connectivity status not ready") + continue + } + } + rsp, err := stream.Recv() + if err != nil { + logger.GrpcLog.Errorf("Failed to receive message: %v", err) + // Clearing the stream will force the client to resubscribe on next iteration + stream = nil + time.Sleep(time.Second * 5) + // Retry on failure + continue + } + + logger.GrpcLog.Infoln("stream msg recieved ") + logger.GrpcLog.Debugf("#Network Slices %v, RC of configpod %v ", len(rsp.NetworkSlice), rsp.RestartCounter) + if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) { + // first time connection or config update + configPodRestartCounter = rsp.RestartCounter + if len(rsp.NetworkSlice) > 0 { + // always carries full config copy + logger.GrpcLog.Infoln("First time config Received ", rsp) + commChan <- rsp + } else if rsp.ConfigUpdated == 1 { + // config delete , all slices deleted + logger.GrpcLog.Infoln("Complete config deleted ") + commChan <- rsp + } + } else if len(rsp.NetworkSlice) > 0 { + logger.GrpcLog.Errorf("Config received after config Pod restart") + //config received after config pod restart + configPodRestartCounter = rsp.RestartCounter + commChan <- rsp + } else { + logger.GrpcLog.Errorf("Config Pod is restarted and no config received") + } + } +} func readConfigInLoop(confClient *ConfigClient, commChan chan *protos.NetworkSliceResponse) { myid := os.Getenv("HOSTNAME") configReadTimeout := time.NewTicker(5000 * time.Millisecond)