Skip to content

Commit

Permalink
stream api change
Browse files Browse the repository at this point in the history
  • Loading branch information
badhri85 committed Aug 4, 2021
1 parent b9cb5ae commit 6d5ccc1
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 191 deletions.
58 changes: 57 additions & 1 deletion proto/client/gClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func ConfigWatcher() chan *protos.NetworkSliceResponse {
return nil
}
commChan := make(chan *protos.NetworkSliceResponse)
go readConfigInLoop(confClient, commChan)
go subscribeToConfigPod(confClient, commChan)
return commChan
}

Expand Down Expand Up @@ -102,6 +102,62 @@ func GetConnection(host string) (conn *grpc.ClientConn, err error) {
return conn, err
}

func subscribeToConfigPod(confClient *ConfigClient, commChan chan *protos.NetworkSliceResponse) {
logger.GrpcLog.Infoln("subscribeToConfigPod ")
myid := os.Getenv("HOSTNAME")
var stream protos.ConfigService_NetworkSliceSubscribeClient
for {
if stream == nil {
status := confClient.Conn.GetState()
var err error
if status == connectivity.Ready {
logger.GrpcLog.Infoln("connectivity ready ")
rreq := &protos.NetworkSliceRequest{RestartCounter: selfRestartCounter, ClientId: myid}
if stream, err = confClient.Client.NetworkSliceSubscribe(context.Background(), rreq); err != nil {
logger.GrpcLog.Errorf("Failed to subscribe: %v", err)
time.Sleep(time.Second * 5)
// Retry on failure
continue
}
} else {
//logger.GrpcLog.Errorf("Connectivity status not ready")
continue
}
}
rsp, err := stream.Recv()
if err != nil {
logger.GrpcLog.Errorf("Failed to receive message: %v", err)
// Clearing the stream will force the client to resubscribe on next iteration
stream = nil
time.Sleep(time.Second * 5)
// Retry on failure
continue
}

logger.GrpcLog.Infoln("stream msg recieved ")
logger.GrpcLog.Debugf("#Network Slices %v, RC of configpod %v ", len(rsp.NetworkSlice), rsp.RestartCounter)
if configPodRestartCounter == 0 || (configPodRestartCounter == rsp.RestartCounter) {
// first time connection or config update
configPodRestartCounter = rsp.RestartCounter
if len(rsp.NetworkSlice) > 0 {
// always carries full config copy
logger.GrpcLog.Infoln("First time config Received ", rsp)
commChan <- rsp
} else if rsp.ConfigUpdated == 1 {
// config delete , all slices deleted
logger.GrpcLog.Infoln("Complete config deleted ")
commChan <- rsp
}
} else if len(rsp.NetworkSlice) > 0 {
logger.GrpcLog.Errorf("Config received after config Pod restart")
//config received after config pod restart
configPodRestartCounter = rsp.RestartCounter
commChan <- rsp
} else {
logger.GrpcLog.Errorf("Config Pod is restarted and no config received")
}
}
}
func readConfigInLoop(confClient *ConfigClient, commChan chan *protos.NetworkSliceResponse) {
myid := os.Getenv("HOSTNAME")
configReadTimeout := time.NewTicker(5000 * time.Millisecond)
Expand Down
4 changes: 2 additions & 2 deletions proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ package sdcoreConfig;
option go_package = "./sdcoreConfig";

service ConfigService {
rpc GetNetworkSlice(NetworkSliceRequest) returns (NetworkSliceResponse) {
}
rpc GetNetworkSlice(NetworkSliceRequest) returns (NetworkSliceResponse) {}
rpc NetworkSliceSubscribe(NetworkSliceRequest) returns (stream NetworkSliceResponse) {}
}

enum Status {
Expand Down
Loading

0 comments on commit 6d5ccc1

Please sign in to comment.