diff --git a/README.md b/README.md index 887fb5ed5..416f30b1b 100644 --- a/README.md +++ b/README.md @@ -27,13 +27,6 @@ There is also a test program dialout_server_cli for collecting data from dial-ou The binaries will be installed under $GOPATH/bin/, they may be copied to any SONiC switch and run there. -You can also build a debian package and install it: - - git clone https://github.com/Azure/sonic-telemetry.git - pushd sonic-telemetry - dpkg-buildpackage -rfakeroot -b -us -uc - popd - ### Running * See [SONiC gRPC telemetry](./doc/grpc_telemetry.md) for how to run dial-in mode system telemetry server * See [SONiC telemetry in dial-out mode](./doc/dialout.md) for how to run dial-out mode system telemetry client diff --git a/dialout/dialout_client/dialout_client.go b/dialout/dialout_client/dialout_client.go index f530a0f21..5ab5964d7 100644 --- a/dialout/dialout_client/dialout_client.go +++ b/dialout/dialout_client/dialout_client.go @@ -1,11 +1,14 @@ package telemetry_dialout import ( - // "encoding/json" "crypto/tls" "errors" "fmt" + spb "github.com/Azure/sonic-telemetry/proto" + + "net" + sdc "github.com/Azure/sonic-telemetry/sonic_data_client" "github.com/go-redis/redis" log "github.com/golang/glog" @@ -15,8 +18,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "net" - //"reflect" + "strconv" "strings" "sync" diff --git a/dialout/dialout_client/dialout_client_test.go b/dialout/dialout_client/dialout_client_test.go index b94963be0..580c9d672 100644 --- a/dialout/dialout_client/dialout_client_test.go +++ b/dialout/dialout_client/dialout_client_test.go @@ -6,20 +6,16 @@ package telemetry_dialout import ( "crypto/tls" "encoding/json" + "github.com/go-redis/redis" - //"github.com/golang/protobuf/proto" + testcert "github.com/Azure/sonic-telemetry/testdata/tls" - //"github.com/kylelemons/godebug/pretty" - //"github.com/openconfig/gnmi/client" pb "github.com/openconfig/gnmi/proto/gnmi" 
"github.com/openconfig/gnmi/value" "golang.org/x/net/context" "google.golang.org/grpc" - //"google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - //"google.golang.org/grpc/status" - //"fmt" + "io/ioutil" "os" "os/exec" @@ -28,6 +24,8 @@ import ( "testing" "time" + "google.golang.org/grpc/credentials" + sds "github.com/Azure/sonic-telemetry/dialout/dialout_server" spb "github.com/Azure/sonic-telemetry/proto" sdc "github.com/Azure/sonic-telemetry/sonic_data_client" diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index b814a7d5b..847193610 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -11,8 +11,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" - //spb "github.com/Azure/sonic-telemetry/proto" sdc "github.com/Azure/sonic-telemetry/sonic_data_client" + vdc "github.com/Azure/sonic-telemetry/virtual_database_client" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) @@ -119,6 +120,8 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { var dc sdc.Client if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) + } else if target == "SONIC_DB" { + dc, err = vdc.NewDbClient(paths, prefix) } else { dc, err = sdc.NewDbClient(paths, prefix) } @@ -226,6 +229,11 @@ func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error { c.errors++ return err } + case vdc.Value: + if resp, err = vdc.ValToResp(v); err != nil { + c.errors++ + return err + } default: log.V(1).Infof("Unknown data type %v for %s in queue", items[0], c) c.errors++ diff --git a/gnmi_server/server.go b/gnmi_server/server.go index a52353dea..dbea0a7df 100644 --- a/gnmi_server/server.go +++ b/gnmi_server/server.go @@ -16,6 +16,8 @@ import ( "google.golang.org/grpc/status" sdc "github.com/Azure/sonic-telemetry/sonic_data_client" + vdc "github.com/Azure/sonic-telemetry/virtual_database_client" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) @@ -77,6 +79,11 @@ func (srv *Server) 
Serve() error { return srv.s.Serve(srv.lis) } +func (srv *Server) Stop() { + s := srv.s + s.Stop() +} + // Address returns the port the Server is listening to. func (srv *Server) Address() string { addr := srv.lis.Addr().String() @@ -174,6 +181,8 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe var dc sdc.Client if target == "OTHERS" { dc, err = sdc.NewNonDbClient(paths, prefix) + } else if target == "SONIC_DB" { + dc, err = vdc.NewDbClient(paths, prefix) } else { dc, err = sdc.NewDbClient(paths, prefix) } @@ -186,6 +195,10 @@ func (s *Server) Get(ctx context.Context, req *gnmipb.GetRequest) (*gnmipb.GetRe return nil, status.Error(codes.NotFound, err.Error()) } + if target == "SONIC_DB" { + notifications = make([]*gnmipb.Notification, len(spbValues)) + } + for index, spbValue := range spbValues { update := &gnmipb.Update{ Path: spbValue.GetPath(), diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index 262d12017..f90602e8c 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -6,10 +6,18 @@ package gnmi import ( "crypto/tls" "encoding/json" + testcert "github.com/Azure/sonic-telemetry/testdata/tls" "github.com/go-redis/redis" "github.com/golang/protobuf/proto" + "io/ioutil" + "os" + "os/exec" + "reflect" + "testing" + "time" + "github.com/kylelemons/godebug/pretty" "github.com/openconfig/gnmi/client" pb "github.com/openconfig/gnmi/proto/gnmi" @@ -19,15 +27,11 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" - "io/ioutil" - "os" - "os/exec" - "reflect" - "testing" - "time" + // Register supported client types. 
spb "github.com/Azure/sonic-telemetry/proto" sdc "github.com/Azure/sonic-telemetry/sonic_data_client" + gclient "github.com/jipanyang/gnmi/client/gnmi" ) diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index f2b93a5b0..e39af5c23 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -93,6 +93,7 @@ func (val Value) Compare(other queue.Item) int { type DbClient struct { prefix *gnmipb.Path pathG2S map[*gnmipb.Path][]tablePath + q *queue.PriorityQueue channel chan struct{} @@ -112,6 +113,14 @@ func NewDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path) (Client, error) { if UseRedisLocalTcpPort { useRedisTcpClient() } + + // TODO: Remove debug log + //for _, _path := range paths { + // fmt.Printf("single path: %v\n", _path) + //} + // + //fmt.Printf("prefix: %v\n", prefix) + if prefix.GetTarget() == "COUNTERS_DB" { err = initCountersPortNameMap() if err != nil { @@ -145,7 +154,7 @@ func NewDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path) (Client, error) { // String returns the target the client is querying. func (c *DbClient) String() string { // TODO: print gnmiPaths of this DbClient - return fmt.Sprintf("DbClient Prefix %v sendMsg %v, recvMsg %v", + return fmt.Sprintf("DbClient Prefix %v sendMsg %v, recvMsg %v", c.prefix.GetTarget(), c.sendMsg, c.recvMsg) } @@ -243,6 +252,7 @@ func (c *DbClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { ts := time.Now() for gnmiPath, tblPaths := range c.pathG2S { val, err := tableData2TypedValue(tblPaths, nil) + //log.V(5).Infof("Val: %v\n", val) if err != nil { return nil, err } @@ -496,6 +506,7 @@ func makeJSON_redis(msi *map[string]interface{}, key *string, op *string, mfv ma for f, v := range mfv { (*msi)[f] = v } + return nil } @@ -515,12 +526,12 @@ func makeJSON_redis(msi *map[string]interface{}, key *string, op *string, mfv ma of[*op] = fp (*msi)[*key] = of } + return nil } // emitJSON marshalls map[string]interface{} to JSON byte stream. 
func emitJSON(v *map[string]interface{}) ([]byte, error) { - //j, err := json.MarshalIndent(*v, "", indentString) j, err := json.Marshal(*v) if err != nil { return nil, fmt.Errorf("JSON marshalling error: %v", err) @@ -589,6 +600,7 @@ func tableData2Msi(tblPath *tablePath, useKey bool, op *string, msi *map[string] // Split dbkey string into two parts and second part is key in table keys := strings.SplitN(dbkey, tblPath.delimitor, 2) key = keys[1] + err = makeJSON_redis(msi, &key, op, fv) } if err != nil { @@ -597,6 +609,7 @@ func tableData2Msi(tblPath *tablePath, useKey bool, op *string, msi *map[string] } log.V(6).Infof("Added idex %v fv %v ", idx, fv) } + return nil } @@ -644,6 +657,9 @@ func tableData2TypedValue(tblPaths []tablePath, op *string) (*gnmipb.TypedValue, } } + // Debug logging + log.V(5).Infof("tblPath: %v\n", tblPath) + err := tableData2Msi(&tblPath, useKey, nil, &msi) if err != nil { return nil, err diff --git a/sonic_data_client/virtual_db.go b/sonic_data_client/virtual_db.go index 7b1592cf7..6136b8d19 100644 --- a/sonic_data_client/virtual_db.go +++ b/sonic_data_client/virtual_db.go @@ -2,8 +2,9 @@ package client import ( "fmt" - log "github.com/golang/glog" "strings" + + log "github.com/golang/glog" ) // virtual db is to Handle diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index b10430457..01394b3fa 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc/credentials" gnmi "github.com/Azure/sonic-telemetry/gnmi_server" + testcert "github.com/Azure/sonic-telemetry/testdata/tls" ) diff --git a/testdata/Interfaces_Port_name_Ethernet68_1_BaseCounter.txt b/testdata/Interfaces_Port_name_Ethernet68_1_BaseCounter.txt new file mode 100644 index 000000000..7c0f87b6d --- /dev/null +++ b/testdata/Interfaces_Port_name_Ethernet68_1_BaseCounter.txt @@ -0,0 +1,84 @@ +{ + "SAI_PORT_STAT_ETHER_IN_PKTS_1024_TO_1518_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_128_TO_255_OCTETS": "0", + 
"SAI_PORT_STAT_ETHER_IN_PKTS_1519_TO_2047_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_2048_TO_4095_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_256_TO_511_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_4096_TO_9216_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_512_TO_1023_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_64_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_65_TO_127_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_9217_TO_16383_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_1024_TO_1518_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_128_TO_255_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_1519_TO_2047_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_2048_TO_4095_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_256_TO_511_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_4096_TO_9216_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_512_TO_1023_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_64_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_65_TO_127_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_9217_TO_16383_OCTETS": "0", + "SAI_PORT_STAT_ETHER_RX_OVERSIZE_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_BROADCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_COLLISIONS": "0", + "SAI_PORT_STAT_ETHER_STATS_CRC_ALIGN_ERRORS": "0", + "SAI_PORT_STAT_ETHER_STATS_DROP_EVENTS": "0", + "SAI_PORT_STAT_ETHER_STATS_FRAGMENTS": "0", + "SAI_PORT_STAT_ETHER_STATS_JABBERS": "0", + "SAI_PORT_STAT_ETHER_STATS_MULTICAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_OVERSIZE_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_1024_TO_1518_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_128_TO_255_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_1519_TO_2047_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_2048_TO_4095_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_256_TO_511_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_4096_TO_9216_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_512_TO_1023_OCTETS": "0", + 
"SAI_PORT_STAT_ETHER_STATS_PKTS_64_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_65_TO_127_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_9217_TO_16383_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_RX_NO_ERRORS": "0", + "SAI_PORT_STAT_ETHER_STATS_TX_NO_ERRORS": "0", + "SAI_PORT_STAT_ETHER_STATS_UNDERSIZE_PKTS": "0", + "SAI_PORT_STAT_ETHER_TX_OVERSIZE_PKTS": "0", + "SAI_PORT_STAT_IF_IN_BROADCAST_PKTS": "0", + "SAI_PORT_STAT_IF_IN_DISCARDS": "0", + "SAI_PORT_STAT_IF_IN_ERRORS": "0", + "SAI_PORT_STAT_IF_IN_MULTICAST_PKTS": "0", + "SAI_PORT_STAT_IF_IN_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IF_IN_OCTETS": "0", + "SAI_PORT_STAT_IF_IN_UCAST_PKTS": "0", + "SAI_PORT_STAT_IF_IN_UNKNOWN_PROTOS": "0", + "SAI_PORT_STAT_IF_IN_VLAN_DISCARDS": "0", + "SAI_PORT_STAT_IF_OUT_BROADCAST_PKTS": "0", + "SAI_PORT_STAT_IF_OUT_DISCARDS": "0", + "SAI_PORT_STAT_IF_OUT_ERRORS": "0", + "SAI_PORT_STAT_IF_OUT_MULTICAST_PKTS": "0", + "SAI_PORT_STAT_IF_OUT_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IF_OUT_OCTETS": "0", + "SAI_PORT_STAT_IF_OUT_QLEN": "0", + "SAI_PORT_STAT_IF_OUT_UCAST_PKTS": "0", + "SAI_PORT_STAT_IPV6_IN_DISCARDS": "0", + "SAI_PORT_STAT_IPV6_IN_MCAST_PKTS": "0", + "SAI_PORT_STAT_IPV6_IN_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IPV6_IN_OCTETS": "0", + "SAI_PORT_STAT_IPV6_IN_RECEIVES": "0", + "SAI_PORT_STAT_IPV6_IN_UCAST_PKTS": "0", + "SAI_PORT_STAT_IPV6_OUT_DISCARDS": "0", + "SAI_PORT_STAT_IPV6_OUT_MCAST_PKTS": "0", + "SAI_PORT_STAT_IPV6_OUT_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IPV6_OUT_OCTETS": "0", + "SAI_PORT_STAT_IPV6_OUT_UCAST_PKTS": "0", + "SAI_PORT_STAT_IP_IN_DISCARDS": "0", + "SAI_PORT_STAT_IP_IN_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IP_IN_OCTETS": "0", + "SAI_PORT_STAT_IP_IN_RECEIVES": "0", + "SAI_PORT_STAT_IP_IN_UCAST_PKTS": "0", + "SAI_PORT_STAT_IP_OUT_DISCARDS": "0", + "SAI_PORT_STAT_IP_OUT_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IP_OUT_OCTETS": "0", + "SAI_PORT_STAT_IP_OUT_UCAST_PKTS": "0" +} diff --git a/testdata/Interfaces_Port_name_Ethernet68_1_PfcCounter.txt 
b/testdata/Interfaces_Port_name_Ethernet68_1_PfcCounter.txt new file mode 100644 index 000000000..93ac64469 --- /dev/null +++ b/testdata/Interfaces_Port_name_Ethernet68_1_PfcCounter.txt @@ -0,0 +1,18 @@ +{ + "SAI_PORT_STAT_PFC_0_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_0_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_1_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_1_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_2_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_2_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_3_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_3_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_4_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_4_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_5_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_5_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_6_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_6_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_7_RX_PKTS": "2", + "SAI_PORT_STAT_PFC_7_TX_PKTS": "0" +} diff --git a/testdata/Interfaces_Port_name_Ethernet68_1_Queue_name_Queue4_Pfcwd.txt b/testdata/Interfaces_Port_name_Ethernet68_1_Queue_name_Queue4_Pfcwd.txt new file mode 100644 index 000000000..f1fcd47f7 --- /dev/null +++ b/testdata/Interfaces_Port_name_Ethernet68_1_Queue_name_Queue4_Pfcwd.txt @@ -0,0 +1,6 @@ +{ + "PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED": "0", + "PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED": "0", + "PFC_WD_STATUS": "operational", + "SAI_PORT_STAT_PFC_4_RX_PKTS": "0" +} diff --git a/testdata/Interfaces_Port_name_Ethernet68_1_Queue_name_Queue4_QueueCounter.txt b/testdata/Interfaces_Port_name_Ethernet68_1_Queue_name_Queue4_QueueCounter.txt new file mode 100644 index 000000000..630a0a914 --- /dev/null +++ b/testdata/Interfaces_Port_name_Ethernet68_1_Queue_name_Queue4_QueueCounter.txt @@ -0,0 +1,6 @@ +{ + "SAI_QUEUE_STAT_BYTES": "0", + "SAI_QUEUE_STAT_DROPPED_BYTES": "0", + "SAI_QUEUE_STAT_DROPPED_PACKETS": "0", + "SAI_QUEUE_STAT_PACKETS": "0" +} diff --git a/virtual_database_client/db_client.go b/virtual_database_client/db_client.go new file mode 100644 index 000000000..47315cfdd --- /dev/null +++ b/virtual_database_client/db_client.go @@ -0,0 +1,297 @@ +// 
Data client for new Virtual Path + +package client + +import ( + "fmt" + "sync" + "time" + + log "github.com/golang/glog" + + spb "github.com/Azure/sonic-telemetry/proto" + "github.com/go-redis/redis" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + "github.com/workiva/go-datastructures/queue" +) + +const ( + // indentString represents the default indentation string used for JSON. + // Two spaces are used here. + indentString string = " " + Default_REDIS_UNIXSOCKET string = "/var/run/redis/redis.sock" + Default_REDIS_LOCAL_TCP_PORT string = "localhost:6379" +) + +// Client defines a set of methods which every client must implement. +// This package provides one implementation for now: the DbClient +// +type Client interface { + // StreamRun will start watching service on data source + // and enqueue data change to the priority queue. + // It stops all activities upon receiving signal on stop channel + // It should run as a go routine + StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup) + // Poll will start service to respond poll signal received on poll channel. + // data read from data source will be enqueued on to the priority queue + // The service will stop upon detection of poll channel closing. + // It should run as a go routine + PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup) + // Get returns data from the data source in format of *spb.Value + Get(w *sync.WaitGroup) ([]*spb.Value, error) + // Close provides implementation for explicit cleanup of Client + Close() error +} + +var ( + // Let it be variable visible to other packages for now. + // May add an interface function for it.
+ UseRedisLocalTcpPort bool = false + + // Redis client connected to each DB + Target2RedisDb = make(map[string]*redis.Client) +) + +type tablePath struct { + dbName string + keyName string + delimitor string + fields []string // fields listed in list are returned + patterns []string // fields matched with patterns are returned +} + +type Value struct { + *spb.Value +} + +// Implement Compare method for priority queue +func (val Value) Compare(other queue.Item) int { + oval := other.(Value) + if val.GetTimestamp() > oval.GetTimestamp() { + return 1 + } else if val.GetTimestamp() == oval.GetTimestamp() { + return 0 + } + return -1 +} + +type DbClient struct { + prefix *gnmipb.Path + // Used by Get server + paths []*gnmipb.Path + //pathG2S map[*gnmipb.Path][]tablePath + + q *queue.PriorityQueue + channel chan struct{} + + synced sync.WaitGroup // Control when to send gNMI sync_response + w *sync.WaitGroup // wait for all sub go routines to finish + mu sync.RWMutex // Mutex for data protection among routines for DbClient + + sendMsg int64 + recvMsg int64 + errors int64 +} + +func NewDbClient(paths []*gnmipb.Path, prefix *gnmipb.Path) (Client, error) { + var client DbClient + var err error + // Testing program may ask to use redis local tcp connection + if UseRedisLocalTcpPort { + useRedisTcpClient() + } + + err = initCountersPortNameMap() + if err != nil { + return nil, err + } + err = initCountersQueueNameMap() + if err != nil { + return nil, err + } + err = initAliasMap() + if err != nil { + return nil, err + } + err = initCountersPfcwdNameMap() + if err != nil { + return nil, err + } + + client.prefix = prefix + client.paths = paths + return &client, nil +} + +func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync.WaitGroup) { + c.w = w + defer c.w.Done() + c.q = q + c.channel = stop + + pathG2S := make(map[*gnmipb.Path][]tablePath) + err := populateAlltablePaths(c.prefix, c.paths, &pathG2S) + if err != nil { + enqueFatalMsg(c, 
err.Error()) + return + } + + if len(pathG2S) == 0 { + enqueFatalMsg(c, fmt.Sprintf("Prefix:%v, path: %v not valid paths", c.prefix, c.paths)) + return + } + + // Assume all ON_CHANGE mode + for gnmiPath, tblPaths := range pathG2S { + c.w.Add(1) + c.synced.Add(1) + go dbPathSubscribe(gnmiPath, tblPaths, c) + } + + // Wait until all data values corresponding to the paths specified + // in the SubscriptionList has been transmitted at least once + c.synced.Wait() + // Inject sync message + c.q.Put(Value{ + &spb.Value{ + Timestamp: time.Now().UnixNano(), + SyncResponse: true, + }, + }) + log.V(2).Infof("%v Synced", pathG2S) + return +} + +func (c *DbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup) { + return +} + +func (c *DbClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { + // wait sync for Get, not used for now + c.w = w + + pathG2S := make(map[*gnmipb.Path][]tablePath) + err := populateAlltablePaths(c.prefix, c.paths, &pathG2S) + if err != nil { + return nil, err + } + + if len(pathG2S) == 0 { + return nil, fmt.Errorf("Failed to map to real db paths. 
Prefix: %v, paths: %v not valid paths", c.prefix, c.paths) + } + + var values []*spb.Value + ts := time.Now() + for gnmiPath, tblPaths := range pathG2S { + val, err := tableData2TypedValue(tblPaths) + if err != nil { + return nil, err + } + + values = append(values, &spb.Value{ + Prefix: c.prefix, + Path: gnmiPath, + Timestamp: ts.UnixNano(), + Val: val, + }) + } + log.V(5).Infof("Getting #%v", values) + log.V(4).Infof("Get done, total time taken: %v ms", int64(time.Since(ts)/time.Millisecond)) + return values, nil +} + +// TODO: Log data related to this session +func (c *DbClient) Close() error { + return nil +} + +func GetTableKeySeparator(target string) (string, error) { + _, ok := spb.Target_value[target] + if !ok { + log.V(1).Infof(" %v not a valid path target", target) + return "", fmt.Errorf("%v not a valid path target", target) + } + + var separator string + switch target { + case "CONFIG_DB": + separator = "|" + case "STATE_DB": + separator = "|" + default: + separator = ":" + } + return separator, nil +} + +// For testing only +func useRedisTcpClient() { + for dbName, dbn := range spb.Target_value { + if dbName != "OTHERS" { + // DB connector for direct redis operation + var redisDb *redis.Client + if UseRedisLocalTcpPort { + redisDb = redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: Default_REDIS_LOCAL_TCP_PORT, + Password: "", // no password set + DB: int(dbn), + DialTimeout: 0, + }) + } + Target2RedisDb[dbName] = redisDb + } + } +} + +// Client package prepare redis clients to all DBs automatically +func init() { + for dbName, dbn := range spb.Target_value { + if dbName != "OTHERS" { + // DB connector for direct redis operation + var redisDb *redis.Client + + redisDb = redis.NewClient(&redis.Options{ + Network: "unix", + Addr: Default_REDIS_UNIXSOCKET, + Password: "", // no password set + DB: int(dbn), + DialTimeout: 0, + }) + Target2RedisDb[dbName] = redisDb + } + } +} + +// Convert from SONiC Value to its corresponding gNMI proto stream +// 
response type +func ValToResp(val Value) (*gnmipb.SubscribeResponse, error) { + switch val.GetSyncResponse() { + case true: + return &gnmipb.SubscribeResponse{ + Response: &gnmipb.SubscribeResponse_SyncResponse{ + SyncResponse: true, + }, + }, nil + default: + // In case the subscribe/poll routines encountered fatal error + if fatal := val.GetFatal(); fatal != "" { + return nil, fmt.Errorf("%s", fatal) + } + + return &gnmipb.SubscribeResponse{ + Response: &gnmipb.SubscribeResponse_Update{ + Update: &gnmipb.Notification{ + Timestamp: val.GetTimestamp(), + Prefix: val.GetPrefix(), + Update: []*gnmipb.Update{ + { + Path: val.GetPath(), + Val: val.GetVal(), + }, + }, + }, + }, + }, nil + } +} diff --git a/virtual_database_client/db_client_test.go b/virtual_database_client/db_client_test.go new file mode 100644 index 000000000..3d3ff4ec3 --- /dev/null +++ b/virtual_database_client/db_client_test.go @@ -0,0 +1,815 @@ +package client_test + +// Prerequisite: redis-server should be running. + +import ( + "context" + tls "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/exec" + "reflect" + "testing" + "time" + + vdc "github.com/Azure/sonic-telemetry/virtual_database_client" + "github.com/kylelemons/godebug/pretty" + + gnmi "github.com/Azure/sonic-telemetry/gnmi_server" + + xpath "github.com/jipanyang/gnxi/utils/xpath" + "github.com/openconfig/gnmi/client" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" + value "github.com/openconfig/gnmi/value" + + spb "github.com/Azure/sonic-telemetry/proto" + testcert "github.com/Azure/sonic-telemetry/testdata/tls" + redis "github.com/go-redis/redis" + "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + status "google.golang.org/grpc/status" + + gclient "github.com/jipanyang/gnmi/client/gnmi" +) + +var clientTypes = []string{gclient.Type} + +func getRedisClient(t *testing.T, dbName string) *redis.Client { + dbn := spb.Target_value[dbName] + rclient := 
redis.NewClient(&redis.Options{ + Network: "tcp", + Addr: "localhost:6379", + Password: "", // no password set + DB: int(dbn), + DialTimeout: 0, + }) + _, err := rclient.Ping().Result() + if err != nil { + t.Fatal("failed to connect to redis server ", err) + } + return rclient +} + +func setTestDataToRedisDB(t *testing.T, rclient *redis.Client, mpi map[string]interface{}) { + for key, fv := range mpi { + switch fv.(type) { + case map[string]interface{}: + _, err := rclient.HMSet(key, fv.(map[string]interface{})).Result() + if err != nil { + t.Errorf("Invalid data for db: %v : %v %v", key, fv, err) + } + default: + t.Errorf("Invalid data for db: %v : %v", key, fv) + } + } +} + +func parseTestData(t *testing.T, key string, in []byte) map[string]interface{} { + var fvp map[string]interface{} + + err := json.Unmarshal(in, &fvp) + if err != nil { + t.Errorf("Failed to Unmarshal %v err: %v", in, err) + } + if key != "" { + kv := map[string]interface{}{} + kv[key] = fvp + return kv + } + return fvp +} + +func loadTestDataIntoRedis(t *testing.T, redisClient *redis.Client, dbKey string, testDataPath string) { + data, err := ioutil.ReadFile(testDataPath) + if err != nil { + t.Fatalf("read file %v err: %v", testDataPath, err) + } + data_kv_map := parseTestData(t, dbKey, data) + setTestDataToRedisDB(t, redisClient, data_kv_map) +} + +func prepareConfigDB(t *testing.T) { + configDB := getRedisClient(t, "CONFIG_DB") + defer configDB.Close() + configDB.FlushDB() + + loadTestDataIntoRedis(t, configDB, "", "../testdata/COUNTERS_PORT_ALIAS_MAP.txt") + loadTestDataIntoRedis(t, configDB, "", "../testdata/CONFIG_PFCWD_PORTS.txt") +} + +func creategNMIServer(t *testing.T) *gnmi.Server { + certificate, err := testcert.NewCert() + if err != nil { + t.Errorf("could not load server key pair: %s", err) + } + tlsCfg := &tls.Config{ + ClientAuth: tls.RequestClientCert, + Certificates: []tls.Certificate{certificate}, + } + + opts := []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))} + 
cfg := &gnmi.Config{Port: 8080} + s, err := gnmi.NewServer(cfg, opts) + if err != nil { + t.Errorf("Failed to create gNMI server: %v", err) + } + + return s +} + +func rungNMIServer(t *testing.T, s *gnmi.Server) { + //t.Log("Starting RPC server on address:", s.Address()) + err := s.Serve() // blocks until close + if err != nil { + t.Fatalf("gRPC server err: %v", err) + } + //t.Log("Exiting RPC server on address", s.Address()) +} + +func sendGetRequest(t *testing.T, ctx context.Context, gnmiClient gnmipb.GNMIClient, xPath string, + pathToTargetDB string, expectedReturnCode codes.Code) *gnmipb.GetResponse { + // Issue Get RPC. + pbPath, err := xpath.ToGNMIPath(xPath) + if err != nil { + t.Fatalf("error in parsing xpath %q to gnmi path", xPath) + } + + prefix := gnmipb.Path{Target: pathToTargetDB} + request := &gnmipb.GetRequest{ + Prefix: &prefix, + Path: []*gnmipb.Path{pbPath}, + Encoding: gnmipb.Encoding_JSON_IETF, + } + + response, err := gnmiClient.Get(ctx, request) + + // Check return value and gRPC status code. 
+ returnStatus, ok := status.FromError(err) + if !ok { + t.Fatal("got a non-grpc error from grpc call") + } + if returnStatus.Code() != expectedReturnCode { + t.Log("err: ", err) + t.Fatalf("got return code %v, expected %v", returnStatus.Code(), expectedReturnCode) + } + + return response +} + +var expectedValueEthernet68 = map[string]interface{}{ + "PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED": "0", + "PFC_WD_STATUS": "operational", + "SAI_PORT_STAT_PFC_4_RX_PKTS": "0", + "PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED": "0", + "SAI_PORT_STAT_ETHER_STATS_RX_NO_ERRORS": "0", + "SAI_PORT_STAT_IF_OUT_OCTETS": "0", + "SAI_PORT_STAT_IPV6_IN_RECEIVES": "0", + "SAI_PORT_STAT_IPV6_OUT_MCAST_PKTS": "0", + "SAI_PORT_STAT_IP_IN_UCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_OVERSIZE_PKTS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_128_TO_255_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_256_TO_511_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_9217_TO_16383_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_UNDERSIZE_PKTS": "0", + "SAI_PORT_STAT_IF_IN_MULTICAST_PKTS": "0", + "SAI_PORT_STAT_IF_OUT_UCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_1024_TO_1518_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_64_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_2048_TO_4095_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_65_TO_127_OCTETS": "0", + "SAI_PORT_STAT_ETHER_RX_OVERSIZE_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_JABBERS": "0", + "SAI_PORT_STAT_IPV6_OUT_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IP_IN_DISCARDS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_2048_TO_4095_OCTETS": "0", + "SAI_PORT_STAT_IF_IN_VLAN_DISCARDS": "0", + "SAI_PORT_STAT_IP_OUT_OCTETS": "0", + "SAI_PORT_STAT_IF_IN_UCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_256_TO_511_OCTETS": "0", + "SAI_PORT_STAT_IF_IN_OCTETS": "0", + "SAI_PORT_STAT_IF_OUT_ERRORS": "0", + "SAI_PORT_STAT_IP_IN_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_BROADCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_128_TO_255_OCTETS": "0", + "SAI_PORT_STAT_IPV6_OUT_DISCARDS": 
"0", + "SAI_PORT_STAT_IPV6_OUT_UCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_1519_TO_2047_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_9217_TO_16383_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_MULTICAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_1519_TO_2047_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_512_TO_1023_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_64_OCTETS": "0", + "SAI_PORT_STAT_IF_OUT_MULTICAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_65_TO_127_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_65_TO_127_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_9217_TO_16383_OCTETS": "0", + "SAI_PORT_STAT_ETHER_TX_OVERSIZE_PKTS": "0", + "SAI_PORT_STAT_IF_OUT_DISCARDS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_4096_TO_9216_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_1024_TO_1518_OCTETS": "0", + "SAI_PORT_STAT_IPV6_IN_DISCARDS": "0", + "SAI_PORT_STAT_IPV6_OUT_OCTETS": "0", + "SAI_PORT_STAT_IP_IN_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_4096_TO_9216_OCTETS": "0", + "SAI_PORT_STAT_IPV6_IN_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_COLLISIONS": "0", + "SAI_PORT_STAT_IF_OUT_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IP_OUT_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IP_OUT_UCAST_PKTS": "0", + "SAI_PORT_STAT_IF_IN_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_512_TO_1023_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_1024_TO_1518_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_128_TO_255_OCTETS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_512_TO_1023_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_CRC_ALIGN_ERRORS": "0", + "SAI_PORT_STAT_ETHER_STATS_DROP_EVENTS": "0", + "SAI_PORT_STAT_ETHER_STATS_PKTS_2048_TO_4095_OCTETS": "0", + "SAI_PORT_STAT_ETHER_IN_PKTS_256_TO_511_OCTETS": "0", + "SAI_PORT_STAT_IPV6_IN_MCAST_PKTS": "0", + "SAI_PORT_STAT_IP_IN_RECEIVES": "0", + "SAI_PORT_STAT_IF_IN_BROADCAST_PKTS": "0", + "SAI_PORT_STAT_IF_IN_DISCARDS": "0", + "SAI_PORT_STAT_IF_IN_ERRORS": "0", + 
"SAI_PORT_STAT_ETHER_OUT_PKTS_4096_TO_9216_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_FRAGMENTS": "0", + "SAI_PORT_STAT_ETHER_STATS_TX_NO_ERRORS": "0", + "SAI_PORT_STAT_IF_OUT_BROADCAST_PKTS": "0", + "SAI_PORT_STAT_IPV6_IN_NON_UCAST_PKTS": "0", + "SAI_PORT_STAT_IP_OUT_DISCARDS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_1519_TO_2047_OCTETS": "0", + "SAI_PORT_STAT_ETHER_STATS_OCTETS": "0", + "SAI_PORT_STAT_IF_IN_UNKNOWN_PROTOS": "0", + "SAI_PORT_STAT_IF_OUT_QLEN": "0", + "SAI_PORT_STAT_IPV6_IN_UCAST_PKTS": "0", + "SAI_PORT_STAT_ETHER_OUT_PKTS_64_OCTETS": "0", + "SAI_QUEUE_STAT_DROPPED_PACKETS": "0", + "SAI_QUEUE_STAT_PACKETS": "0", + "SAI_QUEUE_STAT_BYTES": "0", + "SAI_QUEUE_STAT_DROPPED_BYTES": "0", + "SAI_PORT_STAT_PFC_3_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_7_RX_PKTS": "2", + "SAI_PORT_STAT_PFC_4_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_6_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_5_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_7_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_1_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_2_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_5_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_6_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_0_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_2_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_3_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_0_TX_PKTS": "0", + "SAI_PORT_STAT_PFC_1_RX_PKTS": "0", +} + +var expectedValueEthernet68Pfcwd = map[string]interface{}{ + "PFC_WD_STATUS": "operational", + "SAI_PORT_STAT_PFC_3_RX_PKTS": "0", + "SAI_PORT_STAT_PFC_4_RX_PKTS": "0", + "PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED": "0", + "PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED": "0", +} + +func dumpMapFromResponse(response *gnmipb.GetResponse, name string) { + fmt.Printf("\n\n>>>>>\n\n") + notifs := response.GetNotification() + var gotVal interface{} + var gotMap = make(map[string]interface{}) + + count := len(notifs) + for i := 0; i < count; i++ { + val := notifs[i].GetUpdate()[0].GetVal() + json.Unmarshal(val.GetJsonIetfVal(), &gotVal) + m := gotVal.(map[string]interface{}) + for k, v := range m { + gotMap[k] = v + } + } + + 
fmt.Printf("var %s = map[string]interface{}{\n", name) + for k, v := range gotMap { + fmt.Printf("\t\"%v\": \"%v\",\n", k, v) + } + fmt.Printf("}\n") + fmt.Printf("\n\n>>>>>\n\n") +} + +func assertExpectedValueFromMap(t *testing.T, response *gnmipb.GetResponse, expected map[string]interface{}) { + notifs := response.GetNotification() + var gotVal interface{} + var gotMap = make(map[string]interface{}) + + // Load up all k, v pairs out of the JSON value into a go map. + count := len(notifs) + for i := 0; i < count; i++ { + val := notifs[i].GetUpdate()[0].GetVal() + json.Unmarshal(val.GetJsonIetfVal(), &gotVal) + m := gotVal.(map[string]interface{}) + for k, v := range m { + gotMap[k] = v + } + } + + // Assert matching k, v pairs from the `expected` map and data from the gnmi response. + if len(expected) != len(gotMap) { + t.Fatalf("Expected %v entries, got %v.", len(expected), len(gotMap)) + } + for k, v := range gotMap { + if val, ok := expected[k]; ok { + if val != v { + t.Fatalf("Expected key %v with value %v, but got the value %v instead.", k, val, v) + } + } else { + t.Fatalf("Received unexpected key %v from output.", k) + } + } +} + +func assertExpectedValue(t *testing.T, response *gnmipb.GetResponse, expectedResponseValue interface{}) { + var gotVal interface{} + if response != nil { + notifs := response.GetNotification() + if len(notifs) != 1 { + t.Fatalf("got %d notifications, want 1", len(notifs)) + } + updates := notifs[0].GetUpdate() + if len(updates) != 1 { + t.Fatalf("got %d updates in the notification, want 1", len(updates)) + } + val := updates[0].GetVal() + if val.GetJsonIetfVal() == nil { + gotVal, err := value.ToScalar(val) + if err != nil { + t.Errorf("got: %v, want a scalar value", gotVal) + } + } else { + // Unmarshal json data to gotVal container for comparison. 
+ if err := json.Unmarshal(val.GetJsonIetfVal(), &gotVal); err != nil { + t.Fatalf("error in unmarshaling IETF JSON data to json container: %v", err) + } + var expectedJSONStruct interface{} + if err := json.Unmarshal(expectedResponseValue.([]byte), &expectedJSONStruct); err != nil { + t.Fatalf("error in unmarshaling IETF JSON data to json container: %v", err) + } + expectedResponseValue = expectedJSONStruct + } + } + + if !reflect.DeepEqual(gotVal, expectedResponseValue) { + t.Errorf("got: %v (%T),\nwant %v (%T)", gotVal, gotVal, expectedResponseValue, expectedResponseValue) + } +} + +func loadExpectedResponseByteData(t *testing.T, path string) interface{} { + data, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("read file %v err: %v", path, err) + } + return data +} + +func TestVirtualDatabaseGNMIGet(t *testing.T) { + // Open COUNTERS_DB redis client. + countersDB := getRedisClient(t, "COUNTERS_DB") + defer countersDB.Close() + countersDB.FlushDB() + + // Enable keyspace notification. + os.Setenv("PATH", "/usr/bin:/sbin:/bin:/usr/local/bin") + cmd := exec.Command("redis-cli", "config", "set", "notify-keyspace-events", "KEA") + _, err := cmd.Output() + if err != nil { + t.Fatal("failed to enable redis keyspace notification ", err) + } + + // Load test data into COUNTERS_DB. 
+ loadTestDataIntoRedis(t, countersDB, "COUNTERS_PORT_NAME_MAP", "../testdata/COUNTERS_PORT_NAME_MAP.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS_QUEUE_NAME_MAP", "../testdata/COUNTERS_QUEUE_NAME_MAP.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1000000000039", "../testdata/COUNTERS:Ethernet68.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1000000000003", "../testdata/COUNTERS:Ethernet1.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000092a", "../testdata/COUNTERS:oid:0x1500000000092a.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000091c", "../testdata/COUNTERS:oid:0x1500000000091c.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000091e", "../testdata/COUNTERS:oid:0x1500000000091e.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000091f", "../testdata/COUNTERS:oid:0x1500000000091f.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000091f", "../testdata/COUNTERS:oid:0x1500000000091f.txt") + + // Load CONFIG_DB, flush old data, and load in test data. + prepareConfigDB(t) + + // Start telementry service. + gnmiServer := creategNMIServer(t) + if gnmiServer == nil { + t.Fatalf("Unable to bind gNMI server to local port 8080.") + } + go rungNMIServer(t, gnmiServer) + defer gnmiServer.Stop() + + // Create a GNMI client used to invoke RPCs. + tlsConfig := &tls.Config{InsecureSkipVerify: true} + opts := []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))} + + targetAddr := "127.0.0.1:8080" + conn, err := grpc.Dial(targetAddr, opts...) + if err != nil { + t.Fatalf("Dialing to %q failed: %v", targetAddr, err) + } + defer conn.Close() + + gnmiClient := gnmipb.NewGNMIClient(conn) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Perform unit tests that closely resemble Jipan's original gnmi tests. 
+ t.Run("Get Interfaces/Port[name=Ethernet70], a valid path (with no corresponding data in the db); expected NotFound", func(t *testing.T) { + expectedReturnCode := codes.NotFound + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet70]" + sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + }) + t.Run("Get Interfaces/Port[name=Ethernet400], invalid valid path; expected NotFound", func(t *testing.T) { + expectedReturnCode := codes.NotFound + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet400]" + sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + }) + t.Run("Get Interfaces/Port[name=Ethernet68]/..., Everything under Ethernet68", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet68]/..." + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + assertExpectedValueFromMap(t, response, expectedValueEthernet68) + }) + t.Run("Get Interfaces/Port[name=Ethernet68]/PfcCounter[field=SAI_PORT_STAT_PFC_7_RX_PKTS], valid path for specific leaf, but not implemented.", func(t *testing.T) { + expectedReturnCode := codes.NotFound + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet68]/PfcCounter[field=SAI_PORT_STAT_PFC_7_RX_PKTS]" + sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + }) + t.Run("Get Interfaces/Port[name=Ethernet68]/Queue[name=*]/Pfcwd", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet68]/Queue[name=*]/Pfcwd" + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + assertExpectedValueFromMap(t, response, expectedValueEthernet68Pfcwd) + }) + t.Run("Get Interfaces/Port[name=*]/PfcCounter[field=SAI_PORT_STAT_PFC_7_RX_PKTS], valid path for specific leaf for all nodes, but not implemented.", func(t 
*testing.T) { + expectedReturnCode := codes.NotFound + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=*]/PfcCounter[field=SAI_PORT_STAT_PFC_7_RX_PKTS]" + sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + }) + t.Run("Get Interfaces/.../Pfcwd, valid path for specific PFC-related leaf for all nodes, but not implemented.", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/.../Pfcwd" + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + assertExpectedValueFromMap(t, response, expectedValueEthernet68Pfcwd) + }) + + // Perform some additional unit tests. + t.Run("Get Interfaces/Port[name=Ethernet68/1]/BaseCounter", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet68/1]/BaseCounter" + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + expectedResponseValue := loadExpectedResponseByteData(t, "../testdata/Interfaces_Port_name_Ethernet68_1_BaseCounter.txt") + assertExpectedValue(t, response, expectedResponseValue) + }) + t.Run("Get Interfaces/Port[name=Ethernet68]/BaseCounter (no slash /1 after Ethernet68)", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet68]/BaseCounter" + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + expectedResponseValue := loadExpectedResponseByteData(t, "../testdata/Interfaces_Port_name_Ethernet68_1_BaseCounter.txt") + assertExpectedValue(t, response, expectedResponseValue) + }) + t.Run("Get Interfaces/Port[name=Ethernet68/1]/PfcCounter", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet68/1]/PfcCounter" + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + 
expectedResponseValue := loadExpectedResponseByteData(t, "../testdata/Interfaces_Port_name_Ethernet68_1_PfcCounter.txt") + assertExpectedValue(t, response, expectedResponseValue) + }) + t.Run("Get Interfaces/Port[name=Ethernet68/1]/Queue[name=Queue4]/Pfcwd", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet68/1]/Queue[name=Queue4]/Pfcwd" + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + expectedResponseValue := loadExpectedResponseByteData(t, "../testdata/Interfaces_Port_name_Ethernet68_1_Queue_name_Queue4_Pfcwd.txt") + assertExpectedValue(t, response, expectedResponseValue) + }) + t.Run("Get Interfaces/Port[name=Ethernet68/1]/Queue[name=Queue4]/QueueCounter", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet68/1]/Queue[name=Queue4]/QueueCounter" + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + expectedResponseValue := loadExpectedResponseByteData(t, "../testdata/Interfaces_Port_name_Ethernet68_1_Queue_name_Queue4_QueueCounter.txt") + assertExpectedValue(t, response, expectedResponseValue) + }) + t.Run("Get Interfaces/Port[name=Ethernet70], a valid path (with no corresponding data in the db); expected NotFound", func(t *testing.T) { + expectedReturnCode := codes.NotFound + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=Ethernet70]" + response := sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + var expectedResponseValue interface{} = nil + assertExpectedValue(t, response, expectedResponseValue) + }) + t.Run("Get Interfaces/Port[name=*]/..., Retrieve everything under all ports", func(t *testing.T) { + expectedReturnCode := codes.OK + pathToTargetDB := "SONIC_DB" + xpath := "Interfaces/Port[name=*]/..." 
+ sendGetRequest(t, ctx, gnmiClient, xpath, pathToTargetDB, expectedReturnCode) + // Only checking return code, since returned data will: A) require a large text file; B) change + // whenever new kinds of test data is loaded into the DB (for example when modifying tests). + }) +} + +func flushDBAndLoadTestData(t *testing.T, countersDB *redis.Client) { + countersDB.FlushDB() + + loadTestDataIntoRedis(t, countersDB, "COUNTERS_PORT_NAME_MAP", "../testdata/COUNTERS_PORT_NAME_MAP.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS_QUEUE_NAME_MAP", "../testdata/COUNTERS_QUEUE_NAME_MAP.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1000000000039", "../testdata/COUNTERS:Ethernet68.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1000000000003", "../testdata/COUNTERS:Ethernet1.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000092a", "../testdata/COUNTERS:oid:0x1500000000092a.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000091c", "../testdata/COUNTERS:oid:0x1500000000091c.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000091e", "../testdata/COUNTERS:oid:0x1500000000091e.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000091f", "../testdata/COUNTERS:oid:0x1500000000091f.txt") + loadTestDataIntoRedis(t, countersDB, "COUNTERS:oid:0x1500000000091f", "../testdata/COUNTERS:oid:0x1500000000091f.txt") + + prepareConfigDB(t) +} + +func loadTestDataAsJSON(t *testing.T, testDataPath string) interface{} { + data, err := ioutil.ReadFile(testDataPath) + if err != nil { + t.Fatalf("read file %v err: %v", testDataPath, err) + } + + var dataJSON interface{} + json.Unmarshal(data, &dataJSON) + + return dataJSON +} + +func TestVirtualDatabaseGNMISubscribe(t *testing.T) { + // Open COUNTERS_DB redis client. + countersDB := getRedisClient(t, "COUNTERS_DB") + defer countersDB.Close() + + // Enable keyspace notification. 
+ os.Setenv("PATH", "/usr/bin:/sbin:/bin:/usr/local/bin") + cmd := exec.Command("redis-cli", "config", "set", "notify-keyspace-events", "KEA") + _, err := cmd.Output() + if err != nil { + t.Fatal("failed to enable redis keyspace notification ", err) + } + + // Start telementry service. + gnmiServer := creategNMIServer(t) + if gnmiServer == nil { + t.Fatalf("Unable to bind gNMI server to local port 8080.") + } + go rungNMIServer(t, gnmiServer) + defer gnmiServer.Stop() + + // Generic function for performing a subscription test. The flow is: + // 1. Flush redis db and load in all test data. + // 2. Construct a gNMI client to perform the subscription RPC. + // 3. Build the subscription query struct. + // 4. Perform database updates. + // 5. Assert expectations on collated notifications. + doSubscribeTest := func(t *testing.T, queryPaths []client.Path, expectedNotifications []client.Notification, updates func(), shouldSucceed bool) { + // 1. Flush redis db and load in all test data. + flushDBAndLoadTestData(t, countersDB) + + time.Sleep(time.Millisecond * 1000) + + // 2. Construct a gNMI client to perform the subscription RPC. + c := client.New() + defer c.Close() + + // 3. Build the subscription query struct. + var query client.Query + query.Addrs = []string{"127.0.0.1:8080"} + query.Target = "SONIC_DB" + query.Type = client.Stream + query.Queries = queryPaths + query.TLS = &tls.Config{InsecureSkipVerify: true} + + logNotifications := false + + // Collate notifications with handler. + var gotNotifications []client.Notification + query.NotificationHandler = func(notification client.Notification) error { + if logNotifications { + t.Logf("reflect.TypeOf(notification) %v : %v", reflect.TypeOf(notification), notification) + } + + if n, ok := notification.(client.Update); ok { + n.TS = time.Unix(0, 0) // Clear this to zero so expected notification formats are easier to assert. 
+ gotNotifications = append(gotNotifications, n) + } else { + gotNotifications = append(gotNotifications, notification) + } + + return nil + } + + logSubscribeErr := false + go func() { + c.Subscribe(context.Background(), query) + if logSubscribeErr { + t.Logf("c.Subscribe err: %v\n", err) + } + }() + defer c.Close() + + // Wait for subscription to sync. + time.Sleep(time.Millisecond * 500) + + // 4. Perform database updates. + updates() + + // Wait for updates to propogate notifications. + time.Sleep(time.Millisecond * 1000) + + // 5. Assert expectations on collated notifications. + if diff := pretty.Compare(expectedNotifications, gotNotifications); shouldSucceed && diff != "" { + t.Log("\n\nWant: \n\n", expectedNotifications) + t.Log("\n\nGot : \n\n", gotNotifications) + t.Errorf("Unexpected updates:\n%s", diff) + } + } + + t.Run("Query all under Ethernet68, add new test field, assert failure (not implemented).", func(t *testing.T) { + queryPaths := []client.Path{{"Interfaces", "Port[name=Ethernet68]", "..."}} + + updates := func() { + countersDB.HSet("COUNTERS:oid:0x1000000000039", "test_field", "test_value") + } + + // Querying for a new field is not yet implemented. + assertFailure := false + + doSubscribeTest(t, queryPaths, []client.Notification{}, updates, assertFailure) + }) + + t.Run("Query all under Ethernet68/1 (vendor valias), add new test field, assert failure (not implemented).", func(t *testing.T) { + queryPaths := []client.Path{{"Interfaces", "Port[name=Ethernet68/1]", "..."}} + + updates := func() { + countersDB.HSet("COUNTERS:oid:0x1000000000039", "test_field", "test_value") + } + + // Querying for a new field is not yet implemented. 
+ assertFailure := false + + doSubscribeTest(t, queryPaths, []client.Notification{}, updates, assertFailure) + }) + + t.Run("Stream query for Interfaces/Port[name=Ethernet68]/BaseCounter[field=SAI_PORT_STAT_PFC_7_RX_PKTS], updating SAI_PORT_STAT_PFC_7_RX_PKTS from 2 to 3, assert failure (not implemented).", func(t *testing.T) { + queryPaths := []client.Path{{"Interfaces", "Port[name=Ethernet68]", "BaseCounter[field=SAI_PORT_STAT_PFC_7_RX_PKTS]"}} + + expectedNotifications := []client.Notification{ + client.Connected{}, + client.Update{Path: []string{"COUNTERS", "Ethernet68"}, TS: time.Unix(0, 0), Val: "2"}, + client.Sync{}, + client.Update{Path: []string{"COUNTERS", "Ethernet68"}, TS: time.Unix(0, 0), Val: "3"}, + } + + updates := func() { + countersDB.HSet("COUNTERS:oid:0x1000000000039", "SAI_PORT_STAT_PFC_7_RX_PKTS", "3") + } + + // Querying for a specific field under BaseCounter is not yet implemented. + assertFailure := false + + doSubscribeTest(t, queryPaths, expectedNotifications, updates, assertFailure) + }) + + t.Run("Stream query for Interfaces/Port[name=Ethernet68]/Queue[name=Queue3]/Pfcwd, updating PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED from 0 to 1.", func(t *testing.T) { + queryPaths := []client.Path{{"Interfaces", "Port[name=Ethernet68]", "Queue[name=Queue3]", "Pfcwd"}} + + initialValue := make(map[string]string) + initialValue["PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED"] = "0" + initialValue["PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED"] = "0" + initialValue["PFC_WD_STATUS"] = "operational" + initialValue["SAI_PORT_STAT_PFC_3_RX_PKTS"] = "0" + + updatedValue := make(map[string]string) + updatedValue["PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED"] = "1" + + expectedNotifications := []client.Notification{ + client.Connected{}, + client.Update{Path: []string{"Interfaces", "Port", "Ethernet68", "Queue", "Queue3", "Pfcwd"}, TS: time.Unix(0, 0), Val: initialValue}, + client.Sync{}, + client.Update{Path: []string{"Interfaces", "Port", "Ethernet68", "Queue", "Queue3", "Pfcwd"}, 
TS: time.Unix(0, 0), Val: updatedValue}, + } + + updates := func() { + countersDB.HSet("COUNTERS:oid:0x1500000000091e", "PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED", "1") + } + + doSubscribeTest(t, queryPaths, expectedNotifications, updates, true) + }) + + t.Run("Stream query for Interfaces/Port[name=Ethernet68/1]/Queue[name=Queue3]/Pfcwd (use vendor alias), updating PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED from 0 to 1.", func(t *testing.T) { + queryPaths := []client.Path{{"Interfaces", "Port[name=Ethernet68/1]", "Queue[name=Queue3]", "Pfcwd"}} + + initialValue := make(map[string]string) + initialValue["PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED"] = "0" + initialValue["PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED"] = "0" + initialValue["PFC_WD_STATUS"] = "operational" + initialValue["SAI_PORT_STAT_PFC_3_RX_PKTS"] = "0" + + updatedValue := make(map[string]string) + updatedValue["PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED"] = "1" + + expectedNotifications := []client.Notification{ + client.Connected{}, + client.Update{Path: []string{"Interfaces", "Port", "Ethernet68/1", "Queue", "Queue3", "Pfcwd"}, TS: time.Unix(0, 0), Val: initialValue}, + client.Sync{}, + client.Update{Path: []string{"Interfaces", "Port", "Ethernet68/1", "Queue", "Queue3", "Pfcwd"}, TS: time.Unix(0, 0), Val: updatedValue}, + } + + updates := func() { + countersDB.HSet("COUNTERS:oid:0x1500000000091e", "PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED", "1") + } + + doSubscribeTest(t, queryPaths, expectedNotifications, updates, true) + }) + + t.Run("Stream query for Interfaces/Port[name=*]/..., updating with new test field (not implemented).", func(t *testing.T) { + queryPaths := []client.Path{{"Interfaces", "Port[name=*]", "..."}} + + updates := func() { + countersDB.HSet("COUNTERS:oid:0x1000000000039", "test_field", "test_value") + } + + // Querying for a new field is not yet implemented. 
+ assertFailure := false + + doSubscribeTest(t, queryPaths, []client.Notification{}, updates, assertFailure) + }) + + t.Run("Stream query for Interfaces/Port[name=*]/PfcCounter[field=SAI_PORT_STAT_PFC_7_RX_PKTS], updating SAI_PORT_STAT_PFC_7_RX_PKTS from 2 to 4 (not implemented).", func(t *testing.T) { + queryPaths := []client.Path{{"Interfaces", "Port[name=*]", "PfcCounter[field=SAI_PORT_STAT_PFC_7_RX_PKTS]"}} + + initialValue := make(map[string]string) + initialValue["SAI_PORT_STAT_PFC_7_RX_PKTS"] = "2" + + updatedValue := make(map[string]string) + updatedValue["SAI_PORT_STAT_PFC_7_RX_PKTS"] = "4" + + expectedNotifications := []client.Notification{ + client.Connected{}, + client.Update{Path: []string{"Interfaces", "Port", "Ethernet68", "PfcCounter", "SAI_PORT_STAT_PFC_7_RX_PKTS"}, TS: time.Unix(0, 0), Val: initialValue}, + client.Sync{}, + client.Update{Path: []string{"Interfaces", "Port", "Ethernet68", "PfcCounter", "SAI_PORT_STAT_PFC_7_RX_PKTS"}, TS: time.Unix(0, 0), Val: updatedValue}, + } + + updates := func() { + countersDB.HSet("COUNTERS:oid:0x1000000000039", "SAI_PORT_STAT_PFC_7_RX_PKTS", "4") + } + + // Querying for a specific field under PfcCounter is not yet implemented. 
+ assertFailure := false + + doSubscribeTest(t, queryPaths, expectedNotifications, updates, assertFailure) + }) + + t.Run("Stream query for Interfaces/Port[name=*]/Queue[name=*]/Pfcwd[field=PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED], updating PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED from 0 to 1.", func(t *testing.T) { + queryPaths := []client.Path{{"Interfaces", "Port[name=Ethernet68]", "Queue[name=Queue3]", "Pfcwd[field=PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED]"}} + + initialValue := make(map[string]string) + initialValue["PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED"] = "0" + + updatedValue := make(map[string]string) + updatedValue["PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED"] = "1" + + expectedNotifications := []client.Notification{ + client.Connected{}, + client.Update{Path: []string{"Interfaces", "Port", "Ethernet68", "Queue", "Queue3", "Pfcwd"}, TS: time.Unix(0, 0), Val: initialValue}, + client.Sync{}, + client.Update{Path: []string{"Interfaces", "Port", "Ethernet68", "Queue", "Queue3", "Pfcwd"}, TS: time.Unix(0, 0), Val: updatedValue}, + } + + updates := func() { + countersDB.HSet("COUNTERS:oid:0x1500000000091e", "PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED", "1") + } + + doSubscribeTest(t, queryPaths, expectedNotifications, updates, true) + }) +} + +func init() { + // Inform gNMI server to use redis tcp localhost connection + vdc.UseRedisLocalTcpPort = true +} diff --git a/virtual_database_client/handler_func.go b/virtual_database_client/handler_func.go new file mode 100644 index 000000000..9b1ced4e8 --- /dev/null +++ b/virtual_database_client/handler_func.go @@ -0,0 +1,70 @@ +package client + +import ( + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" +) + +type handlerFunc func(*gnmipb.Path, *map[*gnmipb.Path][]tablePath) error + +type pathHdlrFunc struct { + path []string + handler handlerFunc +} + +var ( + pathTrie *PathTrie + + // path2HdlrFuncTbl is used to populate trie tree which is used to + // map gNMI path to real database paths. 
+ path2HdlrFuncTbl = []pathHdlrFunc{ + { + // new virtual path for PFC WD stats + path: []string{"SONIC_DB", "Interfaces", "Port", "Queue", "Pfcwd"}, + handler: handlerFunc(v2rPortQueuePfcwdStats), + }, { + // new virtual path for Queue counters + path: []string{"SONIC_DB", "Interfaces", "Port", "Queue", "QueueCounter"}, + handler: handlerFunc(v2rPortQueueCounterStats), + }, { + // new virtual path for Port PFC counters + path: []string{"SONIC_DB", "Interfaces", "Port", "PfcCounter"}, + handler: handlerFunc(v2rPortPfcCounterStats), + }, { + // new virtual path for Port Base Counters + path: []string{"SONIC_DB", "Interfaces", "Port", "BaseCounter"}, + handler: handlerFunc(v2rPortBaseCounterStats), + }, + } +) + +func (t *PathTrie) TriePopulate() { + for _, pf := range path2HdlrFuncTbl { + n := t.Add(pf.path, pf.handler) + if n.meta.(handlerFunc) == nil { + log.V(1).Infof("Failed to add trie node for path %v with handler func %v", pf.path, pf.handler) + } else { + log.V(2).Infof("Add trie node for path %v with handler func %v", pf.path, pf.handler) + } + } +} + +func searchPathTrie(keys []string, path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath) error { + var nodes = []*PathNode{} + root := pathTrie.root + findPathNode(root, keys, &nodes) + + for _, node := range nodes { + handler := node.meta.(handlerFunc) + err := handler(path, pathG2S) + if err != nil { + return err + } + } + return nil +} + +func init() { + pathTrie = NewPathTrie() + pathTrie.TriePopulate() +} diff --git a/virtual_database_client/handler_pfc_counter.go b/virtual_database_client/handler_pfc_counter.go new file mode 100644 index 000000000..3f3aeb6be --- /dev/null +++ b/virtual_database_client/handler_pfc_counter.go @@ -0,0 +1,113 @@ +package client + +import ( + "fmt" + "strings" + + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" +) + +// Return template path +func GetTmpl_PortPfcCounterStats(path *gnmipb.Path) { + path.Elem = []*gnmipb.PathElem{} + + var name 
string + name = "Interfaces" + path.Elem = append(path.Elem, &gnmipb.PathElem{Name: name}) + + name = "Port" + path.Elem = append(path.Elem, &gnmipb.PathElem{ + Name: name, + Key: map[string]string{"name": "*"}, + }) + + name = "PfcCounter" + path.Elem = append(path.Elem, &gnmipb.PathElem{Name: name}) +} + +// gNMI paths are like +// [Interfaces Port[name=<port name>] PfcCounter] +func v2rPortPfcCounterStats(path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath) error { + var tmpl = gnmipb.Path{} + GetTmpl_PortPfcCounterStats(&tmpl) + + parentConfig := map[int]string{1: "Port"} + + leaf := leafConfig{ + idx: 2, + name: "PfcCounter", + } + + target_fields := []string{} + updatePath(path, &tmpl, parentConfig, leaf, &target_fields) + + // Populate tablePaths + err := pop_PortPfcCounterStats(&tmpl, pathG2S, target_fields) + if err != nil { + return err + } + return nil +} + +// Populate redis key and fields +func pop_PortPfcCounterStats(path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath, target_fields []string) error { + dbName := "COUNTERS_DB" + separator, _ := GetTableKeySeparator(dbName) + + elems := path.GetElem() + + // Populate port level + var idx_port = 1 + portName := elems[idx_port].GetKey()["name"] + if portName == "*" { + // Wildcard port name + for port, _ := range countersPortNameMap { + // Alias translation + var oport string + if alias, ok := name2aliasMap[port]; ok { + oport = alias + } else { + log.V(2).Infof("%v does not have a vendor alias", port) + oport = port + } + // Create a gNMI path for each port + var copyPath = gnmipb.Path{} + deepcopy(path, &copyPath) + copyPath.Elem[idx_port].Key["name"] = oport + err := pop_PortPfcCounterStats(&copyPath, pathG2S, target_fields) + if err != nil { + return err + } + } + return nil + } + + // Alias translation + var alias, _name string + alias = portName + _name = alias + if val, ok := alias2nameMap[alias]; ok { + _name = val + } + + oid_port, ok := countersPortNameMap[_name] + if !ok { + return fmt.Errorf("%v not a 
valid sonic interface. Vendor alias is %v", _name, alias) + } + + // TODO: Subscribe to only particular fields + if len(target_fields) > 0 { + return fmt.Errorf("Subscribe to field of Path: %v not supported", path) + } + + tblPath_port := tablePath{ + dbName: dbName, + keyName: strings.Join([]string{"COUNTERS", oid_port}, separator), + delimitor: separator, + patterns: []string{"SAI_PORT_STAT_PFC_._RX_PKTS$", "SAI_PORT_STAT_PFC_._TX_PKTS$"}, + } + + (*pathG2S)[path] = []tablePath{tblPath_port} + return nil +} diff --git a/virtual_database_client/handler_pfcwd.go b/virtual_database_client/handler_pfcwd.go new file mode 100644 index 000000000..ea772c2c4 --- /dev/null +++ b/virtual_database_client/handler_pfcwd.go @@ -0,0 +1,253 @@ +package client + +import ( + "fmt" + "strings" + + log "github.com/golang/glog" + gnmipb "github.com/openconfig/gnmi/proto/gnmi" +) + +func deepcopy(path, copyPath *gnmipb.Path) { + copyPath.Elem = []*gnmipb.PathElem{} + + for _, elem := range path.GetElem() { + var copyElem = gnmipb.PathElem{} + copyElem.Name = elem.GetName() + copyElem.Key = map[string]string{} + for k, v := range elem.Key { + copyElem.Key[k] = v + } + copyPath.Elem = append(copyPath.Elem, &copyElem) + } +} + +// contains tells whether array a contains x. +func contains(a []string, x string) bool { + for _, n := range a { + if x == n { + return true + } + } + return false +} + +type leafConfig struct { + idx int + name string +} + +func updatePath(path, tmpl *gnmipb.Path, parentConfig map[int]string, leaf leafConfig, target_fields *[]string) { + // Update parent node + for idx, name := range parentConfig { + for _, elem := range path.GetElem() { + if elem.GetName() == name { + (*tmpl).Elem[idx].Key["name"] = elem.Key["name"] + break + } + } + } + + // Update fields: if subscribe to particular fields + for _, elem := range path.GetElem() { + if elem.GetName() == leaf.name { + if fieldStr, ok := elem.GetKey()["field"]; ok { + (*target_fields) = strings.Split(fieldStr, ",") + } + } 
+ } +} + +// Return template path +func GetTmpl_PortQueuePfcwdStats(path *gnmipb.Path) { + path.Elem = []*gnmipb.PathElem{} + + var name string + name = "Interfaces" + path.Elem = append(path.Elem, &gnmipb.PathElem{Name: name}) + + name = "Port" + path.Elem = append(path.Elem, &gnmipb.PathElem{ + Name: name, + Key: map[string]string{"name": "*"}, + }) + + name = "Queue" + path.Elem = append(path.Elem, &gnmipb.PathElem{ + Name: name, + Key: map[string]string{"name": "*"}, + }) + + name = "Pfcwd" + path.Elem = append(path.Elem, &gnmipb.PathElem{Name: name}) +} + +// Translate gNMI path into a list of unique root-to-leaf vpaths +// then map each unique vpath to a list of real DB tablePaths +// gNMI paths are like +// [Inerfaces Port[name=] Queue[name=] Pfcwd] +func v2rPortQueuePfcwdStats(path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath) error { + var tmpl = gnmipb.Path{} + GetTmpl_PortQueuePfcwdStats(&tmpl) + + parentConfig := map[int]string{1: "Port", 2: "Queue"} + + leaf := leafConfig{ + idx: 3, + name: "Pfcwd", + } + + targetFields := []string{} + updatePath(path, &tmpl, parentConfig, leaf, &targetFields) + + // Populate tablePaths + err := pop_PortQueuePfcwdStats(&tmpl, pathG2S, targetFields) + if err != nil { + return err + } + return nil +} + +// Populate +func pop_PortQueuePfcwdStats(path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath, target_fields []string) error { + dbName := "COUNTERS_DB" + separator, _ := GetTableKeySeparator(dbName) + + elems := path.GetElem() + log.V(5).Infof("path: %v\n\n", path) + + // Populate port level + var idx_port = 1 + portName := elems[idx_port].GetKey()["name"] + if portName == "*" { + // wildcard port name + for port, _ := range countersPortNameMap { + // Alias translation + var oport string + if alias, ok := name2aliasMap[port]; ok { + oport = alias + } else { + log.V(2).Infof("%v does not have a vendor alias", port) + oport = port + } + // Create a gNMI path for each port + var copyPath = gnmipb.Path{} + 
deepcopy(path, &copyPath) + copyPath.Elem[idx_port].Key["name"] = oport + err := pop_PortQueuePfcwdStats(&copyPath, pathG2S, target_fields) + if err != nil { + return err + } + } + return nil + } + + // Alias translation + var alias, _name string + alias = portName + _name = alias + if val, ok := alias2nameMap[alias]; ok { + _name = val + } + + oid_port, ok := countersPortNameMap[_name] + if !ok { + return fmt.Errorf("%v not a valid sonic interface. Vendor alias is %v", _name, alias) + } + + // Populate queue level + var idx_que = 2 + queName := elems[idx_que].GetKey()["name"] + if queName == "*" { + // wildcard queue name + for pfcque, _ := range countersPfcwdNameMap[_name] { + // pfcque is in format of "Interface:12" + // Alias translation + stringSlice := strings.Split(pfcque, separator) + //new_queName := strings.Join([]string{"Queue", stringSlice[1]}, separator) + new_queName := "Queue" + stringSlice[1] + // Create a gNMI path for each PFC WD enabled queue + var copyPath = gnmipb.Path{} + deepcopy(path, &copyPath) + copyPath.Elem[idx_que].Key["name"] = new_queName + err := pop_PortQueuePfcwdStats(&copyPath, pathG2S, target_fields) + if err != nil { + return err + } + } + return nil + } + + // Alias translation + if !strings.HasPrefix(queName, "Queue") { + return fmt.Errorf("%v not a valid queue name. 
Use format 'Queue'", queName)
+	}
+	queNum := strings.TrimPrefix(queName, "Queue")
+	pfcque := strings.Join([]string{_name, queNum}, separator)
+	if _, ok := countersPfcwdNameMap[_name]; ok {
+		if oid_que, ok := countersPfcwdNameMap[_name][pfcque]; ok {
+			// PFC WD is enabled for port:queue
+			out_tblPaths := []tablePath{}
+			// Fields under the queue oid
+			full_fields := []string{
+				"PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED",
+				"PFC_WD_QUEUE_STATS_TX_DROPPED_PACKETS",
+				"PFC_WD_QUEUE_STATS_RX_DROPPED_PACKETS",
+				"PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED",
+				"PFC_WD_QUEUE_STATS_TX_PACKETS",
+				"PFC_WD_QUEUE_STATS_RX_PACKETS",
+				"PFC_WD_STATUS",
+			}
+
+			if len(target_fields) > 0 {
+				// Subscribe to only particular fields
+				key_target_fields := []string{}
+				for _, targetField := range target_fields {
+					if contains(full_fields, targetField) {
+						key_target_fields = append(key_target_fields, targetField)
+					}
+				}
+				full_fields = key_target_fields
+			}
+
+			if len(full_fields) > 0 {
+				tblPath_que := tablePath{
+					dbName:    dbName,
+					keyName:   strings.Join([]string{"COUNTERS", oid_que}, separator),
+					delimitor: separator,
+					fields:    full_fields,
+				}
+				out_tblPaths = append(out_tblPaths, tblPath_que)
+			}
+
+			// Fields under the port oid
+			full_fields = []string{fmt.Sprintf("SAI_PORT_STAT_PFC_%v_RX_PKTS", queNum)}
+
+			if len(target_fields) > 0 {
+				// Subscribe to only particular fields
+				key_target_fields := []string{}
+				for _, targetField := range target_fields {
+					if contains(full_fields, targetField) {
+						key_target_fields = append(key_target_fields, targetField)
+					}
+				}
+				full_fields = key_target_fields
+			}
+
+			if len(full_fields) > 0 {
+				tblPath_port := tablePath{
+					dbName:    dbName,
+					keyName:   strings.Join([]string{"COUNTERS", oid_port}, separator),
+					delimitor: separator,
+					fields:    full_fields,
+				}
+				out_tblPaths = append(out_tblPaths, tblPath_port)
+			}
+
+			if len(out_tblPaths) > 0 {
+				(*pathG2S)[path] = out_tblPaths
+			}
+		}
+	}
+	return nil
+}
diff --git 
a/virtual_database_client/handler_port_counter.go b/virtual_database_client/handler_port_counter.go
new file mode 100644
index 000000000..0ddedc6de
--- /dev/null
+++ b/virtual_database_client/handler_port_counter.go
@@ -0,0 +1,113 @@
+package client
+
+import (
+	"fmt"
+	"strings"
+
+	log "github.com/golang/glog"
+	gnmipb "github.com/openconfig/gnmi/proto/gnmi"
+)
+
+// Return template path
+func GetTmpl_PortBaseCounterStats(path *gnmipb.Path) {
+	path.Elem = []*gnmipb.PathElem{}
+
+	var name string
+	name = "Interfaces"
+	path.Elem = append(path.Elem, &gnmipb.PathElem{Name: name})
+
+	name = "Port"
+	path.Elem = append(path.Elem, &gnmipb.PathElem{
+		Name: name,
+		Key:  map[string]string{"name": "*"},
+	})
+
+	name = "BaseCounter"
+	path.Elem = append(path.Elem, &gnmipb.PathElem{Name: name})
+}
+
+// gNMI paths are like
+// [Interfaces Port[name=] BaseCounter]
+func v2rPortBaseCounterStats(path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath) error {
+	var tmpl = gnmipb.Path{}
+	GetTmpl_PortBaseCounterStats(&tmpl)
+
+	parentConfig := map[int]string{1: "Port"}
+
+	leaf := leafConfig{
+		idx:  2,
+		name: "BaseCounter",
+	}
+
+	target_fields := []string{}
+	updatePath(path, &tmpl, parentConfig, leaf, &target_fields)
+
+	// Populate tablePaths
+	err := pop_PortBaseCounterStats(&tmpl, pathG2S, target_fields)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Populate redis key and fields
+func pop_PortBaseCounterStats(path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath, target_fields []string) error {
+	dbName := "COUNTERS_DB"
+	separator, _ := GetTableKeySeparator(dbName)
+
+	elems := path.GetElem()
+
+	// Populate port level
+	var idx_port = 1
+	portName := elems[idx_port].GetKey()["name"]
+	if portName == "*" {
+		// Wildcard port name
+		for port, _ := range countersPortNameMap {
+			// Alias translation
+			var oport string
+			if alias, ok := name2aliasMap[port]; ok {
+				oport = alias
+			} else {
+				log.V(2).Infof("%v does not have a vendor alias", port)
+				oport = 
port
+			}
+			// Create a gNMI path for each port
+			var copyPath = gnmipb.Path{}
+			deepcopy(path, &copyPath)
+			copyPath.Elem[idx_port].Key["name"] = oport
+			err := pop_PortBaseCounterStats(&copyPath, pathG2S, target_fields)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
+	// Alias translation
+	var alias, _name string
+	alias = portName
+	_name = alias
+	if val, ok := alias2nameMap[alias]; ok {
+		_name = val
+	}
+
+	oid_port, ok := countersPortNameMap[_name]
+	if !ok {
+		log.V(1).Infof("RANDY: 3")
+		return fmt.Errorf("%v not a valid sonic interface. Vendor alias is %v", _name, alias)
+	}
+
+	// TODO: Subscribe to only particular fields
+	if len(target_fields) > 0 {
+		return fmt.Errorf("Subscribe to field of Path: %v not supported", path)
+	}
+	tblPath_port := tablePath{
+		dbName:    dbName,
+		keyName:   strings.Join([]string{"COUNTERS", oid_port}, separator),
+		delimitor: separator,
+		patterns:  []string{"SAI_PORT_STAT_I.*", "SAI_PORT_STAT_ETHER.*"},
+	}
+
+	(*pathG2S)[path] = []tablePath{tblPath_port}
+	return nil
+}
diff --git a/virtual_database_client/handler_queue_counter.go b/virtual_database_client/handler_queue_counter.go
new file mode 100644
index 000000000..dd14ae9df
--- /dev/null
+++ b/virtual_database_client/handler_queue_counter.go
@@ -0,0 +1,155 @@
+package client
+
+import (
+	"fmt"
+	"strings"
+
+	log "github.com/golang/glog"
+	gnmipb "github.com/openconfig/gnmi/proto/gnmi"
+)
+
+// Return template path
+func GetTmpl_PortQueueCounterStats(path *gnmipb.Path) {
+	path.Elem = []*gnmipb.PathElem{}
+
+	var name string
+	name = "Interfaces"
+	path.Elem = append(path.Elem, &gnmipb.PathElem{Name: name})
+
+	name = "Port"
+	path.Elem = append(path.Elem, &gnmipb.PathElem{
+		Name: name,
+		Key:  map[string]string{"name": "*"},
+	})
+
+	name = "Queue"
+	path.Elem = append(path.Elem, &gnmipb.PathElem{
+		Name: name,
+		Key:  map[string]string{"name": "*"},
+	})
+
+	name = "QueueCounter"
+	path.Elem = append(path.Elem, &gnmipb.PathElem{Name: name})
+}
+
+// gNMI paths are 
like
+// [Interfaces Port[name=] Queue[name=] QueueCounter]
+func v2rPortQueueCounterStats(path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath) error {
+	var tmpl = gnmipb.Path{}
+	GetTmpl_PortQueueCounterStats(&tmpl)
+
+	parentConfig := map[int]string{1: "Port", 2: "Queue"}
+
+	leaf := leafConfig{
+		idx:  3,
+		name: "QueueCounter",
+	}
+
+	target_fields := []string{}
+	updatePath(path, &tmpl, parentConfig, leaf, &target_fields)
+
+	// Populate tablePaths
+	err := pop_PortQueueCounterStats(&tmpl, pathG2S, target_fields)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Populate redis key and fields
+func pop_PortQueueCounterStats(path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath, target_fields []string) error {
+	dbName := "COUNTERS_DB"
+	separator, _ := GetTableKeySeparator(dbName)
+
+	elems := path.GetElem()
+
+	// Populate port level
+	var idx_port = 1
+	portName := elems[idx_port].GetKey()["name"]
+	if portName == "*" {
+		// wildcard port name
+		for port, _ := range countersPortNameMap {
+			// Alias translation
+			var oport string
+			if alias, ok := name2aliasMap[port]; ok {
+				oport = alias
+			} else {
+				log.V(2).Infof("%v does not have a vendor alias", port)
+				oport = port
+			}
+			// Create a gNMI path for each port
+			var copyPath = gnmipb.Path{}
+			deepcopy(path, &copyPath)
+			copyPath.Elem[idx_port].Key["name"] = oport
+			err := pop_PortQueueCounterStats(&copyPath, pathG2S, target_fields)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
+	// Alias translation
+	var alias, _name string
+	alias = portName
+	_name = alias
+	if val, ok := alias2nameMap[alias]; ok {
+		_name = val
+	}
+
+	_, ok := countersQueueNameMap[_name]
+	if !ok {
+		log.V(1).Infof("RANDY: 4")
+		return fmt.Errorf("%v not a valid sonic interface. 
Vendor alias is %v", _name, alias)
+	}
+
+	// Populate queue level
+	var idx_que = 2
+	queName := elems[idx_que].GetKey()["name"]
+	if queName == "*" {
+		// wildcard queue name
+		for que, _ := range countersQueueNameMap[_name] {
+			// que is in format of "Ethernet68:12"
+			stringSlice := strings.Split(que, separator)
+			new_queName := "Queue" + stringSlice[1]
+			// Create a gNMI path for each queue
+			var copyPath = gnmipb.Path{}
+			deepcopy(path, &copyPath)
+			copyPath.Elem[idx_que].Key["name"] = new_queName
+			err := pop_PortQueueCounterStats(&copyPath, pathG2S, target_fields)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
+	// Alias translation
+	if !strings.HasPrefix(queName, "Queue") {
+		return fmt.Errorf("%v not a valid queue name in request. Use format 'Queue'", queName)
+	}
+	queNum := strings.TrimPrefix(queName, "Queue")
+	que := strings.Join([]string{_name, queNum}, separator)
+	oid_que, ok := countersQueueNameMap[_name][que]
+	if !ok {
+		return fmt.Errorf("%v not a valid queue name in redis db.", que)
+	}
+
+	// TODO: subscribe to only particular fields
+	if len(target_fields) > 0 {
+		return fmt.Errorf("Subscribe to particular field of path %v not supported", path)
+	}
+
+	tblPath_que := tablePath{
+		dbName:    dbName,
+		keyName:   strings.Join([]string{"COUNTERS", oid_que}, separator),
+		delimitor: separator,
+		fields: []string{
+			"SAI_QUEUE_STAT_PACKETS",
+			"SAI_QUEUE_STAT_BYTES",
+			"SAI_QUEUE_STAT_DROPPED_PACKETS",
+			"SAI_QUEUE_STAT_DROPPED_BYTES"},
+	}
+	(*pathG2S)[path] = []tablePath{tblPath_que}
+	return nil
+}
diff --git a/virtual_database_client/map_init.go b/virtual_database_client/map_init.go
new file mode 100644
index 000000000..cb98facfa
--- /dev/null
+++ b/virtual_database_client/map_init.go
@@ -0,0 +1,218 @@
+package client
+
+import (
+	"fmt"
+	"strings"
+
+	log "github.com/golang/glog"
+)
+
+var (
+	// Port name to oid map in COUNTERS table of COUNTERS_DB
+	countersPortNameMap = make(map[string]string)
+
+	// Queue name to oid map in COUNTERS table 
of COUNTERS_DB + countersQueueNameMap = make(map[string]map[string]string) + + // Alias translation: from vendor port name to sonic interface name + alias2nameMap = make(map[string]string) + // Alias translation: from sonic interface name to vendor port name + name2aliasMap = make(map[string]string) + + // SONiC interface name to their PFC-WD enabled queues, then to oid map + countersPfcwdNameMap = make(map[string]map[string]string) +) + +func initCountersQueueNameMap() error { + dbName := "COUNTERS_DB" + separator, _ := GetTableKeySeparator(dbName) + + if len(countersQueueNameMap) == 0 { + queueMap, err := getCountersMap("COUNTERS_QUEUE_NAME_MAP") + if err != nil { + return err + } + for k, v := range queueMap { + stringSlice := strings.Split(k, separator) + port := stringSlice[0] + if _, ok := countersQueueNameMap[port]; !ok { + countersQueueNameMap[port] = make(map[string]string) + } + countersQueueNameMap[port][k] = v + } + } + return nil +} + +func initCountersPortNameMap() error { + var err error + if len(countersPortNameMap) == 0 { + countersPortNameMap, err = getCountersMap("COUNTERS_PORT_NAME_MAP") + if err != nil { + return err + } + } + return nil +} + +func initAliasMap() error { + var err error + if len(alias2nameMap) == 0 { + alias2nameMap, name2aliasMap, err = getAliasMap() + if err != nil { + return err + } + } + return nil +} + +func initCountersPfcwdNameMap() error { + var err error + if len(countersPfcwdNameMap) == 0 { + countersPfcwdNameMap, err = getPfcwdMap() + if err != nil { + return err + } + } + return nil +} + +// Get the mapping between sonic interface name and oids of their PFC-WD enabled queues in COUNTERS_DB +func getPfcwdMap() (map[string]map[string]string, error) { + var pfcwdName_map = make(map[string]map[string]string) + + dbName := "CONFIG_DB" + separator, _ := GetTableKeySeparator(dbName) + redisDb, _ := Target2RedisDb[dbName] + _, err := redisDb.Ping().Result() + if err != nil { + log.V(1).Infof("Can not connect to %v, err: 
%v", dbName, err) + return nil, err + } + + keyName := fmt.Sprintf("PFC_WD_TABLE%v*", separator) + resp, err := redisDb.Keys(keyName).Result() + if err != nil { + log.V(1).Infof("redis get keys failed for %v, key = %v, err: %v", dbName, keyName, err) + return nil, err + } + + if len(resp) == 0 { + // PFC WD service not enabled on device + log.V(1).Infof("PFC WD not enabled on device") + return nil, nil + } + + for _, key := range resp { + name := key[13:] + pfcwdName_map[name] = make(map[string]string) + } + + // Get Queue indexes that are enabled with PFC-WD + keyName = "PORT_QOS_MAP*" + resp, err = redisDb.Keys(keyName).Result() + if err != nil { + log.V(1).Infof("redis get keys failed for %v, key = %v, err: %v", dbName, keyName, err) + return nil, err + } + if len(resp) == 0 { + log.V(1).Infof("PFC WD not enabled on device") + return nil, nil + } + qos_key := resp[0] + + fieldName := "pfc_enable" + priorities, err := redisDb.HGet(qos_key, fieldName).Result() + if err != nil { + log.V(1).Infof("redis get field failed for %v, key = %v, field = %v, err: %v", dbName, qos_key, fieldName, err) + return nil, err + } + + keyName = fmt.Sprintf("MAP_PFC_PRIORITY_TO_QUEUE%vAZURE", separator) + pfc_queue_map, err := redisDb.HGetAll(keyName).Result() + if err != nil { + log.V(1).Infof("redis get fields failed for %v, key = %v, err: %v", dbName, keyName, err) + return nil, err + } + + var indices []string + for _, p := range strings.Split(priorities, ",") { + _, ok := pfc_queue_map[p] + if !ok { + log.V(1).Infof("Missing mapping between PFC priority %v to queue", p) + } else { + indices = append(indices, pfc_queue_map[p]) + } + } + + if len(countersQueueNameMap) == 0 { + log.V(1).Infof("COUNTERS_QUEUE_NAME_MAP is empty") + return nil, nil + } + + var queue_key string + queue_separator, _ := GetTableKeySeparator("COUNTERS_DB") + for port, _ := range pfcwdName_map { + for _, indice := range indices { + queue_key = port + queue_separator + indice + oid, ok := 
countersQueueNameMap[port][queue_key] + if !ok { + return nil, fmt.Errorf("key %v not exists in COUNTERS_QUEUE_NAME_MAP", queue_key) + } + pfcwdName_map[port][queue_key] = oid + } + } + + log.V(6).Infof("countersPfcwdNameMap: %v", pfcwdName_map) + return pfcwdName_map, nil +} + +// Get the mapping between sonic interface name and vendor alias +func getAliasMap() (map[string]string, map[string]string, error) { + var alias2name_map = make(map[string]string) + var name2alias_map = make(map[string]string) + + dbName := "CONFIG_DB" + separator, _ := GetTableKeySeparator(dbName) + redisDb, _ := Target2RedisDb[dbName] + _, err := redisDb.Ping().Result() + if err != nil { + log.V(1).Infof("Can not connect to %v, err: %v", dbName, err) + return nil, nil, err + } + + keyName := fmt.Sprintf("PORT%v*", separator) + resp, err := redisDb.Keys(keyName).Result() + if err != nil { + log.V(1).Infof("redis get keys failed for %v, key = %v, err: %v", dbName, keyName, err) + return nil, nil, err + } + for _, key := range resp { + alias, err := redisDb.HGet(key, "alias").Result() + if err != nil { + log.V(1).Infof("redis get field alias failed for %v, key = %v, err: %v", dbName, key, err) + // clear aliasMap + alias2name_map = make(map[string]string) + name2alias_map = make(map[string]string) + return nil, nil, err + } + alias2name_map[alias] = key[5:] + name2alias_map[key[5:]] = alias + } + log.V(6).Infof("alias2nameMap: %v", alias2name_map) + log.V(6).Infof("name2aliasMap: %v", name2alias_map) + return alias2name_map, name2alias_map, nil +} + +// Get the mapping between objects in counters DB, Ex. port name to oid in "COUNTERS_PORT_NAME_MAP" table. 
+// Assuming static port name to oid map in COUNTERS table
+func getCountersMap(tableName string) (map[string]string, error) {
+	redisDb, _ := Target2RedisDb["COUNTERS_DB"]
+	fv, err := redisDb.HGetAll(tableName).Result()
+	if err != nil {
+		log.V(2).Infof("redis HGetAll failed for COUNTERS_DB, tableName: %s", tableName)
+		return nil, err
+	}
+	log.V(6).Infof("tableName: %s, map %v", tableName, fv)
+	return fv, nil
+}
diff --git a/virtual_database_client/path.go b/virtual_database_client/path.go
new file mode 100644
index 000000000..3ab23a9b8
--- /dev/null
+++ b/virtual_database_client/path.go
@@ -0,0 +1,316 @@
+package client
+
+import (
+	"fmt"
+	"strconv"
+
+	"encoding/json"
+	"net"
+	"regexp"
+	"time"
+
+	spb "github.com/Azure/sonic-telemetry/proto"
+	"github.com/go-redis/redis"
+	log "github.com/golang/glog"
+	gnmipb "github.com/openconfig/gnmi/proto/gnmi"
+)
+
+// gnmiFullPath builds the full path from the prefix and path.
+func gnmiFullPath(prefix, path *gnmipb.Path) *gnmipb.Path {
+
+	fullPath := &gnmipb.Path{Origin: path.Origin}
+	if path.GetElement() != nil {
+		fullPath.Element = append(prefix.GetElement(), path.GetElement()...)
+	}
+	if path.GetElem() != nil {
+		fullPath.Elem = append(prefix.GetElem(), path.GetElem()...)
+	}
+	return fullPath
+}
+
+func populateAlltablePaths(prefix *gnmipb.Path, paths []*gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath) error {
+	for _, path := range paths {
+		err := populateNewtablePath(prefix, path, pathG2S)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// First translate gNMI paths to a list of unique
+// root-to-leaf virtual paths in the vpath tree.
+// Then map each vpath to a list of redis DB path. 
+func populateNewtablePath(prefix, path *gnmipb.Path, pathG2S *map[*gnmipb.Path][]tablePath) error { + target := prefix.GetTarget() + stringSlice := []string{target} + elems := path.GetElem() + for _, elem := range elems { + stringSlice = append(stringSlice, elem.GetName()) + } + + err := searchPathTrie(stringSlice, path, pathG2S) + if err != nil { + return err + } + + return nil +} + +func msi2TypedValue(msi map[string]interface{}) (*gnmipb.TypedValue, error) { + jv, err := emitJSON(&msi) + if err != nil { + log.V(2).Infof("emitJSON err %s for %v", err, msi) + return nil, fmt.Errorf("emitJSON err %s for %v", err, msi) + } + return &gnmipb.TypedValue{ + Value: &gnmipb.TypedValue_JsonIetfVal{ + JsonIetfVal: jv, + }}, nil +} + +// emitJSON marshalls map[string]interface{} to JSON byte stream. +func emitJSON(v *map[string]interface{}) ([]byte, error) { + //j, err := json.MarshalIndent(*v, "", indentString) + j, err := json.Marshal(*v) + if err != nil { + return nil, fmt.Errorf("JSON marshalling error: %v", err) + } + + return j, nil +} + +// Render the redis DB data to map[string]interface{} +// which may be marshaled to JSON format +// tablePath includes [dbName, keyName, fields] +func tableData2TypedValue(tblPaths []tablePath) (*gnmipb.TypedValue, error) { + msi := make(map[string]interface{}) + + for _, tblPath := range tblPaths { + err := tableData2Msi(&tblPath, &msi) + if err != nil { + return nil, err + } + } + return msi2TypedValue(msi) +} + +func tableData2Msi(tblPath *tablePath, msi *map[string]interface{}) error { + dbName := tblPath.dbName + keyName := tblPath.keyName + + redisDb := Target2RedisDb[dbName] + val, err := redisDb.HGetAll(keyName).Result() + if err != nil { + log.V(3).Infof("redis HGetAll failed for %v, dbName = %v, keyName=%v", tblPath, dbName, keyName) + return err + } + + patterns := tblPath.patterns + for field, value := range val { + for _, pattern := range patterns { + r := regexp.MustCompile(pattern) + if r.MatchString(field) { + 
(*msi)[field] = value + break + } + } + } + + fields := tblPath.fields + for _, field := range fields { + if value, ok := val[field]; !ok { + log.V(1).Infof("Missing field: %v", field) + } else { + (*msi)[field] = value + } + } + return nil +} + +func enqueFatalMsg(c *DbClient, msg string) { + c.q.Put(Value{ + &spb.Value{ + Timestamp: time.Now().UnixNano(), + Fatal: msg, + }, + }) +} + +type redisSubData struct { + tblPath tablePath + pubsub *redis.PubSub +} + +func dbSingleTableKeySubscribe(rsd redisSubData, c *DbClient, msiInit *map[string]interface{}, msiOut *map[string]interface{}) { + tblPath := rsd.tblPath + pubsub := rsd.pubsub + msi := make(map[string]interface{}) + // Initialize msi + for k, v := range *msiInit { + msi[k] = v + } + + for { + select { + default: + msgi, err := pubsub.ReceiveTimeout(time.Millisecond * 500) + if err != nil { + neterr, ok := err.(net.Error) + if ok { + if neterr.Timeout() == true { + continue + } + } + log.V(2).Infof("pubsub.ReceiveTimeout err %v", err) + continue + } + newMsi := make(map[string]interface{}) + subscr := msgi.(*redis.Message) + + if subscr.Payload == "hset" || subscr.Payload == "hsetnx" || subscr.Payload == "hmset" { + err = tableData2Msi(&tblPath, &newMsi) + if err != nil { + enqueFatalMsg(c, err.Error()) + return + } + + c.mu.Lock() + for k, v := range newMsi { + _, ok := msi[k] + if !ok { + (*msiOut)[k] = v + msi[k] = v + } else { + if v != msi[k] { + (*msiOut)[k] = v + msi[k] = v + } + } + } + c.mu.Unlock() + } + case <-c.channel: + log.V(2).Infof("Stopping dbSingleTableKeySubscribe routine for %+v", tblPath) + return + } + } +} + +// Subscribe to a specific gNMI path +func dbPathSubscribe(gnmiPath *gnmipb.Path, tblPaths []tablePath, c *DbClient) { + //tblPaths := c.pathG2S[gnmiPath] + msi := make(map[string]interface{}) + + for _, tblPath := range tblPaths { + err := tableData2Msi(&tblPath, &msi) + if err != nil { + enqueFatalMsg(c, err.Error()) + return + } + } + val, err := msi2TypedValue(msi) + if err 
!= nil { + enqueFatalMsg(c, err.Error()) + return + } + + var spbv *spb.Value + spbv = &spb.Value{ + Prefix: c.prefix, + Path: gnmiPath, + Timestamp: time.Now().UnixNano(), + Val: val, + } + if err = c.q.Put(Value{spbv}); err != nil { + log.V(1).Infof("Queue error: %v", err) + return + } + + // First sync for this key is done + c.synced.Done() + + msiInit := make(map[string]interface{}) + for k, v := range msi { + msiInit[k] = v + } + for k := range msi { + delete(msi, k) + } + + // Redis pubsub to monitor table keys + for _, tblPath := range tblPaths { + dbName := tblPath.dbName + keyName := tblPath.keyName + redisDb := Target2RedisDb[dbName] + + // Subscribe to keyspace notification + pattern := "__keyspace@" + strconv.Itoa(int(spb.Target_value[dbName])) + "__:" + pattern += keyName + pubsub := redisDb.PSubscribe(pattern) + defer pubsub.Close() + + msgi, err := pubsub.ReceiveTimeout(time.Second) + if err != nil { + log.V(1).Infof("psubscribe to %s failed for %v", pattern, tblPath) + enqueFatalMsg(c, fmt.Sprintf("psubscribe to %s failed for %v", pattern, tblPath)) + return + } + + subscr := msgi.(*redis.Subscription) + if subscr.Channel != pattern { + log.V(1).Infof("psubscribe to %s failed for %v", pattern, tblPath) + enqueFatalMsg(c, fmt.Sprintf("psubscribe to %s failed for %v", pattern, tblPath)) + return + } + log.V(2).Infof("Psubscribe succeeded for %v: %v", tblPath, subscr) + + rsd := redisSubData{ + tblPath: tblPath, + pubsub: pubsub, + } + go dbSingleTableKeySubscribe(rsd, c, &msiInit, &msi) + } + + for { + select { + default: + val = nil + err = nil + c.mu.Lock() + if len(msi) > 0 { + val, err = msi2TypedValue(msi) + for k := range msi { + delete(msi, k) + } + } + c.mu.Unlock() + + if err != nil { + enqueFatalMsg(c, err.Error()) + return + } + + if val != nil { + spbv = &spb.Value{ + Path: gnmiPath, + Timestamp: time.Now().UnixNano(), + Val: val, + } + + log.V(5).Infof("dbTableKeySubscribe enque: %v", spbv) + if err = c.q.Put(Value{spbv}); err != nil { + 
log.V(1).Infof("Queue error: %v", err) + return + } + } + + // check possible value change every 100 millisecond + // TODO: make all the instances of wait timer consistent + time.Sleep(time.Millisecond * 100) + case <-c.channel: + log.V(1).Infof("Stopping dbTableKeySubscribe routine for %v ", gnmiPath) + return + } + } +} diff --git a/virtual_database_client/trie.go b/virtual_database_client/trie.go new file mode 100644 index 000000000..694dcf311 --- /dev/null +++ b/virtual_database_client/trie.go @@ -0,0 +1,109 @@ +package client + +type PathNode struct { + val string + depth int + term bool + children map[string]*PathNode + parent *PathNode + meta interface{} +} + +type PathTrie struct { + root *PathNode + size int +} + +func NewPathTrie() *PathTrie { + return &PathTrie{ + root: &PathNode{children: make(map[string]*PathNode), depth: 0}, + size: 0, + } +} + +// Adds an entry to the Trie, including meta data. Meta data +// is stored as 'interface{}' and must be type cast by the caller +func (t *PathTrie) Add(keys []string, meta interface{}) *PathNode { + node := t.root + for _, key := range keys { + if _, ok := node.children[key]; !ok { + // A new PathNode + newNode := PathNode{ + val: key, + depth: node.depth + 1, + term: false, + children: make(map[string]*PathNode), + parent: node, + meta: meta, + } + node.children[key] = &newNode + } + node = node.children[key] + } + node.meta = meta + node.term = true + return node +} + +// Returns the parent of this node +func (n PathNode) Parent() *PathNode { + return n.parent +} + +func (n PathNode) Meta() interface{} { + return n.meta +} + +func (n PathNode) Terminating() bool { + return n.term +} + +func (n PathNode) Val() string { + return n.val +} + +func (n PathNode) Children() map[string]*PathNode { + return n.children +} + +func findPathNode(node *PathNode, keys []string, result *[]*PathNode) { + // gNMI path convention: + // '...' 
is multi-level wildcard, and '*' is single-level wildcard + + if len(keys) == 0 { + if node.Terminating() == true { + *result = append(*result, node) + } + return + } + + key := keys[0] + if node.Terminating() == true { + // Leaf node + if (len(keys) == 1) && (key == "...") { + *result = append(*result, node) + } + return + } + + children := node.Children() + if key == "..." { + if len(keys) > 1 { + if child, ok := children[keys[1]]; ok { + findPathNode(child, keys[2:], result) + } + } + for _, child := range children { + findPathNode(child, keys, result) + } + } else if key == "*" { + for _, child := range children { + findPathNode(child, keys[1:], result) + } + } else { + if child, ok := children[key]; ok { + findPathNode(child, keys[1:], result) + } + } + return +}