Commit

Merge pull request #1 from livMatS/2024-08-29-special-remote
WIP: start to implement dtool special remote
jotelha authored Nov 7, 2024
2 parents 3375af4 + dd8813c commit bf8d80b
Showing 5 changed files with 436 additions and 0 deletions.
216 changes: 216 additions & 0 deletions datalad_dtool/dtool_remote.py
@@ -0,0 +1,216 @@
import logging
import shutil

from annexremote import Master
from annexremote import ExportRemote
from annexremote import RemoteError

from dtoolcore import DataSet, ProtoDataSet, DtoolCoreTypeError

logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger(__name__)
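
# Placeholder, not part of the upstream diff: import.py references
# datalad_dtool.dtool_remote.DTOOL_REMOTE_UUID, so a fixed UUID for this
# special remote is assumed here. Replace with the project's registered value.
DTOOL_REMOTE_UUID = "00000000-0000-0000-0000-000000000000"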


def extract_backend(key):
    # git-annex keys have the form "BACKEND-sSIZE--HASH[.EXT]";
    # the backend name is the first "-"-separated field
    parts = key.split("-")

    if len(parts) < 2:
        return None  # Invalid key format

    return parts[0]


def extract_hash(key):
# Split the key by "--"
parts = key.split("--")

if len(parts) < 2:
return None # Invalid key format

# Get the last part (hash + possible extension)
hash_part = parts[-1]

# Remove file extension if present
hash_only = hash_part.split('.')[0]

return hash_only
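
# Illustrative key layout (example values, not from the diff):
#   key = "MD5E-s1024--d41d8cd98f00b204e9800998ecf8427e.txt"
#   extract_backend(key)  -> "MD5E"
#   extract_hash(key)     -> "d41d8cd98f00b204e9800998ecf8427e"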


class DtoolRemote(ExportRemote):
"""A read-only special remote for retrieving files from dtool datasets."""
transfer_store = None
remove = None

def __init__(self, annex):
super().__init__(annex)
self.configs = {
'uri': "dtool dataset URI"
}

def initremote(self) -> None:
# initialize the remote, e.g. create the folders
# raise RemoteError if the remote couldn't be initialized
self.uri = self.annex.getconfig("uri")
if not self.uri:
raise RemoteError("You need to set uri=")
logger.debug("Set dtool dataset uri=%s", self.uri)

def prepare(self) -> None:
# prepare to be used, eg. open TCP connection, authenticate with the server etc.
# raise RemoteError if not ready to use
self.uri = self.annex.getconfig("uri")
try:
self.dtool_dataset = DataSet.from_uri(self.uri)
logger.debug("Dataset uri=%s frozen, immutable.", self.uri)
        except DtoolCoreTypeError as exc:
            # not (yet) a frozen dataset; fall back to a proto dataset
            logger.warning(exc)
            self.dtool_dataset = ProtoDataSet.from_uri(self.uri)

def transfer_retrieve(self, key, filename):
# get the file identified by `key` and store it to `filename`
# raise RemoteError if the file couldn't be retrieved
        exceptions = []

        backend = extract_backend(key)
        logger.debug("Key %s uses backend %s", key, backend)

        file_hash = extract_hash(key)

        logger.debug("Try to locate file of checksum/hash %s in dataset %s", file_hash, self.uri)
        manifest = self.dtool_dataset.generate_manifest()
        if backend.startswith('MD5') and (manifest["hash_function"] == "md5sum_hexdigest"):
for uuid, entry in manifest['items'].items():
if entry["hash"] == file_hash:
try:
fpath = self.dtool_dataset.item_content_abspath(uuid)
shutil.copyfile(fpath, filename)
return
except Exception as e:
exceptions.append(e)

urls = self.annex.geturls(key, f"dtool:{self.uri}")
logger.debug("Retrieve from %s", urls)

for url in urls:
url = url[len('dtool:'):]
try:
dataset_uri, item_uuid = url.rsplit('/', 1)

assert dataset_uri == self.uri

logger.debug("Try to retrieve item %s from dataset %s", item_uuid, dataset_uri)
# dtool_dataset = DataSet.from_uri(dataset_uri)
fpath = self.dtool_dataset.item_content_abspath(item_uuid)
logger.debug("Cached item content at %s", fpath)
shutil.copyfile(fpath, filename)
return
except Exception as e:
exceptions.append(e)

raise RemoteError(exceptions)

def checkpresent(self, key):
# return True if the key is present in the remote
# return False if the key is not present
# raise RemoteError if the presence of the key couldn't be determined, eg. in case of connection error

# first, try to identify file from actual md5 key

exceptions = []

backend = extract_backend(key)
logger.debug("Key %s uses backend %s", key, backend)

try:
file_hash = extract_hash(key)
logger.debug("Try to locate hash/checksum %s in dataset %s", file_hash, self.uri)

manifest = self.dtool_dataset.generate_manifest()
if backend.startswith('MD5') and (manifest["hash_function"] == "md5sum_hexdigest"):
for uuid, entry in manifest['items'].items():
if entry["hash"] == file_hash:
logger.debug("Located item %s in dataset %s", uuid, self.uri)
return True
except Exception as e:
exceptions.append(e)

# next, try to identify file from dtool URLs

urls = self.annex.geturls(key, f"dtool:{self.uri}")

for url in urls:
url = url[len('dtool:'):]
try:
dataset_uri, item_uuid = url.rsplit('/', 1)

assert dataset_uri == self.uri

logger.debug("Try to locate item %s in dataset %s", item_uuid, dataset_uri)

# dtool_dataset = DataSet.from_uri(dataset_uri)
manifest = self.dtool_dataset.generate_manifest()
if item_uuid in manifest['items']:
logger.debug("Located item %s in dataset %s", item_uuid, dataset_uri)
return True

except Exception as e:
exceptions.append(e)

        if len(exceptions) > 0:
            raise RemoteError(exceptions[-1])

return False

def claimurl(self, url: str) -> bool:
logger.debug("Check claim to URL %s", url)
return url.startswith(f"dtool:{self.uri}/")

    def checkurl(self, url: str) -> bool:
        # TODO: implement more sophisticated checking on URL
        return url.startswith(f"dtool:{self.uri}/")
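
    # Illustrative URL shape this remote claims (values made up):
    #   dtool:file:///tmp/test-dataset/<item-uuid>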

def getcost(self) -> int:
# This is a very expensive remote
return 1000

def getavailability(self) -> str:
return "global"

## Export methods
    def transferexport_store(self, key, local_file, remote_file):
        # this special remote is read-only; refuse to store anything
        raise RemoteError("dtool special remote is read-only")

    def transferexport_retrieve(self, key, local_file, remote_file):
        manifest = self.dtool_dataset.generate_manifest()
        for uuid, entry in manifest['items'].items():
            if entry["relpath"] == remote_file:
                try:
                    fpath = self.dtool_dataset.item_content_abspath(uuid)
                    shutil.copyfile(fpath, local_file)
                    return
                except Exception as e:
                    raise RemoteError(e)

        raise RemoteError(f"{remote_file} not found in dataset {self.uri}")

    def checkpresentexport(self, key, remote_file):
        manifest = self.dtool_dataset.generate_manifest()
        return any(entry["relpath"] == remote_file
                   for entry in manifest['items'].values())

    def removeexport(self, key, remote_file):
        # read-only remote: removal is not supported
        raise RemoteError("dtool special remote is read-only")

    def removeexportdirectory(self, remote_directory):
        # read-only remote: removal is not supported
        raise RemoteError("dtool special remote is read-only")


def main() -> None:
master = Master()
remote = DtoolRemote(master)
master.LinkRemote(remote)
master.Listen()
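
# Not part of the original diff: standard entry-point guard so the module can
# also be executed directly as a git-annex external remote process.
if __name__ == "__main__":
    main()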
130 changes: 130 additions & 0 deletions datalad_dtool/import.py
@@ -0,0 +1,130 @@
"""DataLad extension for the Climate Data Store"""

__docformat__ = "restructuredtext"
import logging
from typing import Iterable, Literal, Optional, Union

from datalad.distribution.dataset import (
EnsureDataset,
datasetmethod,
require_dataset,
)
from datalad.interface.base import Interface, build_doc, eval_results
from datalad.interface.common_opts import nosave_opt, save_message_opt
from datalad.interface.results import get_status_dict
from datalad.support.annexrepo import AnnexRepo
from datalad.support.constraints import EnsureNone, EnsureStr
from datalad.support.param import Parameter

import datalad_dtool.dtool_remote
import datalad_cds.spec  # spec handling still borrowed from datalad-cds (WIP)

logger = logging.getLogger("datalad.dtool.import_dtool")


# decoration auto-generates standard help
@build_doc
# all commands must be derived from Interface
class DownloadCDS(Interface):
"""Downloads specified datasets from the CDS data store"""

_params_ = dict(
spec=Parameter(
doc="""A json string or python dictionary containing the key
"dataset" with the datasets name (i.e. what is shown as the first
parameter to cdsapi.Client.retrieve if you do a "Show API request"
on some dataset in the CDS) and the key "sub-selection" with the
sub-selection of the dataset that should be fetched (i.e. what is
shown as the second parameter to cdsapi.Client.retrieve).""",
),
dataset=Parameter(
args=("-d", "--dataset"),
metavar="PATH",
doc="""specify the dataset to add files to. If no dataset is given,
an attempt is made to identify the dataset based on the current
working directory. Use [CMD: --nosave CMD][PY: save=False PY] to
prevent adding files to the dataset.""",
constraints=EnsureDataset() | EnsureNone(),
),
path=Parameter(
args=("-O", "--path"),
doc="""target path to download to.""",
constraints=EnsureStr(),
),
lazy=Parameter(
args=("--lazy",),
action="store_true",
doc="""By default the file will be immediately downloaded. If the
            lazy flag is supplied then the dtool dataset and item are only recorded as a
source for the file, but no download is initiated. Keep in mind that
there is no way to validate the correctness of the request if the
lazy flag is used.""",
),
save=nosave_opt,
message=save_message_opt,
)

@staticmethod
@datasetmethod(name="import_dtool")
@eval_results
def __call__(
spec: Union[str, dict],
path: str,
*,
dataset: Optional[str] = None,
message: Optional[str] = None,
save: bool = True,
lazy: bool = False,
) -> Iterable[dict]:
if isinstance(spec, dict):
parsed_spec = datalad_cds.spec.Spec.from_dict(spec)
elif isinstance(spec, str):
parsed_spec = datalad_cds.spec.Spec.from_json(spec)
else:
raise TypeError("spec could not be parsed")
ds = require_dataset(dataset, check_installed=True)
        ensure_special_remote_exists_and_is_enabled(ds.repo, "dtool")
pathobj = ds.pathobj / path
url = parsed_spec.to_url()
options = []
if lazy:
options.append("--relaxed")
ds.repo.add_url_to_file(pathobj, url, options=options)
if save:
msg = (
message
if message is not None
else "[DATALAD] Download from Climate Data Store"
)
yield ds.save(pathobj, message=msg)
        yield get_status_dict(action="dtool", ds=ds, status="ok")


def ensure_special_remote_exists_and_is_enabled(
repo: AnnexRepo, remote: Literal["dtool"]
) -> None:
"""Initialize and enable the dtool special remote, if it isn't already.
Very similar to datalad.customremotes.base.ensure_datalad_remote.
"""

uuids = {"cds": datalad_dtool.dtool_remote.DTOOL_REMOTE_UUID}
uuid = uuids[remote]

name = repo.get_special_remotes().get(uuid, {}).get("name")
if not name:
repo.init_remote(
remote,
[
"encryption=none",
"type=external",
"autoenable=true",
"externaltype={}".format(remote),
"uuid={}".format(uuid),
],
)
elif repo.is_special_annex_remote(name, check_if_known=False):
logger.debug("special remote %s is enabled", name)
else:
logger.debug("special remote %s found, enabling", name)
repo.enable_remote(name)
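
# Hypothetical usage sketch (assumes the extension is installed and the
# import_dtool datasetmethod registered above is available; values illustrative):
#
#   from datalad.api import Dataset
#   ds = Dataset(".")
#   ds.import_dtool(spec={...}, path="testfile.txt", lazy=True)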
44 changes: 44 additions & 0 deletions examples/test_git-annex-remote-dtool
@@ -0,0 +1,44 @@
#!/bin/bash
# Test script to drive the example external remote

set -eu -o pipefail -x

cd "$(dirname "$0")"

export PATH=$PWD:$PATH

TMP="$(mktemp -d "${TMPDIR:-/tmp}/gar-XXXXXXX")"
# so there is no global git config side-effects
export HOME="$TMP"

SOURCE_DATASET="file://$TMP/test-dataset"
SOURCE_DATASET_NAME="test-dataset"
REPO_DIR="$TMP/repo"
cd "$TMP"

echo "This is a test file." > testfile.txt

TESTFILE_PATH="$TMP/testfile.txt"

dtool create "${SOURCE_DATASET_NAME}"

mkdir -p "$REPO_DIR"

cd "$REPO_DIR"
git init
git config user.email "[email protected]"
git config user.name "Some User"
git annex init

# put the test file into the dtool dataset; its md5 should match the annex MD5E key
dtool add item "${TESTFILE_PATH}" "${SOURCE_DATASET}"
dtool freeze "${SOURCE_DATASET}"

ITEM_UUID=$(dtool ls "${SOURCE_DATASET}" | awk '{ print $1 }')

git annex initremote --verbose --debug dtool_remote type=external externaltype=dtool encryption=none exporttree=yes uri="${SOURCE_DATASET}"

git annex addurl --backend=MD5E --file testfile.txt "dtool:${SOURCE_DATASET}/${ITEM_UUID}"

# test read-only special remote
git annex testremote --debug --verbose dtool_remote --test-readonly=testfile.txt 2>&1 | tail -n 1000
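
# Plausible manual follow-up (not in the original script): drop the local
# copy and re-fetch the content through the special remote.
# git annex drop --force testfile.txt
# git annex get --from dtool_remote testfile.txt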