From f217325b002cbc62b567dfc997b297132c0398b9 Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Mon, 27 Jun 2022 16:55:37 +0000 Subject: [PATCH 01/10] Events client is being invoked. gnmi_cli -client_types=gnmi -a 127.0.0.1:50051 -t EVENTS -logtostderr -insecure -v 7 -streaming_type ON_CHANGE -q aaa/bbb -qt s --- gnmi_server/client_subscribe.go | 6 +- sonic_data_client/events_client.go | 157 +++++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 sonic_data_client/events_client.go diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index 1c2d6f313..4ef6a32ec 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -121,8 +121,12 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { } var dc sdc.Client + mode := c.subscribe.GetMode() + if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) + } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { + dc, err = sdc.NewEventClient(paths, prefix) } else if _, ok, _, _ := sdc.IsTargetDb(target); ok { dc, err = sdc.NewDbClient(paths, prefix) } else { @@ -134,7 +138,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { return grpc.Errorf(codes.NotFound, "%v", err) } - switch mode := c.subscribe.GetMode(); mode { + switch mode { case gnmipb.SubscriptionList_STREAM: c.stop = make(chan struct{}, 1) c.w.Add(1) diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go new file mode 100644 index 000000000..05aaf1948 --- /dev/null +++ b/sonic_data_client/events_client.go @@ -0,0 +1,157 @@ +package client + +import ( + "encoding/json" + "fmt" + "sync" + "strconv" + "time" + + spb "github.com/Azure/sonic-telemetry/proto" + "github.com/Workiva/go-datastructures/queue" + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" +) + +type EventClient struct { + + prefix *gnmipb.Path + path *gnmipb.Path + + q *queue.PriorityQueue + channel chan struct{} + + w *sync.WaitGroup // wait for all sub go routines to finish +} + + +func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path) (Client, error) { + var evtc EventClient + evtc.prefix = prefix + for _, path := range paths { + // Only one path is expected. Take the last if many + evtc.path = path + } + log.Errorf("NewEventClient constructed"); + + return &evtc, nil +} + +// String returns the target the client is querying. +func (c *EventClient) String() string { + return fmt.Sprintf("EventClient Prefix %v", c.prefix.GetTarget()) +} + + +func get_events(c *EventClient, updateChannel chan map[string]interface{}) { + + for i := 0; i<10; i++ { + newMsi := make(map[string]interface{}) + data := make(map[string]interface{}) + + data["foo"] = "bar" + data["hello"] = "world" + + newMsi["event_" + strconv.Itoa(i)] = data + + updateChannel <- newMsi + + time.After(time.Second) + log.Errorf("get_events i=%d", i); + } + return +} + + +func send_event(c *EventClient, val *map[string]interface{}) error { + log.Errorf("send_event calling json.Marshal"); + j, err := json.Marshal(*val) + if err != nil { + log.Errorf("emitJSON Failed") + log.Errorf("emitJSON err %s for %v", err, *val) + return err + } + + log.Errorf("send_event calling gnmipb.TypedValue"); + tv := &gnmipb.TypedValue { + Value: &gnmipb.TypedValue_JsonIetfVal{ + JsonIetfVal: j, + }} + + spbv := &spb.Value{ + Prefix: c.prefix, + Path: c.path, + Timestamp: time.Now().UnixNano(), + Val: tv, + } + + log.Errorf("Sending spbv"); + log.Errorf("spbv: %v", *spbv); + if err = c.q.Put(Value{spbv}); err != nil { + log.Errorf("Queue error: %v", err) + return err + } + log.Errorf("send_event done") + return nil +} + +func (c *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + data := make(map[string]interface{}) + data["heart"] = "beat" + + c.w = w + defer c.w.Done() + + c.q = q + c.channel = stop + + updateChannel := make(chan map[string]interface{}) + go get_events(c, updateChannel) + + for { + select { + case nextEvent := <-updateChannel: + log.Errorf("update received: %v", nextEvent) + if err := send_event(c, &nextEvent); err != nil { + return + } + + case <-IntervalTicker(time.Second * 2): + log.Errorf("Ticker received") + if err := send_event(c, &data); err != nil { + return + } + case <-c.channel: + log.Errorf("Channel closed by client") + return + } + } + log.Errorf("Event exiting streamrun") +} + + +// TODO: Log data related to this session + +func (c *EventClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { + return nil, nil +} + +func (c *EventClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + return +} + +func (c *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + return +} + + +func (c *EventClient) Close() error { + return nil +} + +func (c *EventClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { + return nil +} +func (c *EventClient) Capabilities() []gnmipb.ModelData { + return nil +} From 075ad8271fda34b48d8de09ddd7a8f0e5601c715 Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Tue, 5 Jul 2022 15:21:40 +0000 Subject: [PATCH 02/10] temp commit to enable merge --- sonic_data_client/events_client.go | 111 ++++++++++++++++++++--------- 1 file changed, 79 insertions(+), 32 deletions(-) diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go index 05aaf1948..f1402dd65 100644 --- a/sonic_data_client/events_client.go +++ b/sonic_data_client/events_client.go @@ -1,11 +1,19 @@ package client +/* +#cgo CFLAGS: -g -Wall -I/sonic/src/sonic-swss-common/common -Wformat -Werror=format-security -fPIE +#cgo LDFLAGS: -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lswsscommon -Wl,-rpath,/sonic/target/files/bullseye +#include +#include "events_wrap.h" +*/ +import "C" + import ( "encoding/json" "fmt" "sync" - "strconv" "time" + "unsafe" spb "github.com/Azure/sonic-telemetry/proto" "github.com/Workiva/go-datastructures/queue" @@ -18,12 +26,18 @@ type EventClient struct { prefix *gnmipb.Path path *gnmipb.Path - q *queue.PriorityQueue - channel chan struct{} + q *queue.PriorityQueue + channel chan struct{} + + w *sync.WaitGroup // wait for all sub go routines to finish - w *sync.WaitGroup // wait for all sub go routines to finish + subs_handle unsafe.Pointer } +const SUBSCRIBER_TIMEOUT = 2 +const HEARTBEAT_TIMEOUT = 2 +const EVENT_BUFFSZ = 4096 +const MISSED_BUFFSZ = 16 func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path) (Client, error) { var evtc EventClient @@ -32,8 +46,19 @@ func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path) (Client, error) { // Only one path is expected. Take the last if many evtc.path = path } - log.Errorf("NewEventClient constructed"); + log.Errorf("NewEventClient constructed") + /* Init subscriber with 2 seconds time out */ + subs_data := make(map[string]interface{}) + subs_data["recv_timeout"] = SUBSCRIBER_TIMEOUT + j, err := json.Marshal(subs_data) + if err != nil { + js := string(j) + evtc.subs_handle = C.events_init_subscriber_wrap(C.CString(js)) + log.Errorf("events_init_subscriber: h=%v", evtc.subs_handle) + return nil, err + } + log.Errorf("events_init_subscriber: Failed to marshal") return &evtc, nil } @@ -43,39 +68,52 @@ func (c *EventClient) String() string { } -func get_events(c *EventClient, updateChannel chan map[string]interface{}) { +func get_events(c *EventClient, updateChannel chan string) { - for i := 0; i<10; i++ { - newMsi := make(map[string]interface{}) - data := make(map[string]interface{}) - - data["foo"] = "bar" - data["hello"] = "world" + evt_ptr := C.malloc(C.sizeof_char * EVENT_BUFFSZ) + missed_ptr := C.malloc(C.sizeof_char * MISSED_BUFFSZ) - newMsi["event_" + strconv.Itoa(i)] = data + defer C.free(unsafe.Pointer(evt_ptr)) + defer C.free(unsafe.Pointer(missed_ptr)) - updateChannel <- newMsi - - time.After(time.Second) - log.Errorf("get_events i=%d", i); + for { + c_eptr := (*C.char)(unsafe.Pointer(evt_ptr)) + // c_eptr := (*C.char)(evt_ptr) + // c_mptr := (*C.char)(missed_ptr) + // c_hptr := (unsafe.Pointer)(c.subs_handle) + // rc := C.event_receive_wrap(c_hptr, c_eptr, EVENT_BUFFSZ, c_mptr, MISSED_BUFFSZ) + //rc := 5 + // sz := C.int(20) + rc := event_receive_wrap_54(c_eptr) + C.events_init_subscriber_wrap(c_eptr) // Good + + if rc == 0 { + updateChannel <- C.GoString((*C.char)(evt_ptr)) + } + _, more := <-c.channel + if !more { + log.V(1).Infof("%v stop channel closed, exiting get_events routine", c) + // c_hptr := (unsafe.Pointer)(c.subs_handle) + events_deinit_subscriber_wrap(c.subs_handle) + c.subs_handle = nil + return + } } - return } -func send_event(c *EventClient, val *map[string]interface{}) error { - log.Errorf("send_event calling json.Marshal"); - j, err := json.Marshal(*val) +func send_event(c *EventClient, sval *string) error { + log.Errorf("send_event calling json.Marshal") + + val, err := json.Marshal(sval) if err != nil { - log.Errorf("emitJSON Failed") - log.Errorf("emitJSON err %s for %v", err, *val) - return err + log.Errorf("Failed to marshall %V", err) } - log.Errorf("send_event calling gnmipb.TypedValue"); + log.Errorf("send_event calling gnmipb.TypedValue") tv := &gnmipb.TypedValue { Value: &gnmipb.TypedValue_JsonIetfVal{ - JsonIetfVal: j, + JsonIetfVal: val, }} spbv := &spb.Value{ @@ -85,9 +123,9 @@ func send_event(c *EventClient, val *map[string]interface{}) error { Val: tv, } - log.Errorf("Sending spbv"); - log.Errorf("spbv: %v", *spbv); - if err = c.q.Put(Value{spbv}); err != nil { + log.Errorf("Sending spbv") + log.Errorf("spbv: %v", *spbv) + if err := c.q.Put(Value{spbv}); err != nil { log.Errorf("Queue error: %v", err) return err } @@ -98,6 +136,12 @@ func send_event(c *EventClient, val *map[string]interface{}) error { func (c *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { data := make(map[string]interface{}) data["heart"] = "beat" + hb, err := json.Marshal(data) + if err != nil { + log.Errorf("StreamRun: Failed to marshal hearbet data") + return + } + hstr := string(hb) c.w = w defer c.w.Done() @@ -105,7 +149,7 @@ func (c *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *s c.q = q c.channel = stop - updateChannel := make(chan map[string]interface{}) + updateChannel := make(chan string) go get_events(c, updateChannel) for { @@ -116,9 +160,9 @@ func (c *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *s return } - case <-IntervalTicker(time.Second * 2): + case <-IntervalTicker(time.Second * HEARTBEAT_TIMEOUT): log.Errorf("Ticker received") - if err := send_event(c, &data); err != nil { + if err := send_event(c, &hstr); err != nil { return } case <-c.channel: @@ -155,3 +199,6 @@ func (c *EventClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, upda func (c *EventClient) Capabilities() []gnmipb.ModelData { return nil } + +// cgo LDFLAGS: -L/sonic/target/files/bullseye -lxswsscommon -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lxxeventxx -Wl,-rpath,/sonic/target/files/bullseye + From d7ba03b994f2a6a6da08124896748ef4edf70b7a Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Thu, 7 Jul 2022 23:58:05 +0000 Subject: [PATCH 03/10] end to end work --- sonic_data_client/events_client.go | 126 ++++++++++++++--------------- 1 file changed, 61 insertions(+), 65 deletions(-) diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go index f1402dd65..a3a638632 100644 --- a/sonic_data_client/events_client.go +++ b/sonic_data_client/events_client.go @@ -2,7 +2,7 @@ package client /* #cgo CFLAGS: -g -Wall -I/sonic/src/sonic-swss-common/common -Wformat -Werror=format-security -fPIE -#cgo LDFLAGS: -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lswsscommon -Wl,-rpath,/sonic/target/files/bullseye +#cgo LDFLAGS: -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lswsscommon #include #include "events_wrap.h" */ @@ -32,6 +32,8 @@ type EventClient struct { w *sync.WaitGroup // wait for all sub go routines to finish subs_handle unsafe.Pointer + + stopped int } const SUBSCRIBER_TIMEOUT = 2 @@ -48,27 +50,30 @@ func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path) (Client, error) { } log.Errorf("NewEventClient constructed") + C.swssSetLogPriority(7) + /* Init subscriber with 2 seconds time out */ subs_data := make(map[string]interface{}) subs_data["recv_timeout"] = SUBSCRIBER_TIMEOUT j, err := json.Marshal(subs_data) if err != nil { - js := string(j) - evtc.subs_handle = C.events_init_subscriber_wrap(C.CString(js)) - log.Errorf("events_init_subscriber: h=%v", evtc.subs_handle) + log.Errorf("events_init_subscriber: Failed to marshal") return nil, err } - log.Errorf("events_init_subscriber: Failed to marshal") + js := string(j) + evtc.subs_handle = C.events_init_subscriber_wrap(C.CString(js)) + log.Errorf("events_init_subscriber: h=%v", evtc.subs_handle) + evtc.stopped = 0 return &evtc, nil } // String returns the target the client is querying. -func (c *EventClient) String() string { - return fmt.Sprintf("EventClient Prefix %v", c.prefix.GetTarget()) +func (evtc *EventClient) String() string { + return fmt.Sprintf("EventClient Prefix %v", evtc.prefix.GetTarget()) } -func get_events(c *EventClient, updateChannel chan string) { +func get_events(evtc *EventClient, updateChannel chan string) { evt_ptr := C.malloc(C.sizeof_char * EVENT_BUFFSZ) missed_ptr := C.malloc(C.sizeof_char * MISSED_BUFFSZ) @@ -76,56 +81,41 @@ func get_events(c *EventClient, updateChannel chan string) { defer C.free(unsafe.Pointer(evt_ptr)) defer C.free(unsafe.Pointer(missed_ptr)) + c_eptr := (*C.char)(unsafe.Pointer(evt_ptr)) + c_mptr := (*C.char)(unsafe.Pointer(missed_ptr)) + for { - c_eptr := (*C.char)(unsafe.Pointer(evt_ptr)) - // c_eptr := (*C.char)(evt_ptr) - // c_mptr := (*C.char)(missed_ptr) - // c_hptr := (unsafe.Pointer)(c.subs_handle) - // rc := C.event_receive_wrap(c_hptr, c_eptr, EVENT_BUFFSZ, c_mptr, MISSED_BUFFSZ) - //rc := 5 - // sz := C.int(20) - rc := event_receive_wrap_54(c_eptr) - C.events_init_subscriber_wrap(c_eptr) // Good - - if rc == 0 { + log.Errorf("Call C.event_receive_wrap") + rc := C.event_receive_wrap(evtc.subs_handle, c_eptr, EVENT_BUFFSZ, c_mptr, MISSED_BUFFSZ) + log.Errorf("C.event_receive_wrap rc=%d evt:%s", rc, (*C.char)(evt_ptr)) + + if rc != 0 { updateChannel <- C.GoString((*C.char)(evt_ptr)) } - _, more := <-c.channel - if !more { - log.V(1).Infof("%v stop channel closed, exiting get_events routine", c) - // c_hptr := (unsafe.Pointer)(c.subs_handle) - events_deinit_subscriber_wrap(c.subs_handle) - c.subs_handle = nil + if evtc.stopped == 1 { + log.V(1).Infof("%v stop channel closed, exiting get_events routine", evtc) + C.events_deinit_subscriber_wrap(evtc.subs_handle) + evtc.subs_handle = nil return } + log.Errorf("back to read loop") + // TODO: Record missed count in stats table. + // intVar, err := strconv.Atoi(C.GoString((*C.char)(c_mptr))) } } -func send_event(c *EventClient, sval *string) error { - log.Errorf("send_event calling json.Marshal") - - val, err := json.Marshal(sval) - if err != nil { - log.Errorf("Failed to marshall %V", err) - } - - log.Errorf("send_event calling gnmipb.TypedValue") - tv := &gnmipb.TypedValue { - Value: &gnmipb.TypedValue_JsonIetfVal{ - JsonIetfVal: val, - }} - +func send_event(evtc *EventClient, tv *gnmipb.TypedValue) error { spbv := &spb.Value{ - Prefix: c.prefix, - Path: c.path, + Prefix: evtc.prefix, + Path: evtc.path, Timestamp: time.Now().UnixNano(), Val: tv, } log.Errorf("Sending spbv") log.Errorf("spbv: %v", *spbv) - if err := c.q.Put(Value{spbv}); err != nil { + if err := evtc.q.Put(Value{spbv}); err != nil { log.Errorf("Queue error: %v", err) return err } @@ -133,39 +123,45 @@ func send_event(c *EventClient, sval *string) error { return nil } -func (c *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - data := make(map[string]interface{}) - data["heart"] = "beat" - hb, err := json.Marshal(data) - if err != nil { - log.Errorf("StreamRun: Failed to marshal hearbet data") - return - } - hstr := string(hb) +func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { + hbData := make(map[string]interface{}) + hbData["heart"] = "beat" + hbVal, _ := json.Marshal(hbData) + + hbTv := &gnmipb.TypedValue { + Value: &gnmipb.TypedValue_JsonIetfVal{ + JsonIetfVal: hbVal, + }} + - c.w = w - defer c.w.Done() + evtc.w = w + defer evtc.w.Done() - c.q = q - c.channel = stop + evtc.q = q + evtc.channel = stop updateChannel := make(chan string) - go get_events(c, updateChannel) + go get_events(evtc, updateChannel) for { select { case nextEvent := <-updateChannel: log.Errorf("update received: %v", nextEvent) - if err := send_event(c, &nextEvent); err != nil { + evtTv := &gnmipb.TypedValue { + Value: &gnmipb.TypedValue_StringVal { + StringVal: nextEvent, + }} + if err := send_event(evtc, evtTv); err != nil { return } case <-IntervalTicker(time.Second * HEARTBEAT_TIMEOUT): log.Errorf("Ticker received") - if err := send_event(c, &hstr); err != nil { + if err := send_event(evtc, hbTv); err != nil { return } - case <-c.channel: + case <-evtc.channel: + evtc.stopped = 1 log.Errorf("Channel closed by client") return } @@ -176,27 +172,27 @@ func (c *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *s // TODO: Log data related to this session -func (c *EventClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { +func (evtc *EventClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { return nil, nil } -func (c *EventClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { +func (evtc *EventClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { return } -func (c *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { +func (evtc *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { return } -func (c *EventClient) Close() error { +func (evtc *EventClient) Close() error { return nil } -func (c *EventClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { +func (evtc *EventClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { return nil } -func (c *EventClient) Capabilities() []gnmipb.ModelData { +func (evtc *EventClient) Capabilities() []gnmipb.ModelData { return nil } From 142991e5c372c378df9e676661210bf66d3d7753 Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Fri, 8 Jul 2022 21:12:00 +0000 Subject: [PATCH 04/10] Set swss log level --- gnmi_server/client_subscribe.go | 19 ++++++++++++++++++- gnmi_server/server.go | 3 +++ sonic_data_client/events_client.go | 8 ++++---- telemetry/telemetry.go | 3 +++ 4 files changed, 28 insertions(+), 5 deletions(-) diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index 4ef6a32ec..ad11ca183 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -30,17 +30,34 @@ type Client struct { // Wait for all sub go routine to finish w sync.WaitGroup fatal bool + logLevel int } +// Syslog level for error +const logLevelError int = 3 +const logLevelDebug int = 7 +const logLevelMax int = logLevelDebug + // NewClient returns a new initialized client. func NewClient(addr net.Addr) *Client { pq := queue.NewPriorityQueue(1, false) return &Client{ addr: addr, q: pq, + logLevel: logLevelError, } } +func (c *Client) setLogLevel(lvl int) { + if (lvl >= 0) { + if lvl < logLevelMax { + c.logLevel = lvl + } else { + c.logLevel = logLevelMax + } + } +} + // String returns the target the client is querying. func (c *Client) String() string { return c.addr.String() @@ -126,7 +143,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { - dc, err = sdc.NewEventClient(paths, prefix) + dc, err = sdc.NewEventClient(paths, prefix, c.logLevel) } else if _, ok, _, _ := sdc.IsTargetDb(target); ok { dc, err = sdc.NewDbClient(paths, prefix) } else { diff --git a/gnmi_server/server.go b/gnmi_server/server.go index f21f8d68a..5ef22b5df 100644 --- a/gnmi_server/server.go +++ b/gnmi_server/server.go @@ -47,6 +47,7 @@ type Config struct { // Port for the Server to listen on. If 0 or unset the Server will pick a port // for this Server. Port int64 + LogLevel int UserAuth AuthTypes } @@ -233,6 +234,8 @@ func (s *Server) Subscribe(stream gnmipb.GNMI_SubscribeServer) error { c := NewClient(pr.Addr) + c.setLogLevel(s.config.LogLevel) + s.cMu.Lock() if oc, ok := s.clients[c.String()]; ok { log.V(2).Infof("Delete duplicate client %s", oc) diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go index a3a638632..e7b739623 100644 --- a/sonic_data_client/events_client.go +++ b/sonic_data_client/events_client.go @@ -36,21 +36,21 @@ type EventClient struct { stopped int } -const SUBSCRIBER_TIMEOUT = 2 +const SUBSCRIBER_TIMEOUT = (2 * 1000) // 2 seconds const HEARTBEAT_TIMEOUT = 2 const EVENT_BUFFSZ = 4096 const MISSED_BUFFSZ = 16 -func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path) (Client, error) { +func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Client, error) { var evtc EventClient evtc.prefix = prefix for _, path := range paths { // Only one path is expected. Take the last if many evtc.path = path } - log.Errorf("NewEventClient constructed") + log.Errorf("NewEventClient constructed. logLevel=%d", logLevel) - C.swssSetLogPriority(7) + C.swssSetLogPriority(C.int(logLevel)) /* Init subscriber with 2 seconds time out */ subs_data := make(map[string]interface{}) diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 4ec580c6b..85ca7a324 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -17,6 +17,7 @@ import ( var ( userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false} + logLevel = flag.Int("v", 3, "set log level 0 to 7; default: 3") port = flag.Int("port", -1, "port to listen on") // Certificate files. caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.") @@ -60,6 +61,8 @@ func main() { cfg.Port = int64(*port) var opts []grpc.ServerOption + cfg.LogLevel = *logLevel + if !*noTLS { var certificate tls.Certificate var err error From 42f96ac62ac40096d497c13e2f534de27b4a6deb Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Sun, 10 Jul 2022 20:06:15 +0000 Subject: [PATCH 05/10] skip re-defining -v flag --- telemetry/telemetry.go | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 85ca7a324..7798c1bd1 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "flag" "io/ioutil" + "strconv" "time" log "github.com/golang/glog" @@ -17,7 +18,6 @@ import ( var ( userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false} - logLevel = flag.Int("v", 3, "set log level 0 to 7; default: 3") port = flag.Int("port", -1, "port to listen on") // Certificate files. caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.") @@ -42,12 +42,12 @@ func main() { defUserAuth = gnmi.AuthTypes{"jwt": false, "password": false, "cert": false} } - if isFlagPassed("client_auth") { - log.V(1).Infof("client_auth provided") - }else { - log.V(1).Infof("client_auth not provided, using defaults.") - userAuth = defUserAuth - } + if isFlagPassed("client_auth") { + log.V(1).Infof("client_auth provided") + }else { + log.V(1).Infof("client_auth not provided, using defaults.") + userAuth = defUserAuth + } switch { case *port <= 0: @@ -61,7 +61,12 @@ func main() { cfg.Port = int64(*port) var opts []grpc.ServerOption - cfg.LogLevel = *logLevel + cfg.LogLevel, _ = strconv.Atoi(getflag("v")) + log.Errorf("flag: log level %v", cfg.LogLevel) + if cfg.LogLevel == 0 { + log.Errorf("resetting log level 0 to 7") + cfg.LogLevel = 7 + } if !*noTLS { var certificate tls.Certificate @@ -155,3 +160,13 @@ func isFlagPassed(name string) bool { }) return found } + +func getflag(name string) string { + val := "" + flag.VisitAll(func(f *flag.Flag) { + if f.Name == name { + val = f.Value.String() + } + }) + return val +} From 58e62f87b5ec302f226cc127741002a198cf2c73 Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Sun, 10 Jul 2022 22:27:51 +0000 Subject: [PATCH 06/10] tabs/spaces corrections; no code change --- gnmi_server/client_subscribe.go | 2 +- sonic_data_client/events_client.go | 32 +++++++++++++++--------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index ad11ca183..82ade83bb 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -142,7 +142,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) - } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { + } else if ((target == "EVENTS") && (mode == gnmipb.SubscriptionList_STREAM)) { dc, err = sdc.NewEventClient(paths, prefix, c.logLevel) } else if _, ok, _, _ := sdc.IsTargetDb(target); ok { dc, err = sdc.NewDbClient(paths, prefix) diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go index e7b739623..1f8441193 100644 --- a/sonic_data_client/events_client.go +++ b/sonic_data_client/events_client.go @@ -9,25 +9,25 @@ package client import "C" import ( - "encoding/json" + "encoding/json" "fmt" - "sync" - "time" - "unsafe" - - spb "github.com/Azure/sonic-telemetry/proto" - "github.com/Workiva/go-datastructures/queue" - log "github.com/golang/glog" - gnmipb "github.com/openconfig/gnmi/proto/gnmi" + "sync" + "time" + "unsafe" + + spb "github.com/Azure/sonic-telemetry/proto" + "github.com/Workiva/go-datastructures/queue" + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) type EventClient struct { - prefix *gnmipb.Path + prefix *gnmipb.Path path *gnmipb.Path q *queue.PriorityQueue - channel chan struct{} + channel chan struct{} w *sync.WaitGroup // wait for all sub go routines to finish @@ -69,7 +69,7 @@ func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Cl // String returns the target the client is querying. func (evtc *EventClient) String() string { - return fmt.Sprintf("EventClient Prefix %v", evtc.prefix.GetTarget()) + return fmt.Sprintf("EventClient Prefix %v", evtc.prefix.GetTarget()) } @@ -134,7 +134,7 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w }} - evtc.w = w + evtc.w = w defer evtc.w.Done() evtc.q = q @@ -186,14 +186,14 @@ func (evtc *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w * func (evtc *EventClient) Close() error { - return nil + return nil } func (evtc *EventClient) Set(delete []*gnmipb.Path, replace []*gnmipb.Update, update []*gnmipb.Update) error { - return nil + return nil } func (evtc *EventClient) Capabilities() []gnmipb.ModelData { - return nil + return nil } // cgo LDFLAGS: -L/sonic/target/files/bullseye -lxswsscommon -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -lxxeventxx -Wl,-rpath,/sonic/target/files/bullseye From b634ba4f012355ef75c7f1666f6da1db21a0278a Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Sun, 10 Jul 2022 22:34:53 +0000 Subject: [PATCH 07/10] re-evaluate log messages; no code change --- sonic_data_client/events_client.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go index 1f8441193..c2db0342d 100644 --- a/sonic_data_client/events_client.go +++ b/sonic_data_client/events_client.go @@ -85,9 +85,9 @@ func get_events(evtc *EventClient, updateChannel chan string) { c_mptr := (*C.char)(unsafe.Pointer(missed_ptr)) for { - log.Errorf("Call C.event_receive_wrap") + log.V(7).("Call C.event_receive_wrap") rc := C.event_receive_wrap(evtc.subs_handle, c_eptr, EVENT_BUFFSZ, c_mptr, MISSED_BUFFSZ) - log.Errorf("C.event_receive_wrap rc=%d evt:%s", rc, (*C.char)(evt_ptr)) + log.V(7).("C.event_receive_wrap rc=%d evt:%s", rc, (*C.char)(evt_ptr)) if rc != 0 { updateChannel <- C.GoString((*C.char)(evt_ptr)) @@ -98,7 +98,6 @@ func get_events(evtc *EventClient, updateChannel chan string) { evtc.subs_handle = nil return } - log.Errorf("back to read loop") // TODO: Record missed count in stats table. // intVar, err := strconv.Atoi(C.GoString((*C.char)(c_mptr))) } @@ -113,13 +112,13 @@ func send_event(evtc *EventClient, tv *gnmipb.TypedValue) error { Val: tv, } - log.Errorf("Sending spbv") - log.Errorf("spbv: %v", *spbv) + log.V(7).("Sending spbv") + log.V(7).("spbv: %v", *spbv) if err := evtc.q.Put(Value{spbv}); err != nil { log.Errorf("Queue error: %v", err) return err } - log.Errorf("send_event done") + log.V(7).("send_event done") return nil } @@ -146,7 +145,7 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w for { select { case nextEvent := <-updateChannel: - log.Errorf("update received: %v", nextEvent) + log.V(7).("update received: %v", nextEvent) evtTv := &gnmipb.TypedValue { Value: &gnmipb.TypedValue_StringVal { StringVal: nextEvent, @@ -156,7 +155,7 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w } case <-IntervalTicker(time.Second * HEARTBEAT_TIMEOUT): - log.Errorf("Ticker received") + log.V(7).("Ticker received") if err := send_event(evtc, hbTv); err != nil { return } From 117b6ef07309726f467d808e2cd4b81f1a75840b Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Sun, 10 Jul 2022 22:49:11 +0000 Subject: [PATCH 08/10] s/space/tab; no code change --- telemetry/telemetry.go | 68 +++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 7798c1bd1..8f363ed28 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -17,7 +17,7 @@ import ( ) var ( - userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false} + userAuth = gnmi.AuthTypes{"password": false, "cert": false, "jwt": false} port = flag.Int("port", -1, "port to listen on") // Certificate files. caCert = flag.String("ca_crt", "", "CA certificate for client certificate validation. Optional.") @@ -42,12 +42,12 @@ func main() { defUserAuth = gnmi.AuthTypes{"jwt": false, "password": false, "cert": false} } - if isFlagPassed("client_auth") { - log.V(1).Infof("client_auth provided") - }else { - log.V(1).Infof("client_auth not provided, using defaults.") - userAuth = defUserAuth - } + if isFlagPassed("client_auth") { + log.V(1).Infof("client_auth provided") + }else { + log.V(1).Infof("client_auth not provided, using defaults.") + userAuth = defUserAuth + } switch { case *port <= 0: @@ -61,12 +61,12 @@ func main() { cfg.Port = int64(*port) var opts []grpc.ServerOption - cfg.LogLevel, _ = strconv.Atoi(getflag("v")) - log.Errorf("flag: log level %v", cfg.LogLevel) - if cfg.LogLevel == 0 { - log.Errorf("resetting log level 0 to 7") - cfg.LogLevel = 7 - } + cfg.LogLevel, _ = strconv.Atoi(getflag("v")) + log.Errorf("flag: log level %v", cfg.LogLevel) + if cfg.LogLevel == 0 { + log.Errorf("resetting log level 0 to 7") + cfg.LogLevel = 7 + } if !*noTLS { var certificate tls.Certificate @@ -77,13 +77,13 @@ func main() { log.Exitf("could not load server key pair: %s", err) } } else { - switch { - case *serverCert == "": - log.Errorf("serverCert must be set.") - return - case *serverKey == "": - log.Errorf("serverKey must be set.") - return + switch { + case *serverCert == "": + log.Errorf("serverCert must be set.") + return + case *serverKey == "": + log.Errorf("serverKey must be set.") + return } certificate, err = tls.LoadX509KeyPair(*serverCert, *serverKey) if err != nil { @@ -152,21 +152,21 @@ func main() { } func isFlagPassed(name string) bool { - found := false - flag.Visit(func(f *flag.Flag) { - if f.Name == name { - found = true - } - }) - return found + found := false + flag.Visit(func(f *flag.Flag) { + if f.Name == name { + found = true + } + }) + return found } func getflag(name string) string { - val := "" - flag.VisitAll(func(f *flag.Flag) { - if f.Name == name { - val = f.Value.String() - } - }) - return val + val := "" + flag.VisitAll(func(f *flag.Flag) { + if f.Name == name { + val = f.Value.String() + } + }) + return val } From a4eb82fdc4539b4b8038baa0d98d83810fa7e45d Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Wed, 13 Jul 2022 20:54:47 +0000 Subject: [PATCH 09/10] per review comments --- gnmi_server/client_subscribe.go | 14 +++++----- sonic_data_client/events_client.go | 42 ++++++++++++++---------------- telemetry/telemetry.go | 13 ++++----- 3 files changed, 33 insertions(+), 36 deletions(-) diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index 82ade83bb..7b1b1615f 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -49,13 +49,13 @@ func NewClient(addr net.Addr) *Client { } func (c *Client) setLogLevel(lvl int) { - if (lvl >= 0) { - if lvl < logLevelMax { - c.logLevel = lvl - } else { - c.logLevel = logLevelMax - } - } + if (lvl >= 0) { + if lvl < logLevelMax { + c.logLevel = lvl + } else { + c.logLevel = logLevelMax + } + } } // String returns the target the client is querying. diff --git a/sonic_data_client/events_client.go b/sonic_data_client/events_client.go index c2db0342d..a2c7f870b 100644 --- a/sonic_data_client/events_client.go +++ b/sonic_data_client/events_client.go @@ -29,7 +29,7 @@ type EventClient struct { q *queue.PriorityQueue channel chan struct{} - w *sync.WaitGroup // wait for all sub go routines to finish + wg *sync.WaitGroup // wait for all sub go routines to finish subs_handle unsafe.Pointer @@ -48,8 +48,6 @@ func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Cl // Only one path is expected. Take the last if many evtc.path = path } - log.Errorf("NewEventClient constructed. logLevel=%d", logLevel) - C.swssSetLogPriority(C.int(logLevel)) /* Init subscriber with 2 seconds time out */ @@ -57,13 +55,15 @@ func NewEventClient(paths []*gnmipb.Path, prefix *gnmipb.Path, logLevel int) (Cl subs_data["recv_timeout"] = SUBSCRIBER_TIMEOUT j, err := json.Marshal(subs_data) if err != nil { - log.Errorf("events_init_subscriber: Failed to marshal") + log.V(3).Infof("events_init_subscriber: Failed to marshal") return nil, err } js := string(j) evtc.subs_handle = C.events_init_subscriber_wrap(C.CString(js)) - log.Errorf("events_init_subscriber: h=%v", evtc.subs_handle) evtc.stopped = 0 + + log.V(7).Infof("NewEventClient constructed. logLevel=%d", logLevel) + return &evtc, nil } @@ -85,9 +85,8 @@ func get_events(evtc *EventClient, updateChannel chan string) { c_mptr := (*C.char)(unsafe.Pointer(missed_ptr)) for { - log.V(7).("Call C.event_receive_wrap") rc := C.event_receive_wrap(evtc.subs_handle, c_eptr, EVENT_BUFFSZ, c_mptr, MISSED_BUFFSZ) - log.V(7).("C.event_receive_wrap rc=%d evt:%s", rc, (*C.char)(evt_ptr)) + log.V(7).Infof("C.event_receive_wrap rc=%d evt:%s", rc, (*C.char)(evt_ptr)) if rc != 0 { updateChannel <- C.GoString((*C.char)(evt_ptr)) @@ -112,17 +111,16 @@ func send_event(evtc *EventClient, tv *gnmipb.TypedValue) error { Val: tv, } - log.V(7).("Sending spbv") - log.V(7).("spbv: %v", *spbv) + log.V(7).Infof("Sending spbv") if err := evtc.q.Put(Value{spbv}); err != nil { - log.Errorf("Queue error: %v", err) + log.V(3).Infof("Queue error: %v", err) return err } - log.V(7).("send_event done") + log.V(7).Infof("send_event done") return nil } -func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { +func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { hbData := make(map[string]interface{}) hbData["heart"] = "beat" hbVal, _ := json.Marshal(hbData) @@ -133,8 +131,8 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w }} - evtc.w = w - defer evtc.w.Done() + evtc.wg = wg + defer evtc.wg.Done() evtc.q = q evtc.channel = stop @@ -145,7 +143,7 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w for { select { case nextEvent := <-updateChannel: - log.V(7).("update received: %v", nextEvent) + log.V(7).Infof("update received: %v", nextEvent) evtTv := &gnmipb.TypedValue { Value: &gnmipb.TypedValue_StringVal { StringVal: nextEvent, @@ -155,31 +153,29 @@ func (evtc *EventClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w } case <-IntervalTicker(time.Second * HEARTBEAT_TIMEOUT): - log.V(7).("Ticker received") + log.V(7).Infof("Ticker received") if err := send_event(evtc, hbTv); err != nil { return } case <-evtc.channel: evtc.stopped = 1 - log.Errorf("Channel closed by client") + log.V(3).Infof("Channel closed by client") return } } - log.Errorf("Event exiting streamrun") + log.V(3).Infof("Event exiting streamrun") } -// TODO: Log data related to this session - -func (evtc *EventClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { +func (evtc *EventClient) Get(wg *sync.WaitGroup) ([]*spb.Value, error) { return nil, nil } -func (evtc *EventClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { +func (evtc *EventClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { return } -func (evtc *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { +func (evtc *EventClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, wg *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { return } diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 8f363ed28..856533385 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -61,12 +61,13 @@ func main() { cfg.Port = int64(*port) var opts []grpc.ServerOption - cfg.LogLevel, _ = strconv.Atoi(getflag("v")) - log.Errorf("flag: log level %v", cfg.LogLevel) - if cfg.LogLevel == 0 { - log.Errorf("resetting log level 0 to 7") - cfg.LogLevel = 7 - } + if val, err := strconv.Atoi(getflag("v")); err == nil { + cfg.LogLevel = val + log.Errorf("flag: log level %v", cfg.LogLevel) + } + else { + cfg.LogLevel = 3 + } if !*noTLS { var certificate tls.Certificate From 946b840d535095dde58876cd6314023b5ced582b Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Wed, 13 Jul 2022 21:28:45 +0000 Subject: [PATCH 10/10] tab & syntax correction --- telemetry/telemetry.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 856533385..0d403b6da 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -59,15 +59,13 @@ func main() { cfg := &gnmi.Config{} cfg.Port = int64(*port) + cfg.LogLevel = 3 var opts []grpc.ServerOption - if val, err := strconv.Atoi(getflag("v")); err == nil { - cfg.LogLevel = val - log.Errorf("flag: log level %v", cfg.LogLevel) - } - else { - cfg.LogLevel = 3 - } + if val, err := strconv.Atoi(getflag("v")); err == nil { + cfg.LogLevel = val + log.Errorf("flag: log level %v", cfg.LogLevel) + } if !*noTLS { var certificate tls.Certificate