Skip to content

Commit

Permalink
upgrade to dvc-objects 0.0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed Jun 11, 2022
1 parent 09b484f commit 531dbb5
Show file tree
Hide file tree
Showing 15 changed files with 470 additions and 247 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ install_requires=
dictdiffer>=0.8.1
pygtrie>=2.3.2
shortuuid>=0.5.0
dvc-objects==0.0.3
dvc-objects==0.0.4

[options.extras_require]
tests =
Expand Down
8 changes: 4 additions & 4 deletions src/dvc_data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

def check(odb: "ObjectDB", obj: "HashFile", **kwargs):
if isinstance(obj, Tree):
for _, _, oid in obj:
odb.check(oid, **kwargs)
for _, _, hash_info in obj:
odb.check(hash_info.value, **kwargs)

odb.check(obj.hash_info, **kwargs)
odb.check(obj.oid, **kwargs)


def load(odb: "ObjectDB", hash_info: "HashInfo") -> "HashFile":
if hash_info.isdir:
return Tree.load(odb, hash_info)
return odb.get(hash_info)
return odb.get(hash_info.value)


def iterobjs(
Expand Down
4 changes: 2 additions & 2 deletions src/dvc_data/checkout.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _checkout_file(
"""The file is changed we need to checkout a new copy"""
modified = False

cache_fs_path = cache.hash_to_path(change.new.oid.value)
cache_fs_path = cache.oid_to_path(change.new.oid.value)
if change.old.oid:
if relink:
if fs.iscopy(fs_path) and cache.cache_types[0] == "copy":
Expand Down Expand Up @@ -187,7 +187,7 @@ def _checkout(
if not diff:
return

links = test_links(cache.cache_types, cache.fs, cache.fs_path, fs, fs_path)
links = test_links(cache.cache_types, cache.fs, cache.path, fs, fs_path)
if not links:
raise LinkError(fs_path)
link = Link(links)
Expand Down
10 changes: 5 additions & 5 deletions src/dvc_data/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

def get_odb(fs, fs_path, **config):
from dvc_objects.fs import Schemes
from dvc_objects.hashfile.db import ObjectDB
from dvc_objects.hashfile.db import HashFileDB

from .local import LocalObjectDB
from .local import LocalHashFileDB

if fs.protocol == Schemes.LOCAL:
return LocalObjectDB(fs, fs_path, **config)
return LocalHashFileDB(fs, fs_path, **config)

return ObjectDB(fs, fs_path, **config)
return HashFileDB(fs, fs_path, **config)


def get_index(odb) -> "ObjectDBIndexBase":
Expand All @@ -25,6 +25,6 @@ def get_index(odb) -> "ObjectDBIndexBase":
return cls(
odb.tmp_dir,
hashlib.sha256(
odb.fs.unstrip_protocol(odb.fs_path).encode("utf-8")
odb.fs.unstrip_protocol(odb.path).encode("utf-8")
).hexdigest(),
)
89 changes: 43 additions & 46 deletions src/dvc_data/db/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,25 @@
import stat
from functools import partial

from dvc_objects.db import noop, wrap_iter
from dvc_objects.errors import ObjectDBError, ObjectFormatError
from dvc_objects.fs.system import umask
from dvc_objects.fs.utils import copyfile, relpath, remove
from dvc_objects.hashfile.db import ObjectDB, noop, wrap_iter
from dvc_objects.hashfile.hash_info import HashInfo
from dvc_objects.hashfile.db import HashFileDB
from funcy import cached_property
from shortuuid import uuid

logger = logging.getLogger(__name__)


class LocalObjectDB(ObjectDB):
class LocalHashFileDB(HashFileDB):
DEFAULT_CACHE_TYPES = ["reflink", "copy"]
CACHE_MODE = 0o444
UNPACKED_DIR_SUFFIX = ".unpacked"

def __init__(self, fs, fs_path, **config):
super().__init__(fs, fs_path, **config)
self.cache_dir = fs_path
def __init__(self, fs, path, **config):
super().__init__(fs, path, **config)
self.cache_dir = path

shared = config.get("shared")
if shared:
Expand All @@ -33,11 +33,11 @@ def __init__(self, fs, fs_path, **config):

@property
def cache_dir(self):
return self.fs_path if self.fs_path else None
return self.path if self.path else None

@cache_dir.setter
def cache_dir(self, value):
self.fs_path = value
self.path = value

@cached_property
def cache_path(self):
Expand All @@ -47,50 +47,49 @@ def move(self, from_info, to_info):
super().move(from_info, to_info)
os.chmod(to_info, self._file_mode)

def makedirs(self, fs_path):
def makedirs(self, path):
from dvc_objects.fs.utils import makedirs

makedirs(fs_path, exist_ok=True, mode=self._dir_mode)
makedirs(path, exist_ok=True, mode=self._dir_mode)

def hash_to_path(self, hash_):
def oid_to_path(self, oid):
# NOTE: `self.cache_path` is already normalized so we can simply use
# `os.sep` instead of `os.path.join`. This results in this helper
# being ~5.5 times faster.
return f"{self.cache_path}{os.sep}{hash_[0:2]}{os.sep}{hash_[2:]}"
return f"{self.cache_path}{os.sep}{oid[0:2]}{os.sep}{oid[2:]}"

def hashes_exist(
self, hashes, jobs=None, progress=noop
def oids_exist(
self, oids, jobs=None, progress=noop
): # pylint: disable=unused-argument
ret = []
progress = partial(progress, "querying", len(hashes))
progress = partial(progress, "querying", len(oids))

for hash_ in wrap_iter(hashes, progress):
hash_info = HashInfo(self.fs.PARAM_CHECKSUM, hash_)
for oid in wrap_iter(oids, progress):
try:
self.check(hash_info)
ret.append(hash_)
self.check(oid)
ret.append(oid)
except (FileNotFoundError, ObjectFormatError):
pass

return ret

def _list_paths(self, prefix=None):
assert self.fs_path is not None
assert self.path is not None
if prefix:
fs_path = self.fs.path.join(self.fs_path, prefix[:2])
if not self.fs.exists(fs_path):
path = self.fs.path.join(self.path, prefix[:2])
if not self.fs.exists(path):
return
else:
fs_path = self.fs_path
yield from self.fs.find(fs_path)
path = self.path
yield from self.fs.find(path)

def _remove_unpacked_dir(self, hash_):
hash_fs_path = self.hash_to_path(hash_)
fs_path = self.fs.path.with_name(
hash_fs_path,
self.fs.path.name(hash_fs_path) + self.UNPACKED_DIR_SUFFIX,
hash_path = self.oid_to_path(hash_)
path = self.fs.path.with_name(
hash_path,
self.fs.path.name(hash_path) + self.UNPACKED_DIR_SUFFIX,
)
self.fs.remove(fs_path)
self.fs.remove(path)

def _unprotect_file(self, path):
if self.fs.is_symlink(path) or self.fs.is_hardlink(path):
Expand Down Expand Up @@ -119,42 +118,40 @@ def _unprotect_dir(self, path):
for fname in self.fs.find(path):
self._unprotect_file(fname)

def unprotect(self, fs_path):
if not os.path.exists(fs_path):
raise ObjectDBError(
f"can't unprotect non-existing data '{fs_path}'"
)
def unprotect(self, path):
if not os.path.exists(path):
raise ObjectDBError(f"can't unprotect non-existing data '{path}'")

if os.path.isdir(fs_path):
self._unprotect_dir(fs_path)
if os.path.isdir(path):
self._unprotect_dir(path)
else:
self._unprotect_file(fs_path)
self._unprotect_file(path)

def protect(self, fs_path):
def protect(self, path):
try:
os.chmod(fs_path, self.CACHE_MODE)
os.chmod(path, self.CACHE_MODE)
except OSError:
# NOTE: not being able to protect cache file is not fatal, it
# might happen on funky filesystems (e.g. Samba, see #5255),
# read-only filesystems or in a shared cache scenario.
logger.debug("failed to protect '%s'", fs_path, exc_info=True)
logger.debug("failed to protect '%s'", path, exc_info=True)

def is_protected(self, fs_path):
def is_protected(self, path):
try:
mode = os.stat(fs_path).st_mode
mode = os.stat(path).st_mode
except FileNotFoundError:
return False

return stat.S_IMODE(mode) == self.CACHE_MODE

def set_exec(self, fs_path):
mode = os.stat(fs_path).st_mode | stat.S_IEXEC
def set_exec(self, path):
mode = os.stat(path).st_mode | stat.S_IEXEC
try:
os.chmod(fs_path, mode)
os.chmod(path, mode)
except OSError:
logger.debug(
"failed to chmod '%s' '%s'",
oct(mode),
fs_path,
path,
exc_info=True,
)
74 changes: 44 additions & 30 deletions src/dvc_data/db/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,82 +2,96 @@
from contextlib import suppress
from typing import TYPE_CHECKING, Dict

from dvc_objects.db import ObjectDB
from dvc_objects.errors import ObjectFormatError
from dvc_objects.hashfile.db import ObjectDB
from dvc_objects.hashfile.db import HashFileDB, HashInfo
from dvc_objects.hashfile.hash import hash_file
from dvc_objects.hashfile.obj import HashFile

from ..objects.reference import ReferenceHashFile
from ..objects.reference import ReferenceObject

if TYPE_CHECKING:
from dvc_objects.fs.base import AnyFSPath, FileSystem
from dvc_objects.hashfile.hash_info import HashInfo

logger = logging.getLogger(__name__)


class ReferenceObjectDB(ObjectDB):
class ReferenceHashFileDB(HashFileDB):
"""Reference ODB.
File objects are stored as ReferenceHashFiles which reference paths outside
File objects are stored as ReferenceObjects which reference paths outside
of the staging ODB fs. Tree objects are stored natively.
"""

def __init__(self, fs: "FileSystem", path: str, **config):
super().__init__(fs, path, **config)
self.raw = ObjectDB(self.fs, self.fs_path, **self.config)
self.raw = ObjectDB(self.fs, self.path, **self.config)
self._fs_cache: Dict[tuple, "FileSystem"] = {}
self._obj_cache: Dict["HashInfo", "ReferenceHashFile"] = {}
self._obj_cache: Dict["HashInfo", "ReferenceObject"] = {}

def get(self, hash_info: "HashInfo"):
raw = self.raw.get(hash_info)
def _deref(self, obj):
return HashFile(obj.ref.path, obj.ref.fs, obj.ref.hash_info)

def get(self, oid: str):
raw = self.raw.get(oid)

hash_info = HashInfo(self.hash_name, oid)

if hash_info.isdir:
return raw
return HashFile(raw.path, raw.fs, hash_info)

try:
return self._obj_cache[hash_info]
except KeyError:
pass

try:
obj = ReferenceHashFile.from_raw(raw, fs_cache=self._fs_cache)
obj = ReferenceObject.from_raw(raw, fs_cache=self._fs_cache)
except ObjectFormatError:
raw.fs.remove(raw.fs_path)
raw.fs.remove(raw.path)
raise

deref = obj.deref()
deref = self._deref(obj)
self._obj_cache[hash_info] = deref

return deref

def add(
self,
fs_path: "AnyFSPath",
path: "AnyFSPath",
fs: "FileSystem",
hash_info: "HashInfo",
oid: str,
**kwargs,
): # pylint: disable=arguments-differ
hash_info = HashInfo(self.hash_name, oid)
if hash_info.isdir:
return self.raw.add(fs_path, fs, hash_info, **kwargs)
return self.raw.add(path, fs, oid, **kwargs)

obj = ReferenceHashFile.from_path(
fs_path, fs, hash_info, fs_cache=self._fs_cache
obj = ReferenceObject.from_path(
path, fs, hash_info, fs_cache=self._fs_cache
)
self._obj_cache[hash_info] = obj.deref()
self._obj_cache[hash_info] = self._deref(obj)

return self.raw.add(obj.fs_path, obj.fs, hash_info, **kwargs)
return self.raw.add(obj.path, obj.fs, oid, **kwargs)

def check(
self,
hash_info: "HashInfo",
oid: str,
check_hash: bool = True,
):
obj = self.get(hash_info)

try:
obj.check(self, check_hash=check_hash)
except ObjectFormatError:
raw = self.raw.get(hash_info)
logger.debug("corrupted cache file '%s'.", raw.fs_path)
if not check_hash:
if not self.exists(oid):
raise FileNotFoundError
return

obj = self.get(oid)

_, actual = hash_file(obj.path, obj.fs, obj.hash_info.name, self.state)
assert actual.name == self.hash_name
assert actual.value
if actual.value.split(".")[0] != oid.split(".")[0]:
raw = self.raw.get(oid)
logger.debug("corrupted cache file '%s'.", raw.path)
with suppress(FileNotFoundError):
raw.fs.remove(raw.fs_path)
raise
raw.fs.remove(raw.path)
raise ObjectFormatError(f"{obj} is corrupted")
2 changes: 1 addition & 1 deletion src/dvc_data/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def _in_cache(oid, cache):
return False

try:
cache.check(oid)
cache.check(oid.value)
return True
except (FileNotFoundError, ObjectFormatError):
return False
Expand Down
4 changes: 2 additions & 2 deletions src/dvc_data/gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ def _is_dir_hash(_hash):
removed = False
# hashes must be sorted to ensure we always remove .dir files first

hashes = QueryingProgress(odb.all(jobs), name=odb.fs_path)
hashes = QueryingProgress(odb.all(jobs), name=odb.path)
for hash_ in sorted(hashes, key=_is_dir_hash, reverse=True):
if hash_ in used_hashes:
continue
fs_path = odb.hash_to_path(hash_)
fs_path = odb.oid_to_path(hash_)
if _is_dir_hash(hash_):
# backward compatibility
# pylint: disable=protected-access
Expand Down
Loading

0 comments on commit 531dbb5

Please sign in to comment.