From 8b9af561d1e202590e1938343b37a0bdf0250ecc Mon Sep 17 00:00:00 2001 From: Rakshith R Date: Mon, 9 Oct 2023 11:47:06 +0530 Subject: [PATCH] cephfs: safeguard localClusterState struct members from race condition Multiple go-routines may simultaneously check for presence of a clusterID's metadata such as subvolumegroup created state, resize state, metadata state and snapshot metadata state in the clusterAdditionalInfo map and update an entry after creation if it is absent. This set of operation needs to be serialized. Therefore, this commit safeguards localClusterState's members with a RWMutex to prevent the above problem. Signed-off-by: Rakshith R --- internal/cephfs/core/metadata.go | 55 +++++++++++++++++++++-- internal/cephfs/core/snapshot_metadata.go | 11 +++-- internal/cephfs/core/volume.go | 31 ++++++++----- 3 files changed, 79 insertions(+), 18 deletions(-) diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go index 9e2b90d5f468..f7f85591e039 100644 --- a/internal/cephfs/core/metadata.go +++ b/internal/cephfs/core/metadata.go @@ -34,23 +34,72 @@ var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations ar func (s *subVolumeClient) supportsSubVolMetadata() bool { newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolMetadataState.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.RUnlock() - return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported + return clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState != unsupported } func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool { + clusterAdditionalInfo[s.clusterID].subVolMetadataState.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolMetadataState.Unlock() + var invalid fsAdmin.NotImplementedError if err != nil && errors.As(err, &invalid) { // In case the error is other than invalid command return error to the caller. - clusterAdditionalInfo[s.clusterID].subVolMetadataState = unsupported + clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = unsupported return false } - clusterAdditionalInfo[s.clusterID].subVolMetadataState = supported + clusterAdditionalInfo[s.clusterID].subVolMetadataState.operationState = supported return true } +// isNotSupportedResize returns true if resize is not supported. +func (s *subVolumeClient) isNotSupportedResize() bool { + newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].resizeState.RLock() + defer clusterAdditionalInfo[s.clusterID].resizeState.RUnlock() + + return clusterAdditionalInfo[s.clusterID].resizeState.operationState == unknown || + clusterAdditionalInfo[s.clusterID].resizeState.operationState == unsupported +} + +// updateResizeState updates resize state. +func (s *subVolumeClient) updateResizeState(state operationState) { + clusterAdditionalInfo[s.clusterID].resizeState.Lock() + defer clusterAdditionalInfo[s.clusterID].resizeState.Unlock() + + clusterAdditionalInfo[s.clusterID].resizeState.operationState = state +} + +// isSubVolumeGroupCreated returns true if subvolume group is created. +func (s *subVolumeClient) isSubVolumeGroupCreated(group string) bool { + newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RUnlock() + + if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil { + return false + } + + return clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[group] +} + +// updateSubVolumeGroupCreated updates subvolume group created map. +// If the map is nil, it creates a new map. +func (s *subVolumeClient) updateSubVolumeGroupCreated(group string, state bool) { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Unlock() + + if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated = make(map[string]bool) + } + + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[group] = state +} + // setMetadata sets custom metadata on the subvolume in a volume as a // key-value pair. func (s *subVolumeClient) setMetadata(key, value string) error { diff --git a/internal/cephfs/core/snapshot_metadata.go b/internal/cephfs/core/snapshot_metadata.go index f168fbf8cd41..123a4eac892c 100644 --- a/internal/cephfs/core/snapshot_metadata.go +++ b/internal/cephfs/core/snapshot_metadata.go @@ -30,20 +30,25 @@ var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata func (s *snapshotClient) supportsSubVolSnapMetadata() bool { newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.RUnlock() - return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported + return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState != unsupported } func (s *snapshotClient) isUnsupportedSubVolSnapMetadata(err error) bool { + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.Unlock() + var invalid fsAdmin.NotImplementedError if err != nil && errors.As(err, &invalid) { // In case the error is other than invalid command return error to // the caller. - clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = unsupported + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = unsupported return false } - clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState = supported + clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState.operationState = supported return true } diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index 54c3dd4eb216..3d3470fb532a 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -196,7 +196,12 @@ func (s *subVolumeClient) GetSubVolumeInfo(ctx context.Context) (*Subvolume, err return &subvol, nil } -type operationState int64 +type operationState int32 + +type operationStateMutex struct { + sync.RWMutex + operationState operationState +} const ( unknown operationState = iota @@ -207,14 +212,17 @@ const ( type localClusterState struct { // set the enum value i.e., unknown, supported, // unsupported as per the state of the cluster. - resizeState operationState - subVolMetadataState operationState - subVolSnapshotMetadataState operationState + resizeState operationStateMutex + subVolMetadataState operationStateMutex + subVolSnapshotMetadataState operationStateMutex // A cluster can have multiple filesystem for that we need to have a map of // subvolumegroups to check filesystem is created nor not. // set true once a subvolumegroup is created // for corresponding filesystem in a cluster. subVolumeGroupsCreated map[string]bool + // subVolumeGroupsRWMutex is used to protect subVolumeGroupsCreated map + // against concurrent writes while allowing multiple readers. + subVolumeGroupsRWMutex sync.RWMutex } func newLocalClusterState(clusterID string) { @@ -224,7 +232,6 @@ func newLocalClusterState(clusterID string) { defer clusterAdditionalInfoMutex.Unlock() if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent { clusterAdditionalInfo[clusterID] = &localClusterState{} - clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool) } } @@ -240,7 +247,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { } // create subvolumegroup if not already created for the cluster. - if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] { + if !s.isSubVolumeGroupCreated(s.SubvolumeGroup) { opts := fsAdmin.SubVolumeGroupOptions{} err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) if err != nil { @@ -254,7 +261,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { return err } log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true + s.updateSubVolumeGroupCreated(s.SubvolumeGroup, true) } opts := fsAdmin.SubVolumeOptions{ @@ -272,7 +279,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { if errors.Is(err, rados.ErrNotFound) { // Reset the subVolumeGroupsCreated so that we can try again to create the // subvolumegroup in next request if the error is Not Found. - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false + s.updateSubVolumeGroupCreated(s.SubvolumeGroup, false) } return err @@ -303,10 +310,10 @@ func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) er // CreateVolume to resize the subvolume. func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error { newLocalClusterState(s.clusterID) + // resize subvolume when either it's supported, or when corresponding // clusterID key was not present. - if clusterAdditionalInfo[s.clusterID].resizeState == unknown || - clusterAdditionalInfo[s.clusterID].resizeState == supported { + if s.isNotSupportedResize() { fsa, err := s.conn.GetFSAdmin() if err != nil { log.ErrorLog(ctx, "could not get FSAdmin, can not resize volume %s:", s.FsName, err) @@ -315,7 +322,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er } _, err = fsa.ResizeSubVolume(s.FsName, s.SubvolumeGroup, s.VolID, fsAdmin.ByteCount(bytesQuota), true) if err == nil { - clusterAdditionalInfo[s.clusterID].resizeState = supported + s.updateResizeState(supported) return nil } @@ -327,7 +334,7 @@ func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) er return err } } - clusterAdditionalInfo[s.clusterID].resizeState = unsupported + s.updateResizeState(unsupported) s.Size = bytesQuota return s.CreateVolume(ctx)