diff --git a/stage/local.go b/stage/local.go
index 3c73c17..b0e0056 100644
--- a/stage/local.go
+++ b/stage/local.go
@@ -101,10 +101,17 @@ type Stage struct {
 	cache      map[string]*finalFile
 	cacheTime  time.Time
 	cacheTimes []time.Time
-	writeLock  sync.RWMutex
-	writeLocks map[string]*sync.RWMutex
-	readyLock  sync.RWMutex
-	cleanLock  sync.RWMutex
+
+	// Every normal file I/O function does a read lock, so write locking
+	// it will block the stage area from interacting with the file system.
+	// This is useful when we want to clean up the stage and target
+	// areas.
+	ioLock sync.RWMutex
+
+	pathLock  sync.RWMutex
+	pathLocks map[string]*sync.RWMutex
+	readyLock sync.RWMutex
+	cleanLock sync.RWMutex
 }
 
 // New creates a new instance of Stage where the rootDir is the directory
@@ -121,43 +128,43 @@ func New(name, rootDir, targetDir string, logger sts.ReceiveLogger, dispatcher s
 	}
 	s.wait = make(map[string][]*finalFile)
 	s.cache = make(map[string]*finalFile)
-	s.writeLocks = make(map[string]*sync.RWMutex)
+	s.pathLocks = make(map[string]*sync.RWMutex)
 	s.lastIn = time.Now()
 	s.cleanInterval = time.Hour * 12
 	s.canReceive = true
 	return s
 }
 
-func (s *Stage) getWriteLock(key string) *sync.RWMutex {
-	s.writeLock.Lock()
-	defer s.writeLock.Unlock()
+func (s *Stage) getPathLock(key string) *sync.RWMutex {
+	s.pathLock.Lock()
+	defer s.pathLock.Unlock()
 	var m *sync.RWMutex
 	var exists bool
-	if m, exists = s.writeLocks[key]; !exists {
+	if m, exists = s.pathLocks[key]; !exists {
 		m = &sync.RWMutex{}
-		s.writeLocks[key] = m
+		s.pathLocks[key] = m
 	}
 	return m
 }
 
-func (s *Stage) delWriteLock(key string) {
-	s.writeLock.Lock()
-	defer s.writeLock.Unlock()
-	delete(s.writeLocks, key)
+func (s *Stage) delPathLock(key string) {
+	s.pathLock.Lock()
+	defer s.pathLock.Unlock()
+	delete(s.pathLocks, key)
 	s.lastIn = time.Now()
-	s.logDebug("Write Locks:", len(s.writeLocks))
+	s.logDebug("Path Locks:", len(s.pathLocks))
 }
 
-func (s *Stage) hasWriteLock(key string) bool {
-	s.writeLock.RLock()
-	defer s.writeLock.RUnlock()
-	_, ok := s.writeLocks[key]
+func (s *Stage) hasPathLock(key string) bool {
-	s.pathLock.RLock()
+	s.pathLock.RLock()
+	defer s.pathLock.RUnlock()
+	_, ok := s.pathLocks[key]
 	return ok
 }
 
 // func (s *Stage) getLastIn() time.Time {
-// 	s.writeLock.RLock()
-// 	defer s.writeLock.RUnlock()
+// 	s.pathLock.RLock()
+// 	defer s.pathLock.RUnlock()
 // 	return s.lastIn
 // }
 
@@ -174,6 +181,8 @@ func (s *Stage) pathToName(path, stripExt string) (name string) {
 // Scan walks the stage area tree looking for companion files and returns any
 // found in the form of a JSON-encoded byte array
 func (s *Stage) Scan(version string) (jsonBytes []byte, err error) {
+	s.ioLock.RLock()
+	defer s.ioLock.RUnlock()
 	var name string
 	var lock *sync.RWMutex
 	var partials []*sts.Partial
@@ -181,7 +190,7 @@ func (s *Stage) Scan(version string) (jsonBytes []byte, err error) {
 		func(path string, info os.FileInfo, err error) error {
 			if filepath.Ext(path) == compExt {
 				name = s.pathToName(path, compExt)
-				lock = s.getWriteLock(strings.TrimSuffix(path, compExt))
+				lock = s.getPathLock(strings.TrimSuffix(path, compExt))
 				lock.RLock()
 				// Make sure it still exists.
 				if _, err := os.Stat(path); err == nil {
@@ -207,6 +216,8 @@ func (s *Stage) Scan(version string) (jsonBytes []byte, err error) {
 }
 
 func (s *Stage) initStageFile(path string, size int64) error {
+	s.ioLock.RLock()
+	defer s.ioLock.RUnlock()
 	var err error
 	info, err := os.Stat(path + partExt)
 	if err == nil && info.Size() == size {
@@ -243,7 +254,7 @@ func (s *Stage) initStageFile(path string, size int64) error {
 func (s *Stage) Prepare(parts []sts.Binned) {
 	for _, part := range parts {
 		path := filepath.Join(s.rootDir, part.GetName())
-		lock := s.getWriteLock(path)
+		lock := s.getPathLock(path)
 		lock.Lock()
 		err := s.initStageFile(path, part.GetFileSize())
 		lock.Unlock()
@@ -255,6 +266,9 @@
 
 // Receive reads a single file part with file metadata and reader
 func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) {
+	s.ioLock.RLock()
+	defer s.ioLock.RUnlock()
+
 	if len(file.Parts) != 1 {
 		err = fmt.Errorf(
 			"can only receive a single part for a single reader (%d given)",
@@ -279,7 +293,7 @@ func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) {
 	}
 
 	// Make sure we're the only one updating the companion
-	lock := s.getWriteLock(path)
+	lock := s.getPathLock(path)
 	lock.Lock()
 	defer lock.Unlock()
 
@@ -312,7 +326,7 @@ func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) {
 		os.Remove(path + partExt)
 		if existing.state >= stateFinalized {
 			os.Remove(path + compExt)
-			s.delWriteLock(path)
+			s.delPathLock(path)
 		}
 		return
 	}
@@ -352,30 +366,36 @@ func (s *Stage) partReceived(part sts.Binned) bool {
 	s.buildCache(when)
 	beg, end := part.GetSlice()
 	path := filepath.Join(s.rootDir, part.GetName())
-	lock := s.getWriteLock(path)
+	lock := s.getPathLock(path)
 	lock.Lock()
 	defer lock.Unlock()
 	final := &finalFile{
-		path: path,
-		name: part.GetName(),
-		size: part.GetFileSize(),
-		hash: part.GetFileHash(),
-		prev: part.GetPrev(),
+		path:    path,
+		renamed: part.GetRenamed(),
+		name:    part.GetName(),
+		size:    part.GetFileSize(),
+		hash:    part.GetFileHash(),
+		prev:    part.GetPrev(),
 	}
 	existing := s.fromCache(final.path)
 	if existing == nil {
 		if cmp, _ := readLocalCompanion(path, final.name); cmp != nil {
+			if final.renamed != cmp.Renamed || final.hash != cmp.Hash || final.prev != cmp.Prev {
+				return false
+			}
 			if companionPartExists(cmp, beg, end) {
 				s.logInfo("Part already received:", final.name, beg, end)
 				return true
 			}
 		} else {
-			s.delWriteLock(path)
+			s.delPathLock(path)
 		}
-	} else if existing.state != stateFailed && existing.hash == final.hash {
+	} else if existing.state != stateFailed &&
+		existing.hash == final.hash &&
+		existing.renamed == final.renamed {
 		s.logInfo("File already received:", final.name)
 		if existing.state >= stateFinalized {
-			s.delWriteLock(path)
+			s.delPathLock(path)
 		}
 		return true
 	}
@@ -440,6 +460,8 @@ func (s *Stage) setCanReceive(value bool) {
 // files in the stage area that should be completed from the previous server
 // run
 func (s *Stage) Recover() {
+	s.ioLock.Lock()
+	defer s.ioLock.Unlock()
 	s.setCanReceive(false)
 	defer s.setCanReceive(true)
 	s.logInfo("Beginning stage recovery")
@@ -561,7 +583,9 @@
 
 	s.cleanStrays(minAge)
 	s.cleanWaiting(minAge)
-	s.pruneTree(minAge)
+
+	s.pruneTree(s.rootDir, minAge)
+	s.pruneTree(s.targetDir, minAge)
 
 	s.cleanLock.Unlock()
 
@@ -576,10 +600,12 @@ func (s *Stage) scheduleClean(minAge time.Duration) {
 	})
 }
 
-func (s *Stage) pruneTree(minAge time.Duration) {
+func (s *Stage) pruneTree(dir string, minAge time.Duration) {
+	s.ioLock.Lock()
+	defer s.ioLock.Unlock()
 	s.logDebug("Pruning empty directories ...")
 	var dirs []string
-	err := filepath.Walk(s.rootDir,
+	err := filepath.Walk(dir,
 		func(path string, info os.FileInfo, err error) error {
 			if err != nil || time.Since(info.ModTime()) < minAge {
 				return nil
 			}
@@ -766,9 +792,12 @@ func (s *Stage) processHandler() {
 }
 
 func (s *Stage) process(file *finalFile) {
+	s.ioLock.RLock()
+	defer s.ioLock.RUnlock()
+
 	s.logDebug("Validating:", file.name)
 
-	fileLock := s.getWriteLock(file.path)
+	fileLock := s.getPathLock(file.path)
 	fileLock.Lock()
 	defer fileLock.Unlock()
 
@@ -845,7 +874,7 @@ func (s *Stage) isFileReady(file *finalFile) bool {
 	prev := s.fromCache(prevPath)
 	switch {
 	case prev == nil:
-		if s.hasWriteLock(prevPath) {
+		if s.hasPathLock(prevPath) {
 			s.logDebug("Previous file in progress:", file.name, "<-", file.prev)
 			break
 		}
@@ -904,9 +933,9 @@ func (s *Stage) isFileReady(file *finalFile) bool {
 }
 
 func (s *Stage) finalize(file *finalFile) {
-	fileLock := s.getWriteLock(file.path)
+	fileLock := s.getPathLock(file.path)
 	fileLock.Lock()
-	defer s.delWriteLock(file.path)
+	defer s.delPathLock(file.path)
 	defer fileLock.Unlock()
 
 	existing := s.fromCache(file.path)
@@ -943,6 +972,9 @@ func (s *Stage) finalize(file *finalFile) {
 }
 
 func (s *Stage) putFileAway(file *finalFile) (targetPath string, err error) {
+	s.ioLock.RLock()
+	defer s.ioLock.RUnlock()
+
 	// Better to log the file twice rather than receive it twice. If we log
 	// after putting the file away, it's possible that a crash could occur
 	// after putting the file away but before logging. On restart, there would
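
For illustration, here is a minimal, self-contained sketch of the quiesce pattern the new ioLock comment describes: normal I/O paths hold the RWMutex for reading so they can run concurrently, while cleanup or recovery takes it for writing and blocks all stage file-system activity until it returns. This is not code from the repository; the ioGate type and the receive/clean names are hypothetical.

package main

import (
	"sync"
	"time"
)

// ioGate stands in for the Stage.ioLock idea: one coarse RWMutex shared by
// every file I/O path.
type ioGate struct {
	mu sync.RWMutex
}

// receive models a normal I/O function: a shared (read) hold, so many
// receives may proceed at once.
func (g *ioGate) receive(doIO func()) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	doIO()
}

// clean models cleanup or recovery: an exclusive (write) hold that waits for
// in-flight I/O to drain and blocks new I/O until the sweep finishes.
func (g *ioGate) clean(sweep func()) {
	g.mu.Lock()
	defer g.mu.Unlock()
	sweep()
}

func main() {
	var g ioGate
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.receive(func() { time.Sleep(10 * time.Millisecond) })
		}()
	}
	wg.Wait()
	g.clean(func() { /* prune empty directories, move files, etc. */ })
}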
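
The renamed helpers (getPathLock, delPathLock, hasPathLock) follow a common keyed-mutex shape: a parent lock guards a map of lazily created per-path locks that are dropped once a file is finalized. Below is a compact, hypothetical version of that shape, independent of the Stage type; it uses a plain Mutex for the parent lock where the patch uses an RWMutex.

package main

import (
	"fmt"
	"sync"
)

// keyedLocks guards a map of per-key RWMutexes with a parent mutex.
type keyedLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

func newKeyedLocks() *keyedLocks {
	return &keyedLocks{locks: make(map[string]*sync.RWMutex)}
}

// get lazily creates the lock for a key, in the spirit of getPathLock.
func (k *keyedLocks) get(key string) *sync.RWMutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	m, ok := k.locks[key]
	if !ok {
		m = &sync.RWMutex{}
		k.locks[key] = m
	}
	return m
}

// del drops the lock once the key is finished, like delPathLock.
func (k *keyedLocks) del(key string) {
	k.mu.Lock()
	defer k.mu.Unlock()
	delete(k.locks, key)
}

// has reports whether a key is still being tracked, like hasPathLock.
func (k *keyedLocks) has(key string) bool {
	k.mu.Lock()
	defer k.mu.Unlock()
	_, ok := k.locks[key]
	return ok
}

func main() {
	k := newKeyedLocks()
	lock := k.get("data/file.dat")
	lock.Lock()
	// ... write a part, update the companion ...
	lock.Unlock()
	k.del("data/file.dat")
	fmt.Println("still in progress:", k.has("data/file.dat"))
}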