Skip to content

Commit

Permalink
Improvements to the connection state locking procedure to help ensure…
Browse files Browse the repository at this point in the history
… that locks are cleaned up when released or disconnected.
  • Loading branch information
colinmcintosh committed Sep 2, 2021
1 parent 95fa73f commit 2950e6c
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 22 deletions.
15 changes: 11 additions & 4 deletions gateway/connections/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/go-zookeeper/zk"
"github.com/openconfig/gnmi/errlist"
"sync"
"time"
Expand Down Expand Up @@ -236,22 +237,27 @@ func (t *ConnectionState) connectWithLock(connectionSlot *semaphore.Weighted) {
var connectionSlotAcquired = false
for !t.stopped {
if !connectionSlotAcquired {
t.config.Log.Info().Msgf("Target %s: Acquiring connection slot", t.name)
connectionSlotAcquired = connectionSlot.TryAcquire(1)
}
if connectionSlotAcquired {
if !t.ConnectionLockAcquired {
t.config.Log.Info().Msgf("Target %s: Acquiring lock", t.name)
var err error
t.ConnectionLockAcquired, err = t.lock.Try()
if err != nil {
t.config.Log.Error().Msgf("error while trying to acquire lock: %v", err)
t.config.Log.Error().Msgf("Target %s: error while trying to acquire lock: %v", t.name, err)
time.Sleep(2 * time.Second)
}
}
if t.ConnectionLockAcquired {
t.config.Log.Info().Msgf("Target %s: Lock acquired", t.name)
t.doConnect()
err := t.lock.Unlock()
if err != nil {
t.config.Log.Warn().Err(err).Msgf("Target %s: error while releasing lock: %v", t.name, err)
if t.lock.LockAcquired() {
err := t.lock.Unlock()
if err != nil && err != zk.ErrNotLocked {
t.config.Log.Error().Msgf("Target %s: error while releasing lock: %v", t.name, err)
}
}
t.ConnectionLockAcquired = false
t.config.Log.Info().Msgf("Target %s: Lock released", t.name)
Expand All @@ -260,6 +266,7 @@ func (t *ConnectionState) connectWithLock(connectionSlot *semaphore.Weighted) {
}
}
}
t.config.Log.Info().Msgf("Target %s: Stopped", t.name)
if connectionSlotAcquired {
connectionSlot.Release(1)
}
Expand Down
4 changes: 3 additions & 1 deletion gateway/connections/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func (c *ZookeeperConnectionManager) eventListener(zkEvents <-chan zk.Event) {
for _, targetConfig := range c.connections {
if targetConfig.useLock {
err := targetConfig.unlock()
c.config.Log.Error().Msgf("error while unlocking target: %v", err)
if err != nil {
c.config.Log.Error().Msgf("error while unlocking target: %v", err)
}
}
}
c.connectionsMutex.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (g *Gateway) zookeeperEventHandler(zkEventChan <-chan zk.Event) {
switch event.State {
case zk.StateDisconnected:
disconnectedCount++
if disconnectedCount > 5 {
if disconnectedCount > 50 {
panic("too many Zookeeper disconnects")
}
case zk.StateHasSession:
Expand Down
4 changes: 4 additions & 0 deletions gateway/locking/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func NewNonBlockingLock(id string, member string) DistributedLocker {
}
}

func (l *NonBlockingLock) LockAcquired() bool {
return l.acquired
}

func (l *NonBlockingLock) GetMember(id string) (string, error) {
val, exists := registry.Load(id)
if exists {
Expand Down
6 changes: 4 additions & 2 deletions gateway/locking/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ package locking
// DistributedLocker is an interface for creating non-blocking locks
// among distributed processes.
type DistributedLocker interface {
// LockAcquired returns true if the lock is currently acquired.
LockAcquired() bool
// Try to acquire the lock. If the lock is already acquired return true and
// a deadlock error.
Try() (bool, error)
// Unlock the lock.
Unlock() error
// Return the ID for this lock.
// ID returns the ID or lock path for this lock.
ID() string
// Get the member that currently has the lock for the ID, if it's currently
// GetMember gets the member that currently has the lock for the provided ID, if it's currently
// locked, otherwise return an empty string.
GetMember(id string) (string, error)
}
40 changes: 26 additions & 14 deletions gateway/locking/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,27 @@ package locking

import (
"fmt"
"github.com/rs/zerolog/log"
"strconv"
"strings"
"sync"
"time"

"github.com/go-zookeeper/zk"
)

var _ DistributedLocker = new(ZookeeperNonBlockingLock)

type ZookeeperNonBlockingLock struct {
acquired bool
conn *zk.Conn
// The member that is holding the lock. This is usually the address and port where the cluster member is reachable.
member string
id string
acl []zk.ACL
lockPath string
seq int
member string
id string
acl []zk.ACL
lockPath string
seq int
unlockMutex sync.Mutex
}

// NewZookeeperNonBlockingLock creates a new lock instance using the provided connection, path, and acl.
Expand All @@ -77,6 +82,10 @@ func NewZookeeperNonBlockingLock(conn *zk.Conn, id string, member string, acl []
}
}

func (l *ZookeeperNonBlockingLock) LockAcquired() bool {
return l.acquired
}

func GetMember(conn *zk.Conn, id string) (string, error) {
trimmedID := "/" + strings.Trim(id, "/")
_, lowestSeqPath, err := lowestSeqChild(conn, trimmedID)
Expand Down Expand Up @@ -229,33 +238,36 @@ func (l *ZookeeperNonBlockingLock) lowestSeqChild(path string) (int, string, err

func (l *ZookeeperNonBlockingLock) watchState() {
currentState := l.conn.State()
for l.acquired && (currentState == zk.StateConnected || currentState == zk.StateHasSession) {
for l.LockAcquired() && (currentState == zk.StateConnected || currentState == zk.StateHasSession) {
time.Sleep(500 * time.Millisecond)
currentState = l.conn.State()
}
// disconnected
if l.acquired {
l.released()
// zk is disconnected
if l.LockAcquired() {
err := l.Unlock()
if err != nil {
log.Error().Msgf("watchState: unable to unlock zookeeper lock: %v", err)
}
}
}

// Unlock releases an acquired lock. If the lock is not currently acquired by
// this Lock instance than ErrNotLocked is returned.
// This should only be called if we're still connected.
func (l *ZookeeperNonBlockingLock) Unlock() error {
l.unlockMutex.Lock()
defer l.unlockMutex.Unlock()
if l.lockPath == "" {
return zk.ErrNotLocked
}
if err := l.conn.Delete(l.lockPath, -1); err != nil {
return fmt.Errorf("unable to release lock gracefully: %s", err)
}
l.released()
//log.Info().Msg("Cluster lock released.")
return nil
}

func (l *ZookeeperNonBlockingLock) released() {
// reset the lock internals
l.lockPath = ""
l.seq = 0
l.acquired = false
log.Info().Msg("Cluster lock released.")
return nil
}

0 comments on commit 2950e6c

Please sign in to comment.