Skip to content

Commit

Permalink
split lock
Browse files Browse the repository at this point in the history
  • Loading branch information
korotkov-aerospike committed Nov 3, 2024
1 parent ea03b6e commit f939431
Showing 1 changed file with 42 additions and 21 deletions.
63 changes: 42 additions & 21 deletions pkg/service/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package service

import (
"fmt"
"log/slog"
"sync"
"time"

Expand Down Expand Up @@ -61,34 +60,59 @@ func NewClientManager(aerospikeClientFactory AerospikeClientFactory, closeDelay

// GetClient returns a backup client by aerospike cluster name (new or cached).
func (cm *ClientManagerImpl) GetClient(cluster *model.AerospikeCluster) (*backup.Client, error) {
slog.Info("NS: Request client before lock")
if client := cm.getExistingClient(cluster); client != nil {
return client, nil
}

client, err := cm.createClient(cluster)
if err != nil {
return nil, fmt.Errorf("cannot create backup client: %w", err)
}

return cm.storeClient(cluster, client), nil
}

// getExistingClient tries to get an existing client from the cache.
// Returns nil if client doesn't exist.
func (cm *ClientManagerImpl) getExistingClient(cluster *model.AerospikeCluster) *backup.Client {
cm.mu.Lock()
defer cm.mu.Unlock()
slog.Info("NS: in lock")

if info, exists := cm.clients[cluster]; exists {
// If there's a pending close operation, cancel it by stopping the timer
if info.closeTimer != nil {
info.closeTimer.Stop()
info.closeTimer = nil
}
cm.cancelScheduledClosing(info)
info.count++
slog.Info("NS: Got client from cache")
return info.client, nil
return info.client
}

slog.Info("NS: Creating new client")
client, err := cm.createClient(cluster)
if err != nil {
return nil, fmt.Errorf("cannot create backup client: %w", err)
return nil
}

// cancelScheduledClosing cancels any pending close operation for the client.
func (cm *ClientManagerImpl) cancelScheduledClosing(info *clientInfo) {
if info.closeTimer != nil {
info.closeTimer.Stop()
info.closeTimer = nil
}
}

// storeClient attempts to store the client in the cache.
// If another client was created concurrently, closes the provided client and returns the existing one.
func (cm *ClientManagerImpl) storeClient(cluster *model.AerospikeCluster, client *backup.Client) *backup.Client {
cm.mu.Lock()
defer cm.mu.Unlock()

// Check if another goroutine created the client
if info, exists := cm.clients[cluster]; exists {
client.AerospikeClient().Close()
return info.client
}
slog.Info("NS: Created new client", slog.Int("clients", len(cm.clients)))

cm.clients[cluster] = &clientInfo{
client: client,
count: 1,
}

return client, nil
return client
}

// createClient creates a new backup client given the aerospike cluster configuration.
Expand Down Expand Up @@ -118,11 +142,9 @@ func (cm *ClientManagerImpl) Close(client *backup.Client) {

for id, info := range cm.clients {
if info.client == client {
slog.Info("Decreasing client reference counter")
info.count--
if info.count == 0 {
slog.Info("Scheduling client closing")
info.closeTimer = cm.scheduleClosing(id)
info.closeTimer = cm.scheduleClosing(info, id)
}
return
}
Expand All @@ -134,7 +156,7 @@ func (cm *ClientManagerImpl) Close(client *backup.Client) {

// scheduleClosing schedules client closing after the configured delay.
// Returns a timer that can be used to cancel the scheduled closing if needed.
func (cm *ClientManagerImpl) scheduleClosing(cluster *model.AerospikeCluster) *time.Timer {
func (cm *ClientManagerImpl) scheduleClosing(info *clientInfo, cluster *model.AerospikeCluster) *time.Timer {

Check failure on line 159 in pkg/service/client_manager.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'info' seems to be unused, consider removing or renaming it as _ (revive)
return time.AfterFunc(cm.closeDelay, func() {
cm.mu.Lock()
defer cm.mu.Unlock()
Expand All @@ -143,7 +165,6 @@ func (cm *ClientManagerImpl) scheduleClosing(cluster *model.AerospikeCluster) *t
if info, exists := cm.clients[cluster]; exists && info.count == 0 {
info.client.AerospikeClient().Close()
delete(cm.clients, cluster)
slog.Info("Closed unused client", slog.Int("clients", len(cm.clients)))
}
})
}

0 comments on commit f939431

Please sign in to comment.