Skip to content

Commit

Permalink
split lock
Browse files Browse the repository at this point in the history
  • Loading branch information
korotkov-aerospike committed Nov 3, 2024
1 parent 158fad9 commit b287218
Showing 1 changed file with 50 additions and 15 deletions.
65 changes: 50 additions & 15 deletions pkg/service/client_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,51 @@ func NewClientManager(aerospikeClientFactory AerospikeClientFactory, closeDelay

// GetClient returns a backup client by aerospike cluster name (new or cached).
func (cm *ClientManagerImpl) GetClient(cluster *model.AerospikeCluster) (*backup.Client, error) {
if client := cm.getExistingClient(cluster); client != nil {
return client, nil
}

client, err := cm.createClient(cluster)
if err != nil {
return nil, fmt.Errorf("cannot create backup client: %w", err)
}

return cm.storeClient(cluster, client), nil
}

// getExistingClient tries to get an existing client from the cache.
// Returns nil if client doesn't exist.
func (cm *ClientManagerImpl) getExistingClient(cluster *model.AerospikeCluster) *backup.Client {
cm.mu.Lock()
defer cm.mu.Unlock()

if info, exists := cm.clients[cluster]; exists {
// If there's a pending close operation, cancel it by stopping the timer
if info.closeTimer != nil {
info.closeTimer.Stop()
info.closeTimer = nil
}
info.count++
return info.client, nil
cm.incrementRef(info)
return info.client
}

client, err := cm.createClient(cluster)
if err != nil {
return nil, fmt.Errorf("cannot create backup client: %w", err)
return nil
}

// storeClient attempts to store the client in the cache.
func (cm *ClientManagerImpl) storeClient(cluster *model.AerospikeCluster, client *backup.Client) *backup.Client {
cm.mu.Lock()
defer cm.mu.Unlock()

// If another client was created concurrently,
// closes the provided client and returns the existing one.
if info, exists := cm.clients[cluster]; exists {
client.AerospikeClient().Close()
cm.incrementRef(info)
return info.client
}

cm.clients[cluster] = &clientInfo{
client: client,
count: 1,
}

return client, nil
return client
}

// createClient creates a new backup client given the aerospike cluster configuration.
Expand Down Expand Up @@ -113,10 +134,7 @@ func (cm *ClientManagerImpl) Close(client *backup.Client) {

for id, info := range cm.clients {
if info.client == client {
info.count--
if info.count == 0 {
info.closeTimer = cm.scheduleClosing(id)
}
cm.decrementRef(info, id)
return
}
}
Expand All @@ -125,6 +143,23 @@ func (cm *ClientManagerImpl) Close(client *backup.Client) {
client.AerospikeClient().Close()
}

// incrementRef increases the reference count and cancels any pending close operation.
func (cm *ClientManagerImpl) incrementRef(info *clientInfo) {
if info.closeTimer != nil {
info.closeTimer.Stop()
info.closeTimer = nil
}
info.count++
}

// decrementRef decreases the reference count and schedules closing if count reaches zero.
func (cm *ClientManagerImpl) decrementRef(info *clientInfo, cluster *model.AerospikeCluster) {
info.count--
if info.count == 0 {
info.closeTimer = cm.scheduleClosing(cluster)
}
}

// scheduleClosing schedules client closing after the configured delay.
// Returns a timer that can be used to cancel the scheduled closing if needed.
func (cm *ClientManagerImpl) scheduleClosing(cluster *model.AerospikeCluster) *time.Timer {
Expand Down

0 comments on commit b287218

Please sign in to comment.