Return gRPC Internal error when a socket read timeout occurs in streaming subscribe under the routing flap case #110

Open: wants to merge 3 commits into base: master
5 changes: 5 additions & 0 deletions gnmi_server/client_subscribe.go
@@ -5,6 +5,7 @@ import (
"io"
"net"
"sync"
"strings"

"github.com/Workiva/go-datastructures/queue"
log "github.com/golang/glog"
@@ -207,6 +208,10 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) {
c.Close()
// Wait until all child go routines exited
c.w.Wait()
if strings.Contains(err.Error(), "i/o timeout") {
qiluo-msft (Collaborator), Jun 30, 2023, on Contains:

Matching on the error message string is fragile. Which library produces this message? Could you check an error code instead? #Closed

Reply (Collaborator, Author):

Right, but this would not be a small change. All of db_client currently reports failures through a PriorityQueue that stores only error message strings; see

func putFatalMsg(q *queue.PriorityQueue, msg string) {

Do you suggest we rework this part now, or later together with the project change?

return grpc.Errorf(codes.Internal, "%s", err)
}

return grpc.Errorf(codes.InvalidArgument, "%s", err)
}

66 changes: 66 additions & 0 deletions gnmi_server/server_test.go
@@ -3255,6 +3255,72 @@ func TestConnectionsKeepAlive(t *testing.T) {
}
}

func TestConnectionRedisFailure(t *testing.T) {
s := createServer(t, 8081)
go runServer(t, s)
defer s.s.Stop()

test := struct {
desc string
q client.Query
want []client.Notification
poll int
}{
desc: "poll query for COUNTERS/Ethernet*",
poll: 10,
q: client.Query{
Target: "COUNTERS_DB",
Type: client.Poll,
Queries: []client.Path{{"COUNTERS", "Ethernet*"}},
TLS: &tls.Config{InsecureSkipVerify: true},
},
want: []client.Notification{
client.Connected{},
client.Sync{},
},
}
namespace := sdcfg.GetDbDefaultNamespace()
rclient := getRedisClientN(t, 6, namespace)
defer rclient.Close()

prepareStateDb(t, namespace)
t.Run(test.desc, func(t *testing.T) {
q := test.q
q.Addrs = []string{"127.0.0.1:8081"}
c := client.New()

wg := new(sync.WaitGroup)
wg.Add(1)

sdc.MockFail = 1
go func() {
defer wg.Done()
if err := c.Subscribe(context.Background(), q); err != nil {
t.Errorf("c.Subscribe(): got error %v, expected nil", err)
}
}()

wg.Wait()
sdc.MockFail = 0
resultMap, err := rclient.HGetAll("TELEMETRY_CONNECTIONS").Result()

if resultMap == nil {
t.Errorf("result Map is nil, expected non nil, err: %v", err)
}
if len(resultMap) != 1 {
t.Errorf("result for TELEMETRY_CONNECTIONS should be 1")
}

for key, _ := range resultMap {
if !strings.Contains(key, "COUNTERS_DB|COUNTERS|Ethernet*") {
t.Errorf("key is expected to contain correct query, received: %s", key)
}
}

c.Close()
})
}

func TestClient(t *testing.T) {
var mutexDeInit sync.RWMutex
var mutexHB sync.RWMutex
7 changes: 7 additions & 0 deletions sonic_data_client/db_client.go
@@ -80,6 +80,7 @@ var IntervalTicker = func(interval time.Duration) <-chan time.Time {
}

var NeedMock bool = false
var MockFail int = 0
var intervalTickerMutex sync.Mutex

// Define a new function to set the IntervalTicker variable
@@ -744,6 +745,12 @@ func tableData2Msi(tblPath *tablePath, useKey bool, op *string, msi *map[string]
return nil
}

if MockFail == 1 {
Collaborator, on MockFail:

Should this be used only in unit tests?

MockFail++
fmt.Printf("mock sleep for redis timeout\n")
time.Sleep(30 * time.Second)
}
Contributor:

Can you use gomonkey to replace MockFail?


for idx, dbkey := range dbkeys {
fv, err = redisDb.HGetAll(dbkey).Result()
if err != nil {