Skip to content

Commit

Permalink
Makes two important stage area updates:
Browse files Browse the repository at this point in the history
- Alters the logic for checking if a file or part was already received: if the hash or "new" name changed, we need to receive it again
- Adds an I/O lock to make sure that Recover() and pruneTree() won't ever mess with any other function doing filesystem I/O.
  • Loading branch information
sbeus committed Mar 4, 2024
1 parent 0b8606d commit 983870b
Showing 1 changed file with 73 additions and 41 deletions.
114 changes: 73 additions & 41 deletions stage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,17 @@ type Stage struct {
cache map[string]*finalFile
cacheTime time.Time
cacheTimes []time.Time
writeLock sync.RWMutex
writeLocks map[string]*sync.RWMutex
readyLock sync.RWMutex
cleanLock sync.RWMutex

// Every normal file I/O function does a read lock, so write locking
// it will block the stage area from interacting with the file system.
// This is useful for when we want to clean up the stage and target
// areas.
ioLock sync.RWMutex

pathLock sync.RWMutex
pathLocks map[string]*sync.RWMutex
readyLock sync.RWMutex
cleanLock sync.RWMutex
}

// New creates a new instance of Stage where the rootDir is the directory
Expand All @@ -121,43 +128,43 @@ func New(name, rootDir, targetDir string, logger sts.ReceiveLogger, dispatcher s
}
s.wait = make(map[string][]*finalFile)
s.cache = make(map[string]*finalFile)
s.writeLocks = make(map[string]*sync.RWMutex)
s.pathLocks = make(map[string]*sync.RWMutex)
s.lastIn = time.Now()
s.cleanInterval = time.Hour * 12
s.canReceive = true
return s
}

func (s *Stage) getWriteLock(key string) *sync.RWMutex {
s.writeLock.Lock()
defer s.writeLock.Unlock()
func (s *Stage) getPathLock(key string) *sync.RWMutex {
s.pathLock.Lock()
defer s.pathLock.Unlock()
var m *sync.RWMutex
var exists bool
if m, exists = s.writeLocks[key]; !exists {
if m, exists = s.pathLocks[key]; !exists {
m = &sync.RWMutex{}
s.writeLocks[key] = m
s.pathLocks[key] = m
}
return m
}

func (s *Stage) delWriteLock(key string) {
s.writeLock.Lock()
defer s.writeLock.Unlock()
delete(s.writeLocks, key)
func (s *Stage) delPathLock(key string) {
s.pathLock.Lock()
defer s.pathLock.Unlock()
delete(s.pathLocks, key)
s.lastIn = time.Now()
s.logDebug("Write Locks:", len(s.writeLocks))
s.logDebug("Write Locks:", len(s.pathLocks))
}

func (s *Stage) hasWriteLock(key string) bool {
s.writeLock.RLock()
defer s.writeLock.RUnlock()
_, ok := s.writeLocks[key]
func (s *Stage) hasPathLock(key string) bool {
s.pathLock.RLock()
defer s.pathLock.RUnlock()
_, ok := s.pathLocks[key]
return ok
}

// func (s *Stage) getLastIn() time.Time {
// s.writeLock.RLock()
// defer s.writeLock.RUnlock()
// s.pathLock.RLock()
// defer s.pathLock.RUnlock()
// return s.lastIn
// }

Expand All @@ -174,14 +181,16 @@ func (s *Stage) pathToName(path, stripExt string) (name string) {
// Scan walks the stage area tree looking for companion files and returns any
// found in the form of a JSON-encoded byte array
func (s *Stage) Scan(version string) (jsonBytes []byte, err error) {
s.ioLock.RLock()
defer s.ioLock.RUnlock()
var name string
var lock *sync.RWMutex
var partials []*sts.Partial
err = filepath.Walk(s.rootDir,
func(path string, info os.FileInfo, err error) error {
if filepath.Ext(path) == compExt {
name = s.pathToName(path, compExt)
lock = s.getWriteLock(strings.TrimSuffix(path, compExt))
lock = s.getPathLock(strings.TrimSuffix(path, compExt))
lock.RLock()
// Make sure it still exists.
if _, err := os.Stat(path); err == nil {
Expand All @@ -207,6 +216,8 @@ func (s *Stage) Scan(version string) (jsonBytes []byte, err error) {
}

func (s *Stage) initStageFile(path string, size int64) error {
s.ioLock.RLock()
defer s.ioLock.RUnlock()
var err error
info, err := os.Stat(path + partExt)
if err == nil && info.Size() == size {
Expand Down Expand Up @@ -243,7 +254,7 @@ func (s *Stage) initStageFile(path string, size int64) error {
func (s *Stage) Prepare(parts []sts.Binned) {
for _, part := range parts {
path := filepath.Join(s.rootDir, part.GetName())
lock := s.getWriteLock(path)
lock := s.getPathLock(path)
lock.Lock()
err := s.initStageFile(path, part.GetFileSize())
lock.Unlock()
Expand All @@ -255,6 +266,9 @@ func (s *Stage) Prepare(parts []sts.Binned) {

// Receive reads a single file part with file metadata and reader
func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) {
s.ioLock.RLock()
defer s.ioLock.RUnlock()

if len(file.Parts) != 1 {
err = fmt.Errorf(
"can only receive a single part for a single reader (%d given)",
Expand All @@ -279,7 +293,7 @@ func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) {
}

// Make sure we're the only one updating the companion
lock := s.getWriteLock(path)
lock := s.getPathLock(path)
lock.Lock()
defer lock.Unlock()

Expand Down Expand Up @@ -312,7 +326,7 @@ func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) {
os.Remove(path + partExt)
if existing.state >= stateFinalized {
os.Remove(path + compExt)
s.delWriteLock(path)
s.delPathLock(path)
}
return
}
Expand Down Expand Up @@ -352,30 +366,36 @@ func (s *Stage) partReceived(part sts.Binned) bool {
s.buildCache(when)
beg, end := part.GetSlice()
path := filepath.Join(s.rootDir, part.GetName())
lock := s.getWriteLock(path)
lock := s.getPathLock(path)
lock.Lock()
defer lock.Unlock()
final := &finalFile{
path: path,
name: part.GetName(),
size: part.GetFileSize(),
hash: part.GetFileHash(),
prev: part.GetPrev(),
path: path,
renamed: part.GetRenamed(),
name: part.GetName(),
size: part.GetFileSize(),
hash: part.GetFileHash(),
prev: part.GetPrev(),
}
existing := s.fromCache(final.path)
if existing == nil {
if cmp, _ := readLocalCompanion(path, final.name); cmp != nil {
if final.renamed != cmp.Renamed || final.hash != cmp.Hash || final.prev != cmp.Prev {
return false
}
if companionPartExists(cmp, beg, end) {
s.logInfo("Part already received:", final.name, beg, end)
return true
}
} else {
s.delWriteLock(path)
s.delPathLock(path)
}
} else if existing.state != stateFailed && existing.hash == final.hash {
} else if existing.state != stateFailed &&
existing.hash == final.hash &&
existing.renamed == final.renamed {
s.logInfo("File already received:", final.name)
if existing.state >= stateFinalized {
s.delWriteLock(path)
s.delPathLock(path)
}
return true
}
Expand Down Expand Up @@ -440,6 +460,8 @@ func (s *Stage) setCanReceive(value bool) {
// files in the stage area that should be completed from the previous server
// run
func (s *Stage) Recover() {
s.ioLock.Lock()
defer s.ioLock.Unlock()
s.setCanReceive(false)
defer s.setCanReceive(true)
s.logInfo("Beginning stage recovery")
Expand Down Expand Up @@ -561,7 +583,9 @@ func (s *Stage) clean(minAge time.Duration) {

s.cleanStrays(minAge)
s.cleanWaiting(minAge)
s.pruneTree(minAge)

s.pruneTree(s.rootDir, minAge)
s.pruneTree(s.targetDir, minAge)

s.cleanLock.Unlock()

Expand All @@ -576,10 +600,12 @@ func (s *Stage) scheduleClean(minAge time.Duration) {
})
}

func (s *Stage) pruneTree(minAge time.Duration) {
func (s *Stage) pruneTree(dir string, minAge time.Duration) {
s.ioLock.Lock()
defer s.ioLock.Unlock()
s.logDebug("Pruning empty directories ...")
var dirs []string
err := filepath.Walk(s.rootDir,
err := filepath.Walk(dir,
func(path string, info os.FileInfo, err error) error {
if err != nil || time.Since(info.ModTime()) < minAge {
return nil
Expand Down Expand Up @@ -766,9 +792,12 @@ func (s *Stage) processHandler() {
}

func (s *Stage) process(file *finalFile) {
s.ioLock.RLock()
defer s.ioLock.RUnlock()

s.logDebug("Validating:", file.name)

fileLock := s.getWriteLock(file.path)
fileLock := s.getPathLock(file.path)
fileLock.Lock()
defer fileLock.Unlock()

Expand Down Expand Up @@ -845,7 +874,7 @@ func (s *Stage) isFileReady(file *finalFile) bool {
prev := s.fromCache(prevPath)
switch {
case prev == nil:
if s.hasWriteLock(prevPath) {
if s.hasPathLock(prevPath) {
s.logDebug("Previous file in progress:", file.name, "<-", file.prev)
break
}
Expand Down Expand Up @@ -904,9 +933,9 @@ func (s *Stage) isFileReady(file *finalFile) bool {
}

func (s *Stage) finalize(file *finalFile) {
fileLock := s.getWriteLock(file.path)
fileLock := s.getPathLock(file.path)
fileLock.Lock()
defer s.delWriteLock(file.path)
defer s.delPathLock(file.path)
defer fileLock.Unlock()

existing := s.fromCache(file.path)
Expand Down Expand Up @@ -943,6 +972,9 @@ func (s *Stage) finalize(file *finalFile) {
}

func (s *Stage) putFileAway(file *finalFile) (targetPath string, err error) {
s.ioLock.RLock()
defer s.ioLock.RUnlock()

// Better to log the file twice rather than receive it twice. If we log
// after putting the file away, it's possible that a crash could occur
// after putting the file away but before logging. On restart, there would
Expand Down

0 comments on commit 983870b

Please sign in to comment.