Skip to content

Commit

Permalink
Add support of heartbeat interval in "on change" subscription, so that
Browse files Browse the repository at this point in the history
full sync would happen periodically when hb is triggered.
  • Loading branch information
FengPan-Frank committed Oct 11, 2023
1 parent cbb7631 commit dc18626
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 9 deletions.
15 changes: 12 additions & 3 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ type subscriptionQuery struct {
Query []string
SubMode pb.SubscriptionMode
SampleInterval uint64
HeartbeatInterval uint64
}

func pathToString(q client.Path) string {
Expand Down Expand Up @@ -778,14 +779,15 @@ func createEventsQuery(t *testing.T, paths ...string) client.Query {
false)
}

func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query {
func createStateDbQueryOnChangeMode(t *testing.T, interval time.Duration, paths ...string) client.Query {
return createQueryOrFail(t,
pb.SubscriptionList_STREAM,
"STATE_DB",
[]subscriptionQuery{
{
Query: paths,
SubMode: pb.SubscriptionMode_ON_CHANGE,
HeartbeatInterval: uint64(interval.Nanoseconds()),
},
},
false)
Expand Down Expand Up @@ -3133,12 +3135,19 @@ func TestTableKeyOnDeletion(t *testing.T) {
tests := []struct {
desc string
q client.Query
wantSubErr error
wantNoti []client.Notification
paths []string
}{
{
desc: "Testing invalid heartbeat interval",
q: createStateDbQueryOnChangeMode(t, 10 * time.Second, "NEIGH_STATE_TABLE"),
wantSubErr: fmt.Errorf("rpc error: code = InvalidArgument desc = invalid heartbeat interval: 10s. It cannot be less than %v", sdc.MinHeartbeatInterval),
wantNoti: []client.Notification{},
},
{
desc: "Testing deletion of NEIGH_STATE_TABLE:10.0.0.57",
q: createStateDbQueryOnChangeMode(t, "NEIGH_STATE_TABLE"),
q: createStateDbQueryOnChangeMode(t, 2 * time.Minute, "NEIGH_STATE_TABLE"),
wantNoti: []client.Notification {
client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableJson},
client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson57},
Expand All @@ -3149,7 +3158,7 @@ func TestTableKeyOnDeletion(t *testing.T) {
},
{
desc: "Testing deletion of NEIGH_STATE_TABLE:10.0.0.59 and NEIGH_STATE_TABLE 10.0.0.61",
q: createStateDbQueryOnChangeMode(t, "NEIGH_STATE_TABLE"),
q: createStateDbQueryOnChangeMode(t, 2 * time.Minute, "NEIGH_STATE_TABLE"),
wantNoti: []client.Notification {
client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableJsonTwo},
client.Update{Path: []string{"NEIGH_STATE_TABLE"}, TS: time.Unix(0, 200), Val: neighStateTableDeletedJson59},
Expand Down
47 changes: 41 additions & 6 deletions sonic_data_client/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client)
// Any non-zero value that less than this threshold is considered invalid argument.
var MinSampleInterval = time.Second

// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions.
// This is reserved value, which should be adjusted per BGPL benchmark result.
var MinHeartbeatInterval = 1 * time.Minute

// IntervalTicker is a factory method to implement interval ticking.
// Exposed for UT purposes.
var IntervalTicker = func(interval time.Duration) <-chan time.Time {
Expand Down Expand Up @@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
for gnmiPath := range c.pathG2S {
c.w.Add(1)
c.synced.Add(1)
go streamOnChangeSubscription(c, gnmiPath)
go streamOnChangeSubscription(c, gnmiPath, nil)
}
} else {
log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v",
Expand All @@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
} else if subMode == gnmipb.SubscriptionMode_ON_CHANGE {
c.w.Add(1)
c.synced.Add(1)
go streamOnChangeSubscription(c, sub.GetPath())
go streamOnChangeSubscription(c, nil, sub)
} else {
enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode))
return
Expand All @@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
}

// streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) {
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) {
if gnmiPath == nil {
gnmiPath = sub.GetPath()
}

// if heartbeatInterval is not assigned, use 0 to ignore periodical full sync
var heartbeatInterval time.Duration = 0
if sub != nil {
var err error
heartbeatInterval, err = validateHeartbeatInterval(sub)
if err != nil {
enqueueFatalMsg(c, err.Error())
c.synced.Done()
c.w.Done()
return
}
}

tblPaths := c.pathG2S[gnmiPath]
log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath)

if tblPaths[0].field != "" {
if len(tblPaths) > 1 {
go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false)
go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false)
} else {
go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200)
go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval)
}
} else {
// sample interval and update only parameters are not applicable
go dbTableKeySubscribe(c, gnmiPath, 0, true)
go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true)
}
}

Expand Down Expand Up @@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) {
return requestedInterval, nil
}
}

// validateHeartbeatInterval validates the heartbeat interval of the given subscription.
func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) {
requestedInterval := time.Duration(sub.GetHeartbeatInterval())
if requestedInterval == 0 {
// If the heartbeat_interval is set to 0, the target MUST create the subscription
// and send the data with the MinHeartbeatInterval
return MinHeartbeatInterval, nil
} else if requestedInterval < MinHeartbeatInterval {
return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval)
} else {
return requestedInterval, nil
}
}

0 comments on commit dc18626

Please sign in to comment.