Skip to content

Commit

Permalink
APPS-1325 Delete failed backups (#247)
Browse files Browse the repository at this point in the history
* delete files on failed backup

* add logs

* add logs

* add real check for availability

* add real check for availability

* add tests

* label

* misprint
  • Loading branch information
korotkov-aerospike authored Nov 5, 2024
1 parent 5b787d0 commit 28fb208
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 22 deletions.
22 changes: 14 additions & 8 deletions pkg/service/backup_routine_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (h *BackupRoutineHandler) runFullBackupInternal(ctx context.Context, now ti
return err
}

err = h.waitForFullBackups(ctx, now)
err = h.waitForFullBackups(ctx, now, logger)
if err != nil {
return err
}
Expand All @@ -123,7 +123,7 @@ func (h *BackupRoutineHandler) runFullBackupInternal(ctx context.Context, now ti
h.deleteFolder(ctx, h.backend.incrementalBackupsPath, logger)
}

h.writeClusterConfiguration(ctx, client.AerospikeClient(), now)
h.writeClusterConfiguration(ctx, client.AerospikeClient(), now, logger)
return nil
}

Expand Down Expand Up @@ -157,30 +157,36 @@ func (h *BackupRoutineHandler) startFullBackupForAllNamespaces(
return nil
}

func (h *BackupRoutineHandler) waitForFullBackups(ctx context.Context, backupTimestamp time.Time) error {
func (h *BackupRoutineHandler) waitForFullBackups(
ctx context.Context, backupTimestamp time.Time, logger *slog.Logger,
) error {
startTime := time.Now() // startTime is only used to measure backup time
for namespace, handler := range h.fullBackupHandlers {
backupFolder := getFullPath(h.backend.fullBackupsPath, h.backupFullPolicy, namespace, backupTimestamp)
err := handler.Wait(ctx)
if err != nil {
backupFailureCounter.Inc()
slog.Info("Delete failed backup folder",
slog.String("path", backupFolder),
)
h.deleteFolder(ctx, backupFolder, logger) // cleanup on failure
return fmt.Errorf("error during backup namespace %s, routine %s: %w",
namespace, h.routineName, err)
}

backupFolder := getFullPath(h.backend.fullBackupsPath, h.backupFullPolicy, namespace, backupTimestamp)
if err := h.writeBackupMetadata(ctx, handler.GetStats(), backupTimestamp, namespace, backupFolder); err != nil {
return err
}
}
backupDurationGauge.Set(float64(time.Since(startTime).Milliseconds()))
duration := float64(time.Since(startTime).Milliseconds())
logger.Debug("Finished full backup", slog.Float64("duration_ms", duration))
backupDurationGauge.Set(duration)
return nil
}

func (h *BackupRoutineHandler) writeClusterConfiguration(
ctx context.Context, client backup.AerospikeClient, now time.Time,
ctx context.Context, client backup.AerospikeClient, now time.Time, logger *slog.Logger,
) {
logger := slog.Default().With(slog.String("routine", h.routineName))

infos := getClusterConfiguration(client)
if len(infos) == 0 {
logger.Warn("Could not read aerospike configuration")
Expand Down
44 changes: 37 additions & 7 deletions pkg/service/client_manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package service

import (
"errors"
"fmt"
"sync"
"time"
Expand All @@ -19,9 +20,10 @@ type ClientManager interface {
Close(*backup.Client)
}

// AerospikeClientFactory defines an interface for creating new clients.
// AerospikeClientFactory defines an interface for creating and checking clients.
type AerospikeClientFactory interface {
// NewClientWithPolicyAndHost creates an Aerospike client for the given
// hosts using the supplied client policy.
NewClientWithPolicyAndHost(policy *as.ClientPolicy, hosts ...*as.Host) (backup.AerospikeClient, error)
// IsClusterHealthy reports whether the cluster behind client is usable.
// The default implementation treats a nil client as unhealthy and
// requires a node to answer the "status" info command with "ok".
IsClusterHealthy(client backup.AerospikeClient) bool
}

// DefaultClientFactory is the default implementation of AerospikeClientFactory.
Expand All @@ -34,6 +36,26 @@ func (f *DefaultClientFactory) NewClientWithPolicyAndHost(
return as.NewClientWithPolicyAndHost(policy, hosts...)
}

// IsClusterHealthy reports whether the cluster behind client can serve requests.
// A nil client, a disconnected cluster, a failure to pick a node, or a node
// whose "status" info command does not answer "ok" all count as unhealthy.
func (f *DefaultClientFactory) IsClusterHealthy(client backup.AerospikeClient) bool {
	if client == nil {
		return false
	}

	c := client.Cluster()
	if connected := c.IsConnected(); !connected {
		return false
	}

	// Probe a single randomly chosen node rather than every node.
	randomNode, nodeErr := c.GetRandomNode()
	if nodeErr != nil {
		return false
	}

	response, infoErr := randomNode.RequestInfo(nil, "status")
	if infoErr != nil {
		return false
	}

	return response["status"] == "ok"
}

// ClientManagerImpl implements [ClientManager].
// Is responsible for creating and closing backup clients.
type ClientManagerImpl struct {
Expand All @@ -60,11 +82,15 @@ func NewClientManager(aerospikeClientFactory AerospikeClientFactory, closeDelay

// GetClient returns a backup client by aerospike cluster name (new or cached).
func (cm *ClientManagerImpl) GetClient(cluster *model.AerospikeCluster) (*backup.Client, error) {
if client := cm.getExistingClient(cluster); client != nil {
client, err := cm.getExistingClient(cluster)
if err != nil {
return nil, err
}
if client != nil {
return client, nil
}

client, err := cm.createClient(cluster)
client, err = cm.createClient(cluster)
if err != nil {
return nil, fmt.Errorf("cannot create backup client: %w", err)
}
Expand All @@ -73,17 +99,21 @@ func (cm *ClientManagerImpl) GetClient(cluster *model.AerospikeCluster) (*backup
}

// getExistingClient tries to get an existing client from the cache.
// Returns nil if client doesn't exist.
func (cm *ClientManagerImpl) getExistingClient(cluster *model.AerospikeCluster) *backup.Client {
// Returns nil if client doesn't exist, error if client is not connected.
func (cm *ClientManagerImpl) getExistingClient(cluster *model.AerospikeCluster) (*backup.Client, error) {
cm.mu.RLock()
defer cm.mu.RUnlock()

if info, exists := cm.clients[cluster]; exists {
if !cm.clientFactory.IsClusterHealthy(info.client.AerospikeClient()) {
return nil, errors.New("aerospike cluster connection lost")
}

cm.incrementRef(info)
return info.client
return info.client, nil
}

return nil
return nil, nil
}

// storeClient attempts to store the client in the cache.
Expand Down
34 changes: 27 additions & 7 deletions pkg/service/client_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/aerospike/backup-go/mocks"
"github.com/aws/smithy-go/ptr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// MockClientFactory is a mock implementation of the AerospikeClientFactory interface.
type MockClientFactory struct {
ShouldFail bool
ShouldFail bool
IsClusterDisconnected bool
}

var cluster = &model.AerospikeCluster{
Expand All @@ -33,12 +35,8 @@ func (f *MockClientFactory) NewClientWithPolicyAndHost(_ *as.ClientPolicy, _ ...
return m, nil
}

func assertClientExists(t *testing.T, clientManager *ClientManagerImpl, cl *model.AerospikeCluster, shouldExist bool) {
t.Helper()
clientManager.mu.Lock()
defer clientManager.mu.Unlock()
_, exists := clientManager.clients[cl]
assert.Equal(t, shouldExist, exists)
// IsClusterHealthy reports the mocked cluster as healthy unless the factory
// was configured with IsClusterDisconnected set; the client argument is ignored.
func (f *MockClientFactory) IsClusterHealthy(_ backup.AerospikeClient) bool {
	healthy := !f.IsClusterDisconnected
	return healthy
}

func Test_GetClient(t *testing.T) {
Expand All @@ -59,6 +57,20 @@ func Test_GetClient(t *testing.T) {
assert.Equal(t, client, client2)
}

// Test_GetClient_UnhealthyConnection verifies that GetClient returns an error
// instead of a cached client when the factory reports the cluster as unhealthy.
func Test_GetClient_UnhealthyConnection(t *testing.T) {
	clientManager := NewClientManager(
		&MockClientFactory{IsClusterDisconnected: true},
		10*time.Second,
	)

	// Prime the cache. Nothing is cached yet, so the health check is not
	// consulted and a new client is created. Fail fast if that setup step
	// breaks, otherwise the assertions below would test the wrong code path.
	_, err := clientManager.GetClient(cluster)
	require.NoError(t, err)

	// The second call finds the cached client, whose health check fails.
	client, err := clientManager.GetClient(cluster)
	require.Error(t, err)
	assert.Contains(t, err.Error(), "aerospike cluster connection lost")
	assert.Nil(t, client)
}

func Test_CreateClient(t *testing.T) {
clientManager := NewClientManager(
&MockClientFactory{},
Expand Down Expand Up @@ -158,3 +170,11 @@ func Test_Close_NotExisting(t *testing.T) {

aeroClient.AssertExpectations(t)
}

// assertClientExists asserts that the manager's client cache does (or does not)
// hold an entry for cl, reading the cache under the manager's mutex.
func assertClientExists(t *testing.T, clientManager *ClientManagerImpl, cl *model.AerospikeCluster, shouldExist bool) {
	t.Helper()

	clientManager.mu.Lock()
	_, found := clientManager.clients[cl]
	clientManager.mu.Unlock()

	assert.Equal(t, shouldExist, found)
}
1 change: 1 addition & 0 deletions pkg/service/retry_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (r *RetryService) retry(f func() error, retryInterval time.Duration, n int3

logger.Info("Execution failed, retry scheduled",
slog.Any("retryInterval", retryInterval),
slog.Any("attempts", n-1),
slog.Any("err", err))

r.timer = time.AfterFunc(retryInterval, func() {
Expand Down

0 comments on commit 28fb208

Please sign in to comment.