diff --git a/fsspec/implementations/cached.py b/fsspec/implementations/cached.py index e9229b712..013878c07 100644 --- a/fsspec/implementations/cached.py +++ b/fsspec/implementations/cached.py @@ -7,6 +7,7 @@ import tempfile import time from shutil import rmtree +from threading import RLock from fsspec import AbstractFileSystem, filesystem from fsspec.callbacks import _DEFAULT_CALLBACK @@ -120,6 +121,7 @@ def __init__( if isinstance(target_protocol, str) else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0]) ) + self.lock = RLock() self.load_cache() self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs) @@ -134,22 +136,23 @@ def _mkcache(self): def load_cache(self): """Read set of stored blocks from file""" - cached_files = [] - for storage in self.storage: - fn = os.path.join(storage, "cache") - if os.path.exists(fn): - with open(fn, "rb") as f: - # TODO: consolidate blocks here - loaded_cached_files = pickle.load(f) - for c in loaded_cached_files.values(): - if isinstance(c["blocks"], list): - c["blocks"] = set(c["blocks"]) - cached_files.append(loaded_cached_files) - else: - cached_files.append({}) - self._mkcache() - self.cached_files = cached_files or [{}] - self.last_cache = time.time() + with self.lock: + cached_files = [] + for storage in self.storage: + fn = os.path.join(storage, "cache") + if os.path.exists(fn): + with open(fn, "rb") as f: + # TODO: consolidate blocks here + loaded_cached_files = pickle.load(f) + for c in loaded_cached_files.values(): + if isinstance(c["blocks"], list): + c["blocks"] = set(c["blocks"]) + cached_files.append(loaded_cached_files) + else: + cached_files.append({}) + self._mkcache() + self.cached_files = cached_files or [{}] + self.last_cache = time.time() def save_cache(self): """Save set of stored blocks from file""" @@ -157,39 +160,40 @@ def save_cache(self): # TODO: a file lock could be used to ensure file does not change # between re-read and write; but occasional duplicated reads ok. cache = self.cached_files[-1] - if os.path.exists(fn): - with open(fn, "rb") as f: - cached_files = pickle.load(f) - for k, c in cached_files.items(): - if k in cache: - if c["blocks"] is True or cache[k]["blocks"] is True: - c["blocks"] = True - else: - # self.cached_files[*][*]["blocks"] must continue to - # point to the same set object so that updates - # performed by MMapCache are propagated back to - # self.cached_files. - blocks = cache[k]["blocks"] - blocks.update(c["blocks"]) - c["blocks"] = blocks - c["time"] = max(c["time"], cache[k]["time"]) - c["uid"] = cache[k]["uid"] - - # Files can be added to cache after it was written once - for k, c in cache.items(): - if k not in cached_files: - cached_files[k] = c - else: - cached_files = cache - cache = {k: v.copy() for k, v in cached_files.items()} - for c in cache.values(): - if isinstance(c["blocks"], set): - c["blocks"] = list(c["blocks"]) - self._mkcache() - with atomic_write(fn) as f: - pickle.dump(cache, f) - self.cached_files[-1] = cached_files - self.last_cache = time.time() + with self.lock: + if os.path.exists(fn): + with open(fn, "rb") as f: + cached_files = pickle.load(f) + for k, c in cached_files.items(): + if k in cache: + if c["blocks"] is True or cache[k]["blocks"] is True: + c["blocks"] = True + else: + # self.cached_files[*][*]["blocks"] must continue to + # point to the same set object so that updates + # performed by MMapCache are propagated back to + # self.cached_files. + blocks = cache[k]["blocks"] + blocks.update(c["blocks"]) + c["blocks"] = blocks + c["time"] = max(c["time"], cache[k]["time"]) + c["uid"] = cache[k]["uid"] + + # Files can be added to cache after it was written once + for k, c in cache.items(): + if k not in cached_files: + cached_files[k] = c + else: + cached_files = cache + cache = {k: v.copy() for k, v in cached_files.items()} + for c in cache.values(): + if isinstance(c["blocks"], set): + c["blocks"] = list(c["blocks"]) + self._mkcache() + with atomic_write(fn) as f: + pickle.dump(cache, f) + self.cached_files[-1] = cached_files + self.last_cache = time.time() def _check_cache(self): """Reload caches if time elapsed or any disappeared""" @@ -228,8 +232,9 @@ def clear_cache(self): In the case of multiple cache locations, this clears only the last one, which is assumed to be the read/write one. """ - rmtree(self.storage[-1]) - self.load_cache() + with self.lock: + rmtree(self.storage[-1]) + self.load_cache() def clear_expired_cache(self, expiry_time=None): """Remove all expired files and metadata from the cache @@ -248,26 +253,27 @@ def clear_expired_cache(self, expiry_time=None): if not expiry_time: expiry_time = self.expiry - self._check_cache() - - for path, detail in self.cached_files[-1].copy().items(): - if time.time() - detail["time"] > expiry_time: - if self.same_names: - basename = os.path.basename(detail["original"]) - fn = os.path.join(self.storage[-1], basename) - else: - fn = os.path.join(self.storage[-1], detail["fn"]) - if os.path.exists(fn): - os.remove(fn) - self.cached_files[-1].pop(path) + with self.lock: + self._check_cache() - if self.cached_files[-1]: - cache_path = os.path.join(self.storage[-1], "cache") - with atomic_write(cache_path) as fc: - pickle.dump(self.cached_files[-1], fc) - else: - rmtree(self.storage[-1]) - self.load_cache() + for path, detail in self.cached_files[-1].copy().items(): + if time.time() - detail["time"] > expiry_time: + if self.same_names: + basename = os.path.basename(detail["original"]) + fn = os.path.join(self.storage[-1], basename) + else: + fn = os.path.join(self.storage[-1], detail["fn"]) + if os.path.exists(fn): + os.remove(fn) + self.cached_files[-1].pop(path) + + if self.cached_files[-1]: + cache_path = os.path.join(self.storage[-1], "cache") + with atomic_write(cache_path) as fc: + pickle.dump(self.cached_files[-1], fc) + else: + rmtree(self.storage[-1]) + self.load_cache() def pop_from_cache(self, path): """Remove cached version of given file @@ -283,9 +289,10 @@ def pop_from_cache(self, path): _, fn = details if fn.startswith(self.storage[-1]): # is in in writable cache - os.remove(fn) - self.cached_files[-1].pop(path) - self.save_cache() + with self.lock: + os.remove(fn) + self.cached_files[-1].pop(path) + self.save_cache() else: raise PermissionError( "Can only delete cached file in last, writable cache location"