Skip to content

Commit

Permalink
Merge pull request #39 from colinmcintosh/cm/zk-fixes
Browse files Browse the repository at this point in the history
Bug fixes for zookeeper and locking behavior
  • Loading branch information
colinmcintosh authored Sep 1, 2021
2 parents 002a9b0 + 95fa73f commit 3e8feb8
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 6 deletions.
18 changes: 15 additions & 3 deletions gateway/connections/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (t *ConnectionState) doConnect() {
var ctx context.Context
ctx, t.clientCancel = context.WithCancel(context.Background())
t.config.Log.Info().Msgf("Target %s: Subscribing", t.name)
t.client = client.Reconnect(&client.BaseClient{}, t.disconnected, nil)
t.client = client.Reconnect(&client.BaseClient{}, t.disconnected, t.reset)
if err := t.client.Subscribe(ctx, query, gnmiclient.Type); err != nil {
t.config.Log.Info().Msgf("Target %s: Subscribe stopped: %v", t.name, err)
}
Expand Down Expand Up @@ -240,7 +240,11 @@ func (t *ConnectionState) connectWithLock(connectionSlot *semaphore.Weighted) {
}
if connectionSlotAcquired {
if !t.ConnectionLockAcquired {
t.ConnectionLockAcquired, _ = t.lock.Try()
var err error
t.ConnectionLockAcquired, err = t.lock.Try()
if err != nil {
t.config.Log.Error().Msgf("error while trying to acquire lock: %v", err)
}
}
if t.ConnectionLockAcquired {
t.config.Log.Info().Msgf("Target %s: Lock acquired", t.name)
Expand All @@ -251,6 +255,8 @@ func (t *ConnectionState) connectWithLock(connectionSlot *semaphore.Weighted) {
}
t.ConnectionLockAcquired = false
t.config.Log.Info().Msgf("Target %s: Lock released", t.name)
} else {
time.Sleep(1 * time.Second)
}
}
}
Expand All @@ -266,6 +272,11 @@ func (t *ConnectionState) disconnect() error {
return t.client.Close() // this will disconnect and reset the cache via the disconnect callback
}

// reset is the callback for gNMI client to signal that it will reconnect.
func (t *ConnectionState) reset() {
t.config.Log.Info().Msgf("Target %s: gNMI client will reconnect", t.name)
}

// Callback for gNMI client to signal that it has disconnected.
func (t *ConnectionState) disconnected() {
t.connected = false
Expand All @@ -287,7 +298,8 @@ func (t *ConnectionState) reconnect() error {
func (t *ConnectionState) unlock() error {
t.config.Log.Info().Msgf("Target %s: Unlocking", t.name)
t.clientCancel()
return t.client.Close()
return nil
//return t.client.Close()
}

// handleUpdate parses a protobuf message received from the targetCache. This implementation handles only
Expand Down
3 changes: 2 additions & 1 deletion gateway/connections/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func (c *ZookeeperConnectionManager) eventListener(zkEvents <-chan zk.Event) {
c.connectionsMutex.Lock()
for _, targetConfig := range c.connections {
if targetConfig.useLock {
_ = targetConfig.unlock()
err := targetConfig.unlock()
c.config.Log.Error().Msgf("error while unlocking target: %v", err)
}
}
c.connectionsMutex.Unlock()
Expand Down
10 changes: 8 additions & 2 deletions gateway/locking/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (l *ZookeeperNonBlockingLock) ID() string {
}

func parseSeq(path string) (int, error) {
parts := strings.Split(path, "-")
parts := strings.Split(path, ":")
return strconv.Atoi(parts[len(parts)-1])
}

Expand All @@ -127,7 +127,7 @@ func (l *ZookeeperNonBlockingLock) try() (bool, error) {
return true, zk.ErrDeadlock
}

prefix := fmt.Sprintf("%s/lock-", l.id)
prefix := fmt.Sprintf("%s/lock:", l.id)

// Attempt to add a sequence to the tree
path, err := l.conn.CreateProtectedEphemeralSequential(prefix, []byte(l.member), l.acl)
Expand All @@ -150,11 +150,17 @@ func (l *ZookeeperNonBlockingLock) try() (bool, error) {

seq, err := parseSeq(path)
if err != nil {
if delErr := l.conn.Delete(path, -1); delErr != nil {
return false, fmt.Errorf("unable to remove lock path during seq parse error: %v: %v", delErr, err)
}
return false, err
}

lowestSeq, _, err := l.lowestSeqChild(l.id)
if err != nil {
if delErr := l.conn.Delete(path, -1); delErr != nil {
return false, fmt.Errorf("unable to remove lock path during lowest seq error: %v: %v", delErr, err)
}
return false, err
}

Expand Down
3 changes: 3 additions & 0 deletions gateway/openconfig/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func getTypeByPath(entry *yang.Entry, path []string) string {
child, exists := entry.Dir[path[0]]
if exists {
if len(path) == 1 {
if child.Type == nil {
return ""
}
return child.Type.Name
}
return getTypeByPath(entry.Dir[path[0]], path[1:])
Expand Down
33 changes: 33 additions & 0 deletions gateway/openconfig/types_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// +build integration

// Copyright 2020 Netflix Inc
// Author: Colin McIntosh ([email protected])
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package openconfig

import (
"github.com/stretchr/testify/require"
"testing"
)

func TestTypeLookup_GetTypeByPath(t *testing.T) {
lookup := new(TypeLookup)
err := lookup.LoadAllModules("../../oc-models/")
require.NoError(t, err)

require.Equal(t, "counter64", lookup.GetTypeByPath([]string{"interfaces", "interface", "state", "counters", "out-octets"}))
require.Empty(t, lookup.GetTypeByPath([]string{"system", "memory", "state"}))
require.Empty(t, lookup.GetTypeByPath([]string{"blah", "blah", "state"}))
}

0 comments on commit 3e8feb8

Please sign in to comment.