Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Disk Manager] improve incremental snapshots/images tests; bug fixes #1980

Merged
merged 8 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions cloud/disk_manager/internal/pkg/dataplane/delete_snapshot_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/clients/nbs"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/protos"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage"
"github.com/ydb-platform/nbs/cloud/tasks"
Expand All @@ -15,9 +16,10 @@ import (
////////////////////////////////////////////////////////////////////////////////

type deleteSnapshotTask struct {
storage storage.Storage
request *protos.DeleteSnapshotRequest
state *protos.DeleteSnapshotTaskState
storage storage.Storage
nbsFactory nbs.Factory
request *protos.DeleteSnapshotRequest
state *protos.DeleteSnapshotTaskState
}

func (t *deleteSnapshotTask) Save() ([]byte, error) {
Expand Down Expand Up @@ -45,6 +47,22 @@ func (t *deleteSnapshotTask) deletingSnapshot(
return err
}

if snapshotMeta.Disk != nil {
nbsClient, err := t.nbsFactory.GetClient(ctx, snapshotMeta.Disk.ZoneId)
if err != nil {
return err
}

err = nbsClient.DeleteCheckpoint(
ctx,
snapshotMeta.Disk.DiskId,
snapshotMeta.CheckpointID,
)
if err != nil {
return err
}
}

if len(snapshotMeta.BaseSnapshotID) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
Expand Down
3 changes: 2 additions & 1 deletion cloud/disk_manager/internal/pkg/dataplane/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func RegisterForExecution(

err = taskRegistry.RegisterForExecution("dataplane.DeleteSnapshot", func() tasks.Task {
return &deleteSnapshotTask{
storage: storage,
storage: storage,
nbsFactory: nbsFactory,
}
})
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (s *snapshotState) toSnapshotMeta() *SnapshotMeta {
BaseCheckpointID: s.baseCheckpointID,
Size: s.size,
StorageSize: s.storageSize,
LockTaskID: s.lockTaskID,
ChunkCount: s.chunkCount,
Encryption: &types.EncryptionDesc{
Mode: types.EncryptionMode(s.encryptionMode),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,15 @@ func (s *StorageMock) UnlockSnapshot(
return args.Error(0)
}

func (s *StorageMock) GetSnapshotMeta(
ctx context.Context,
snapshotID string,
) (*storage.SnapshotMeta, error) {

args := s.Called(ctx, snapshotID)
return args.Get(0).(*storage.SnapshotMeta), args.Error(1)
}

////////////////////////////////////////////////////////////////////////////////

func NewStorageMock() *StorageMock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type SnapshotMeta struct {
Size uint64
// Snapshot real size, i.e. the amount of disk space occupied in storage.
StorageSize uint64
LockTaskID string
ChunkCount uint32
Encryption *types.EncryptionDesc
Ready bool
Expand Down Expand Up @@ -132,4 +133,9 @@ type Storage interface {
) (locked bool, err error)

UnlockSnapshot(ctx context.Context, snapshotID string, lockTaskID string) error

GetSnapshotMeta(
ctx context.Context,
snapshotID string,
) (*SnapshotMeta, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,11 @@ func (s *legacyStorage) UnlockSnapshot(

return task_errors.NewNonRetriableErrorf("not implemented")
}

func (s *legacyStorage) GetSnapshotMeta(
ctx context.Context,
snapshotID string,
) (*SnapshotMeta, error) {

return nil, task_errors.NewNonRetriableErrorf("not implemented")
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,25 @@ func (s *storageYDB) UnlockSnapshot(
},
)
}

func (s *storageYDB) GetSnapshotMeta(
ctx context.Context,
snapshotID string,
) (*SnapshotMeta, error) {

var snapshotMeta *SnapshotMeta

err := s.db.Execute(
ctx,
func(ctx context.Context, session *persistence.Session) error {
var err error
snapshotMeta, err = s.getSnapshotMeta(
ctx,
session,
snapshotID,
)
return err
},
)
return snapshotMeta, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ func (s *storageYDB) deletingSnapshot(

if len(states) != 0 {
state = states[0]
logging.Info(ctx, "Deleting snapshot %+v", *state.toSnapshotMeta())

if state.status >= snapshotStatusDeleting {
// Snapshot already marked as deleting.
Expand Down Expand Up @@ -1357,3 +1358,40 @@ func (s *storageYDB) unlockSnapshot(
logging.Info(ctx, "Unlocked snapshot with id %v", snapshotID)
return nil
}

func (s *storageYDB) getSnapshotMeta(
ctx context.Context,
session *persistence.Session,
snapshotID string,
) (*SnapshotMeta, error) {

res, err := session.ExecuteRO(ctx, fmt.Sprintf(`
--!syntax_v1
pragma TablePathPrefix = "%v";
declare $id as Utf8;

select *
from snapshots
where id = $id
`, s.tablesPath),
persistence.ValueParam("$id", persistence.UTF8Value(snapshotID)),
)
if err != nil {
return nil, err
}
defer res.Close()

states, err := scanSnapshotStates(ctx, res)
if err != nil {
return nil, err
}

if len(states) == 0 {
return nil, task_errors.NewNonRetriableErrorf(
"snapshot with id %v does not exist",
snapshotID,
)
}

return states[0].toSnapshotMeta(), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,34 @@ func testCases() []differentChunkStorageTestCase {

////////////////////////////////////////////////////////////////////////////////

func checkBaseSnapshot(
t *testing.T,
ctx context.Context,
storage Storage,
snapshotID string,
expectedBaseSnapshotID string,
) {

snapshotMeta, err := storage.GetSnapshotMeta(ctx, snapshotID)
require.NoError(t, err)
require.EqualValues(t, expectedBaseSnapshotID, snapshotMeta.BaseSnapshotID)
}

func checkLockTaskID(
t *testing.T,
ctx context.Context,
storage Storage,
snapshotID string,
expectedLockTaskID string,
) {

snapshotMeta, err := storage.GetSnapshotMeta(ctx, snapshotID)
require.NoError(t, err)
require.EqualValues(t, expectedLockTaskID, snapshotMeta.LockTaskID)
}

////////////////////////////////////////////////////////////////////////////////

func TestCreateSnapshot(t *testing.T) {
for _, testCase := range testCases() {
t.Run(testCase.name, func(t *testing.T) {
Expand Down Expand Up @@ -515,6 +543,7 @@ func TestSnapshotsCreateIncrementalSnapshot(t *testing.T) {
require.NotNil(t, created)
require.Equal(t, snapshot1.ID, created.BaseSnapshotID)
require.Equal(t, snapshot1.CheckpointID, created.BaseCheckpointID)
checkBaseSnapshot(t, f.ctx, f.storage, snapshot3.ID, snapshot1.ID)

err = f.storage.SnapshotCreated(f.ctx, snapshot3.ID, 0, 0, 0, nil)
require.NoError(t, err)
Expand All @@ -534,6 +563,7 @@ func TestSnapshotsCreateIncrementalSnapshot(t *testing.T) {
require.NotNil(t, created)
require.Equal(t, snapshot3.ID, created.BaseSnapshotID)
require.Equal(t, snapshot3.CheckpointID, created.BaseCheckpointID)
checkBaseSnapshot(t, f.ctx, f.storage, snapshot4.ID, snapshot3.ID)

err = f.storage.SnapshotCreated(f.ctx, snapshot4.ID, 0, 0, 0, nil)
require.NoError(t, err)
Expand All @@ -550,6 +580,93 @@ func TestSnapshotsCreateIncrementalSnapshot(t *testing.T) {
}
}

func TestSnapshotsLocks(t *testing.T) {
for _, testCase := range testCases() {
t.Run(testCase.name, func(t *testing.T) {
f := createFixture(t)
defer f.teardown()

snapshot1 := SnapshotMeta{
ID: "snapshot1",
Disk: &types.Disk{
ZoneId: "zone",
DiskId: "disk",
},
CheckpointID: "checkpoint1",
CreateTaskID: "create1",
}

created, err := f.storage.CreateSnapshot(f.ctx, snapshot1)
require.NoError(t, err)
require.NotNil(t, created)

err = f.storage.SnapshotCreated(f.ctx, snapshot1.ID, 0, 0, 0, nil)
require.NoError(t, err)

snapshot2 := SnapshotMeta{
ID: "snapshot2",
Disk: &types.Disk{
ZoneId: "zone",
DiskId: "disk",
},
CheckpointID: "checkpoint2",
CreateTaskID: "create2",
}

created, err = f.storage.CreateSnapshot(f.ctx, snapshot2)
require.NoError(t, err)
require.NotNil(t, created)

locked, err := f.storage.LockSnapshot(f.ctx, snapshot1.ID, created.CreateTaskID)
require.NoError(t, err)
require.True(t, locked)
checkLockTaskID(t, f.ctx, f.storage, snapshot1.ID, created.CreateTaskID)

err = f.storage.SnapshotCreated(f.ctx, snapshot2.ID, 0, 0, 0, nil)
require.NoError(t, err)

err = f.storage.UnlockSnapshot(f.ctx, snapshot1.ID, created.CreateTaskID)
require.NoError(t, err)
checkLockTaskID(t, f.ctx, f.storage, snapshot1.ID, "")

_, err = f.storage.DeletingSnapshot(f.ctx, snapshot2.ID, created.CreateTaskID)
require.NoError(t, err)

snapshot3 := SnapshotMeta{
ID: "snapshot3",
Disk: &types.Disk{
ZoneId: "zone",
DiskId: "disk",
},
CheckpointID: "checkpoint3",
CreateTaskID: "create3",
}

created, err = f.storage.CreateSnapshot(f.ctx, snapshot3)
require.NoError(t, err)
require.NotNil(t, created)

locked, err = f.storage.LockSnapshot(f.ctx, snapshot2.ID, created.CreateTaskID)
require.NoError(t, err)
require.False(t, locked)
checkLockTaskID(t, f.ctx, f.storage, snapshot2.ID, "")

err = f.storage.SnapshotCreated(f.ctx, snapshot3.ID, 0, 0, 0, nil)
require.NoError(t, err)

err = f.storage.UnlockSnapshot(f.ctx, snapshot2.ID, created.CreateTaskID)
require.NoError(t, err)
checkLockTaskID(t, f.ctx, f.storage, snapshot2.ID, "")

_, err = f.storage.DeletingSnapshot(f.ctx, snapshot3.ID, "delete3")
require.NoError(t, err)

_, err = f.storage.DeletingSnapshot(f.ctx, snapshot1.ID, "delete1")
require.NoError(t, err)
})
}
}

func TestDeletingSnapshot(t *testing.T) {
for _, testCase := range testCases() {
t.Run(testCase.name, func(t *testing.T) {
Expand Down
Loading
Loading