diff --git a/requirements.txt b/requirements.txt
index d473ab3..4003b9d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,4 @@ jinja2
 python-dateutil
 click
 prov
-schema_salad
\ No newline at end of file
+typing-extensions
diff --git a/rocrate/provenance_profile.py b/rocrate/provenance_profile.py
index b2effdd..41d945d 100644
--- a/rocrate/provenance_profile.py
+++ b/rocrate/provenance_profile.py
@@ -1,19 +1,12 @@
-import copy
-import pdb
 import datetime
-import logging
 import urllib
 import uuid
 import json
-from io import BytesIO
 from pathlib import PurePath, PurePosixPath
-from socket import getfqdn
 from typing import (
     Any,
     Dict,
-    Iterable,
     List,
-    MutableMapping,
     MutableSequence,
     Optional,
     Tuple,
@@ -23,9 +16,7 @@
 from prov.identifier import Identifier
 from prov.model import PROV, PROV_LABEL, PROV_TYPE, PROV_VALUE, ProvDocument, ProvEntity
-from schema_salad.sourceline import SourceLine
-from typing_extensions import TYPE_CHECKING
-from tools.load_ga_export import load_ga_history_export, GalaxyJob, GalaxyDataset
+from tools.load_ga_export import load_ga_history_export, GalaxyJob
 from ast import literal_eval

 import os
@@ -36,16 +27,12 @@ from rocrate.provenance_constants import (
     ACCOUNT_UUID,
     CWLPROV,
-    ENCODING,
-    FOAF,
     METADATA,
     ORE,
     PROVENANCE,
     RO,
     SCHEMA,
     SHA1,
-    SHA256,
-    TEXT_PLAIN,
     UUID,
     WF4EVER,
     WFDESC,
@@ -59,15 +46,17 @@
 # from rocrate.provenance import ResearchObject

 from pathlib import Path
-import rocrate.rocrate as roc
+

 def posix_path(local_path: str) -> str:
     return str(PurePosixPath(Path(local_path)))

+
 def remove_escapes(s):
     escapes = ''.join([chr(char) for char in range(1, 32)])
     translator = str.maketrans('', '', escapes)
-    t = s.translate(translator)
+    return s.translate(translator)
+

 def reassign(d):
     for k, v in d.items():
@@ -78,8 +67,9 @@ def reassign(d):
         except ValueError:
             pass

+
 class ProvenanceProfile:
-    """
+    """\
     Provenance profile.

     Populated from a galaxy workflow export.
@@ -87,7 +77,7 @@ class ProvenanceProfile:

     def __init__(
         self,
-        ga_export: Dict, 
+        ga_export: Dict,
         full_name: str = None,
         orcid: str = None,
         # prov_name: str = None,
@@ -112,12 +102,11 @@ def __init__(
         self.base_uri = "arcp://uuid,%s/" % self.ro_uuid
         self.document = ProvDocument()
         # TODO extract engine_uuid from galaxy, type: str
-        self.engine_uuid = "urn:uuid:%s" % uuid.uuid4() #type: str
+        self.engine_uuid = "urn:uuid:%s" % uuid.uuid4()  # type: str
         self.full_name = full_name
         self.workflow_run_uuid = run_uuid or uuid.uuid4()
         self.workflow_run_uri = self.workflow_run_uuid.urn  # type: str
-
-        # move to separate function 
+        # move to separate function
         metadata_export = load_ga_history_export(ga_export)
         self.generate_prov_doc()
         self.jobs = []
@@ -143,7 +132,7 @@ def generate_prov_doc(self) -> Tuple[str, ProvDocument]:
         #     PROV_TYPE: FOAF["OnlineAccount"],
         # TODO: change how we register galaxy version, probably a declare_version func
         # self.galaxy_version = self.ga_export["jobs_attrs"][0]["galaxy_version"]
-        # TODO: change notation to already imported namespaces? 
+        # TODO: change notation to already imported namespaces?
self.document.add_namespace("wfprov", "http://purl.org/wf4ever/wfprov#") # document.add_namespace('prov', 'http://www.w3.org/ns/prov#') self.document.add_namespace("wfdesc", "http://purl.org/wf4ever/wfdesc#") @@ -166,7 +155,7 @@ def generate_prov_doc(self) -> Tuple[str, ProvDocument]: "provenance", self.base_uri + posix_path(PROVENANCE) + "/" ) # TODO: use appropriate refs for ga_export and related inputs - ro_identifier_workflow = self.base_uri + "ga_export" + "/" + ro_identifier_workflow = self.base_uri + "ga_export" + "/" self.wf_ns = self.document.add_namespace("wf", ro_identifier_workflow) ro_identifier_input = ( self.base_uri + "ga_export/datasets#" @@ -230,15 +219,15 @@ def declare_process( """Record the start of each Process.""" if process_run_id is None: process_run_id = uuid.uuid4().urn - - cmd = ga_export_jobs_attrs["command_line"] + + # cmd = ga_export_jobs_attrs["command_line"] process_name = ga_export_jobs_attrs["tool_id"] - tool_version = ga_export_jobs_attrs["tool_version"] + # tool_version = ga_export_jobs_attrs["tool_version"] prov_label = "Run of ga_export/jobs_attrs.txt#" + process_name start_time = ga_export_jobs_attrs["create_time"] end_time = ga_export_jobs_attrs["update_time"] - #TODO: Find out how to include commandline as a string + # TODO: Find out how to include commandline as a string # cmd = self.document.entity( # uuid.uuid4().urn, # {PROV_TYPE: WFPROV["Artifact"], PROV_LABEL: ga_export_jobs_attrs["command_line"]} @@ -249,9 +238,9 @@ def declare_process( start_time, end_time, { - PROV_TYPE: WFPROV["ProcessRun"], - PROV_LABEL: prov_label, - #TODO: Find out how to include commandline as a string + PROV_TYPE: WFPROV["ProcessRun"], + PROV_LABEL: prov_label, + # TODO: Find out how to include commandline as a string # PROV_LABEL: cmd }, ) @@ -279,7 +268,7 @@ def used_artefacts( base += "/" + process_name tool_id = process_metadata["tool_id"] base += "/" + tool_id - items = ["inputs","outputs","parameters"] + items = ["inputs", "outputs", "parameters"] # print(process_metadata["params"]) for item in items: # print(item) @@ -293,8 +282,8 @@ def used_artefacts( value = json.loads(value) if isinstance(key, str): key = key.replace("|", "_") - if isinstance(value, str): - val = value.replace("|", "_") + if isinstance(value, str): + value = value.replace("|", "_") prov_role = self.wf_ns[f"{base}/{key}"] @@ -307,7 +296,6 @@ def used_artefacts( # for artefact in value: try: - # pdb.set_trace() entity = self.declare_artefact(value) self.document.used( process_run_id, @@ -346,7 +334,7 @@ def declare_artefact(self, value: Any) -> ProvEntity: # byte_s = BytesIO(value) # data_file = self.research_object.add_data_file(byte_s) # FIXME: Don't naively assume add_data_file uses hash in filename! 
- data_id = "data:%s" % str(value) #PurePosixPath(data_file).stem + data_id = "data:%s" % str(value) # PurePosixPath(data_file).stem return self.document.entity( data_id, {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)}, @@ -383,7 +371,7 @@ def declare_artefact(self, value: Any) -> ProvEntity: ) if value.get("class"): - #_logger.warning("Unknown data class %s.", value["class"]) + # _logger.warning("Unknown data class %s.", value["class"]) # FIXME: The class might be "http://example.com/somethingelse" coll.add_asserted_type(CWLPROV[value["class"]]) @@ -393,7 +381,7 @@ def declare_artefact(self, value: Any) -> ProvEntity: # clean up unwanted characters if isinstance(key, str): key = key.replace("|", "_") - if isinstance(val, str): + if isinstance(val, str): val = val.replace("|", "_") v_ent = self.declare_artefact(val) @@ -440,7 +428,7 @@ def declare_artefact(self, value: Any) -> ProvEntity: # FIXME: list value does not support adding "@id" return coll except TypeError: - #_logger.warning("Unrecognized type %s of %r", type(value), value) + # _logger.warning("Unrecognized type %s of %r", type(value), value) # Let's just fall back to Python repr() entity = self.document.entity(uuid.uuid4().urn, {PROV_LABEL: repr(value)}) # self.research_object.add_uri(entity.identifier.uri) @@ -455,7 +443,7 @@ def declare_file(self, value: Dict) -> Tuple[ProvEntity, ProvEntity, str]: if "checksum" in value: csum = cast(str, value["checksum"]) (method, checksum) = csum.split("$", 1) - if method == SHA1: # and self.research_object.has_data_file(checksum): + if method == SHA1: # and self.research_object.has_data_file(checksum): entity = self.document.entity("data:" + checksum) if not entity and "location" in value: @@ -502,8 +490,8 @@ def declare_file(self, value: Dict) -> Tuple[ProvEntity, ProvEntity, str]: # Check for secondaries for sec in cast( - # MutableSequence[CWLObjectType], - value.get("secondaryFiles", []) + # MutableSequence[CWLObjectType], + value.get("secondaryFiles", []) # noqa ): # TODO: Record these in a specializationOf entity with UUID? if sec["class"] == "File": @@ -524,8 +512,10 @@ def declare_file(self, value: Dict) -> Tuple[ProvEntity, ProvEntity, str]: return file_entity, entity, checksum - def declare_directory(self - # , value: CWLObjectType + def declare_directory( + self, + # value: CWLObjectType + value ) -> ProvEntity: """Register any nested files/directories.""" # FIXME: Calculate a hash-like identifier for directory @@ -636,12 +626,11 @@ def declare_string(self, value: str) -> Tuple[ProvEntity, str]: # checksum = PurePosixPath(data_file).name # FIXME: Don't naively assume add_data_file uses hash in filename! 
value = str(value).replace("|", "_") - data_id = "data:%s" % str(value) #PurePosixPath(data_file).stem + data_id = "data:%s" % str(value) # PurePosixPath(data_file).stem entity = self.document.entity( data_id, {PROV_TYPE: WFPROV["Artifact"], PROV_VALUE: str(value)} ) # type: ProvEntity - return entity #, checksum - + return entity # , checksum def generate_output_prov( self, @@ -724,7 +713,7 @@ def activity_has_provenance(self, activity, prov_ids): self.document.activity(activity, other_attributes=attribs) # Tip: we can't use https://www.w3.org/TR/prov-links/#term-mention # as prov:mentionOf() is only for entities, not activities - uris = [i.uri for i in prov_ids] + # uris = [i.uri for i in prov_ids] # self.research_object.add_annotation(activity, uris, PROV["has_provenance"].uri) def finalize_prov_profile(self, name=None, out_path=None): @@ -759,7 +748,7 @@ def finalize_prov_profile(self, name=None, out_path=None): # https://www.w3.org/TR/prov-xml/ # serialized_prov_docs["xml"] = self.document.serialize(format="xml", indent=4) - prov_ids.append(self.provenance_ns[filename + ".xml"]) + prov_ids.append(self.provenance_ns[filename + ".xml"]) with open(basename + ".xml", "w") as provenance_file: self.document.serialize(provenance_file, format="xml", indent=4) @@ -768,7 +757,6 @@ def finalize_prov_profile(self, name=None, out_path=None): prov_ids.append(self.provenance_ns[filename + ".provn"]) with open(basename + ".provn", "w") as provenance_file: self.document.serialize(provenance_file, format="provn", indent=2) - # https://www.w3.org/Submission/prov-json/ # serialized_prov_docs["json"] = self.document.serialize(format="json", indent=2) @@ -799,7 +787,6 @@ def finalize_prov_profile(self, name=None, out_path=None): prov_ids.append(self.provenance_ns[filename + ".jsonld"]) with open(basename + ".jsonld", "w") as provenance_file: self.document.serialize(provenance_file, format="rdf", rdf_format="json-ld") - - #_logger.debug("[provenance] added provenance: %s", prov_ids) + # _logger.debug("[provenance] added provenance: %s", prov_ids) return (serialized_prov_docs, prov_ids) diff --git a/rocrate/rocrate_api.py b/rocrate/rocrate_api.py index f6f912c..3243a6b 100644 --- a/rocrate/rocrate_api.py +++ b/rocrate/rocrate_api.py @@ -18,7 +18,6 @@ # limitations under the License. 
from pathlib import Path -import os import rocrate.rocrate as roc from rocrate.provenance_profile import ProvenanceProfile @@ -79,6 +78,7 @@ def make_workflow_rocrate(workflow_path, wf_type, include_files=[], return wf_crate + # WIP def make_workflow_run_rocrate(workflow_path, wf_type, wfr_metadata_path, author=None, orcid=None, include_files=[], fetch_remote=False, prov_name=None, prov_path=None, cwl=None, diagram=None): @@ -110,4 +110,4 @@ def make_workflow_run_rocrate(workflow_path, wf_type, wfr_metadata_path, author= for file_entry in include_files: wfr_crate.add_file(file_entry) - return wfr_crate \ No newline at end of file + return wfr_crate diff --git a/test/test_parse_ga_export.py b/test/test_parse_ga_export.py index 6ac1256..08120be 100644 --- a/test/test_parse_ga_export.py +++ b/test/test_parse_ga_export.py @@ -1,41 +1,34 @@ -import sys import os -import pytest -from typing import ( - Tuple, -) -from prov.model import ProvDocument -sys.path.append('./ro-crate-py') from rocrate.rocrate import ROCrate from rocrate import rocrate_api as roc_api -from rocrate.model.computerlanguage import CWL_DEFAULT_VERSION, GALAXY_DEFAULT_VERSION from rocrate.provenance_profile import ProvenanceProfile -WF_CRATE = "https://w3id.org/workflowhub/workflow-ro-crate" -from tools.load_ga_export import load_ga_history_export, GalaxyJob, GalaxyDataset +from tools.load_ga_export import load_ga_history_export, GalaxyJob + def test_ga_history_loading(test_data_dir, tmpdir, helpers): export_dir = "test_ga_history_export" export_path = test_data_dir / export_dir / "history_export" - + metadata_export = load_ga_history_export(export_path) jobs = [] for job in metadata_export["jobs_attrs"]: job_attrs = GalaxyJob() job_attrs.parse_ga_jobs_attrs(job) jobs.append(job_attrs.attributes) - + assert isinstance(job_attrs, GalaxyJob) # print(jobs[0]) assert len(jobs) == 4 + def test_ga_history_parsing(test_data_dir, tmpdir, helpers): export_dir = "test_ga_history_export" export_path = test_data_dir / export_dir / "history_export" - prov_path = "provenance" + prov_path = tmpdir / "provenance" # prov_name = "ga_export.cwlprov" # crate_path = test_data_dir / export_dir / "history_export_crate" - + # metadata_export = load_ga_history_export(export_path) prov = ProvenanceProfile(export_path, "PDG", "https://orcid.org/0000-0002-8940-4946") @@ -49,14 +42,13 @@ def test_ga_history_parsing(test_data_dir, tmpdir, helpers): # print(serialized_prov_docs.keys()) - def test_create_wf_run_ro_crate(test_data_dir, tmpdir, helpers): # wf_path = base_path + "example-history-export3.ga" # dataset_path = base_path + "example-history-export3/datasets/" # wfr_metadata_path = base_path + "example-history-export3" # files_list = os.listdir(dataset_path) # files_list = [dataset_path + f for f in files_list] - + export_dir = "test_ga_history_export" wfr_metadata_path = test_data_dir / export_dir / "history_export" dataset_path = wfr_metadata_path / "datasets" @@ -66,9 +58,10 @@ def test_create_wf_run_ro_crate(test_data_dir, tmpdir, helpers): wf_path = test_data_dir / export_dir / wf_id # wf_crate = roc_api.make_workflow_rocrate(wf_path, wf_type='Galaxy') - wf_crate = roc_api.make_workflow_run_rocrate(workflow_path=wf_path, - wfr_metadata_path=wfr_metadata_path, author=None, orcid=None, - wf_type="Galaxy",include_files=files_list, prov_name="test_prov") + wf_crate = roc_api.make_workflow_run_rocrate( + workflow_path=wf_path, wfr_metadata_path=wfr_metadata_path, author=None, orcid=None, + wf_type="Galaxy", include_files=files_list, 
prov_name="test_prov" + ) assert isinstance(wf_crate, ROCrate) # wf = wf_crate.dereference(wf_id) @@ -77,4 +70,3 @@ def test_create_wf_run_ro_crate(test_data_dir, tmpdir, helpers): if not os.path.exists(out_path): out_path.mkdir() wf_crate.write(out_path) - \ No newline at end of file diff --git a/tools/load_ga_export.py b/tools/load_ga_export.py index b608c7f..6bb51dd 100644 --- a/tools/load_ga_export.py +++ b/tools/load_ga_export.py @@ -2,35 +2,23 @@ import os import re from typing import ( - IO, - Any, - Callable, Dict, - Generator, - Iterable, - List, - MutableMapping, - MutableSequence, - NamedTuple, - Optional, - Set, - Tuple, - Union, - cast, ) + def load_ga_history_export(export_dir): fn_list = os.listdir(export_dir) export_metadata = {} - for f in fn_list : - export_dir_path = os.path.join(export_dir,f) + for f in fn_list: + export_dir_path = os.path.join(export_dir, f) if os.path.isfile(export_dir_path): - with open(export_dir_path,"r") as fh: + with open(export_dir_path, "r") as fh: # create keys for metadata files, removes '.' and 'txt' from fn - key = '_'.join(list(filter(None, re.split('\.|txt',f)))) + key = '_'.join(list(filter(None, re.split(r'\.|txt', f)))) export_metadata[key] = json.loads(fh.read()) return export_metadata + class GalaxyJob(Dict): def __init__(self): """ @@ -57,7 +45,9 @@ def parse_ga_jobs_attrs(self, job_attrs): if "params" in key: self.attributes["parameters"].update(job_attrs[key]) + class GalaxyDataset(Dict): + def __init__(self, ga_export_dataset_attrs): """ Initialize the GalaxyDataset object. @@ -75,4 +65,4 @@ def parse_ga_dataset_attrs(self, job_attrs): pass else: if "metadata" in key: - self.attributes["metadata"].update(job_attrs[key]) \ No newline at end of file + self.attributes["metadata"].update(job_attrs[key])