Skip to content

Commit

Permalink
Merge pull request #1445 from mvdbeek/implement_autoupdate_from_non_default_tool_sheds
Browse files Browse the repository at this point in the history

Implement run/test/workflow_edit/autoupdate from non default tool sheds
  • Loading branch information
mvdbeek authored May 7, 2024
2 parents ece1fed + 6ad6e52 commit fcabbd6
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 62 deletions.
42 changes: 19 additions & 23 deletions planemo/autoupdate.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@

import requests
import yaml
from bioblend import toolshed
from bioblend.toolshed import ToolShedInstance
from galaxy.tool_util.deps import conda_util
from galaxy.tool_util.version import parse_version

import planemo.conda
from planemo.galaxy.workflows import (
get_tool_ids_for_workflow,
get_toolshed_url_for_tool_id,
MAIN_TOOLSHED_URL,
)
from planemo.io import (
error,
info,
)
from planemo.workflow_lint import (
find_repos_from_tool_id,
MAIN_TOOLSHED_URL,
)
from planemo.workflow_lint import find_repos_from_tool_id

if TYPE_CHECKING:
from planemo.cli import PlanemoCliContext
Expand Down Expand Up @@ -294,7 +295,7 @@ def get_newest_tool_id(tool_ids: List[str]) -> str:


def outdated_tools( # noqa: C901
ctx: "PlanemoCliContext", wf_dict: Dict[str, Any], ts: ToolShedInstance, tools_to_skip: List[str]
ctx: "PlanemoCliContext", wf_dict: Dict[str, Any], tools_to_skip: List[str]
) -> Dict[str, Dict[str, str]]:
"""
tools_to_skip should be a list of base tool ids.
Expand All @@ -305,8 +306,12 @@ def base_tool_id(tool_id: str) -> str:

def check_tool_step(tool_id: str) -> Dict[str, Dict[str, str]]:
"""
Return a dict with current and newest tool version, in case they don't match
Return a dict with current and newest tool version, in case they don't match.
"""
tool_shed_url = get_toolshed_url_for_tool_id(tool_id)
if not tool_shed_url:
return {}
ts = ToolShedInstance(tool_shed_url)
warning_msg, repos = find_repos_from_tool_id(tool_id, ts)
if warning_msg != "":
ctx.log(warning_msg)
Expand All @@ -328,20 +333,12 @@ def check_tool_step(tool_id: str) -> Dict[str, Dict[str, str]]:
return {}

def outdated_tools_rec(wf_dict: Dict[str, Any]) -> None:
steps = wf_dict["steps"].values() if isinstance(wf_dict["steps"], dict) else wf_dict["steps"]
for step in steps:
if step.get("type", "tool") == "tool" and not step.get("run", {}).get("class") == "GalaxyWorkflow":
tool_id = step["tool_id"]
base_id = base_tool_id(tool_id)
if base_id not in checked_tools:
outdated_tool_dict.update(check_tool_step(tool_id))
checked_tools.append(base_id)
elif step.get("type") == "subworkflow": # GA SWF
outdated_tools_rec(step["subworkflow"])
elif step.get("run", {}).get("class") == "GalaxyWorkflow": # gxformat2 SWF
outdated_tools_rec(step["run"])
else:
continue
tool_ids = get_tool_ids_for_workflow(wf_dict)
for tool_id in tool_ids:
base_id = base_tool_id(tool_id)
if base_id not in checked_tools:
outdated_tool_dict.update(check_tool_step(tool_id))
checked_tools.append(base_id)

outdated_tool_dict: Dict[str, Dict[str, str]] = {}
# Initialize the list of tools already checked with a copy of tools_to_skip
Expand All @@ -359,8 +356,7 @@ def get_tools_to_update(
with open(workflow.path) as f:
wf_dict = yaml.load(f, Loader=yaml.SafeLoader)

ts = toolshed.ToolShedInstance(url=MAIN_TOOLSHED_URL)
return outdated_tools(ctx, wf_dict, ts, tools_to_skip)
return outdated_tools(ctx, wf_dict, tools_to_skip)


def autoupdate_wf(ctx: "PlanemoCliContext", config: "LocalGalaxyConfig", wf: "Runnable") -> Dict[str, Any]:
Expand Down
73 changes: 55 additions & 18 deletions planemo/galaxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
from planemo.config import OptionSource
from planemo.deps import ensure_dependency_resolvers_conf_configured
from planemo.docker import docker_host_args
from planemo.galaxy.workflows import remote_runnable_to_workflow_id
from planemo.galaxy.workflows import (
get_toolshed_url_for_tool_id,
remote_runnable_to_workflow_id,
)
from planemo.io import (
communicate,
kill_pid_file,
Expand All @@ -48,6 +51,7 @@
write_file,
)
from planemo.mulled import build_involucro_context
from planemo.runnable import RunnableType
from planemo.shed import tool_shed_url
from .api import (
DEFAULT_ADMIN_API_KEY,
Expand All @@ -60,6 +64,7 @@
find_tool_ids,
import_workflow,
install_shed_repos,
MAIN_TOOLSHED_URL,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -258,7 +263,7 @@ def config_join(*args):
shed_tool_path = kwds.get("shed_tool_path") or config_join("shed_tools")
_ensure_directory(shed_tool_path)

sheds_config_path = _configure_sheds_config_file(ctx, config_directory, **kwds)
sheds_config_path = _configure_sheds_config_file(ctx, config_directory, runnables, **kwds)
port = _get_port(kwds)
properties = _shared_galaxy_properties(config_directory, kwds, for_tests=for_tests)
_handle_container_resolution(ctx, kwds, properties)
Expand Down Expand Up @@ -391,7 +396,7 @@ def config_join(*args):
shed_tool_path = kwds.get("shed_tool_path") or config_join("shed_tools")
_ensure_directory(shed_tool_path)

sheds_config_path = _configure_sheds_config_file(ctx, config_directory, **kwds)
sheds_config_path = _configure_sheds_config_file(ctx, config_directory, runnables, **kwds)

database_location = config_join("galaxy.sqlite")
master_api_key = _get_master_api_key(kwds)
Expand Down Expand Up @@ -570,19 +575,27 @@ def _all_tool_paths(
all_tool_paths = {r.path for r in runnables if r.has_tools and not r.data_manager_conf_path}
extra_tools = _expand_paths(galaxy_root, extra_tools=extra_tools)
all_tool_paths.update(extra_tools)
for runnable in runnables:
if runnable.type.name == "galaxy_workflow":
tool_ids = find_tool_ids(runnable.path)
for tool_id in tool_ids:
tool_paths = DISTRO_TOOLS_ID_TO_PATH.get(tool_id)
if tool_paths:
if isinstance(tool_paths, str):
tool_paths = [tool_paths]
all_tool_paths.update(tool_paths)
for tool_id in get_tool_ids_for_runnables(runnables):
tool_paths = DISTRO_TOOLS_ID_TO_PATH.get(tool_id)
if tool_paths:
if isinstance(tool_paths, str):
tool_paths = [tool_paths]
all_tool_paths.update(tool_paths)

return all_tool_paths


def get_workflow_runnables(runnables: List["Runnable"]) -> List["Runnable"]:
    """Return only the Galaxy workflow runnables that are backed by a file path."""
    return [candidate for candidate in runnables if candidate.has_path and candidate.type == RunnableType.galaxy_workflow]


def get_tool_ids_for_runnables(runnables: List["Runnable"]) -> List[str]:
    """Return the unique tool ids used by the given runnables, in discovery order.

    Only Galaxy workflow runnables backed by a local path are inspected;
    other runnable types contribute no tool ids.
    """
    tool_ids: List[str] = []
    for runnable in get_workflow_runnables(runnables):
        tool_ids.extend(find_tool_ids(runnable.path))
    # dict.fromkeys deduplicates while preserving first-seen order.
    return list(dict.fromkeys(tool_ids))


def _shared_galaxy_properties(config_directory, kwds, for_tests):
"""Setup properties useful for local and Docker Galaxy instances.
Expand Down Expand Up @@ -1203,12 +1216,36 @@ def _search_tool_path_for(path, target, extra_paths=None):
return None


def _configure_sheds_config_file(ctx, config_directory, **kwds):
if "shed_target" not in kwds:
kwds = kwds.copy()
kwds["shed_target"] = "toolshed"
shed_target_url = tool_shed_url(ctx, **kwds)
contents = _sub(TOOL_SHEDS_CONF, {"shed_target_url": shed_target_url})
def get_tool_sheds_conf_for_runnables(runnables: Optional[List["Runnable"]]) -> Optional[str]:
    """Return a tool_sheds_conf XML string covering the runnables' tool sheds.

    Returns None when there are no runnables to inspect.
    """
    if not runnables:
        return None
    referenced_tool_ids = get_tool_ids_for_runnables(runnables)
    return get_shed_tools_conf_string_for_tool_ids(referenced_tool_ids)


def get_shed_tools_conf_string_for_tool_ids(tool_ids: List[str]) -> str:
    """Build a tool_sheds_conf XML string for every tool shed the tool ids reference.

    The main toolshed is always included.  Entries are emitted from shortest to
    longest URL because Galaxy's tool shed registry
    (lib/galaxy/util/tool_shed/tool_shed_registry.py) has a bug where a
    toolshed whose URL is an exact substring of another registered toolshed
    would wrongly be selected, so shorter URLs must be registered first.
    """
    shed_urls = {get_toolshed_url_for_tool_id(tool_id) for tool_id in tool_ids if tool_id}
    shed_urls.discard(None)  # drop sheds that could not be resolved
    shed_urls.add(MAIN_TOOLSHED_URL)  # always add main toolshed
    tool_sheds: List[str] = []
    # Tie-break equal-length URLs alphabetically so the output is deterministic
    # (a bare len() key leaves ties at the mercy of set iteration order).
    for shed_url in sorted(shed_urls, key=lambda url: (len(url), url)):
        tool_sheds.append(f'<tool_shed name="{shed_url.split("://")[-1]}" url="{shed_url}" />')
    template = Template("""<tool_sheds>${tool_shed_lines}</tool_sheds>""")
    return template.substitute(tool_shed_lines="".join(tool_sheds))


def _configure_sheds_config_file(ctx, config_directory, runnables, **kwds):
    """Write tool_sheds_conf.xml into ``config_directory`` and return its path.

    Prefers a conf derived from the tool sheds referenced by the runnables'
    workflows; falls back to the configured shed target (defaulting to the
    main toolshed) when no runnable-specific sheds are found.
    """
    contents = get_tool_sheds_conf_for_runnables(runnables)
    if not contents:
        effective_kwds = kwds if "shed_target" in kwds else dict(kwds, shed_target="toolshed")
        shed_target_url = tool_shed_url(ctx, **effective_kwds)
        contents = _sub(TOOL_SHEDS_CONF, {"shed_target_url": shed_target_url})
    conf_path = os.path.join(config_directory, "tool_sheds_conf.xml")
    write_file(conf_path, contents)
    return conf_path
Expand Down
67 changes: 55 additions & 12 deletions planemo/galaxy/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import json
import os
from collections import namedtuple
from functools import lru_cache
from typing import (
Any,
Callable,
Dict,
List,
Optional,
)
from urllib.parse import urlparse

import requests
import yaml
from ephemeris import (
generate_tool_list_from_ga_workflow_files,
Expand All @@ -35,6 +38,35 @@
FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories."
GALAXY_WORKFLOWS_PREFIX = "gxid://workflows/"
GALAXY_WORKFLOW_INSTANCE_PREFIX = "gxid://workflow-instance/"
MAIN_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu"


@lru_cache(maxsize=None)
def guess_tool_shed_url(tool_shed_fqdn: str) -> Optional[str]:
    """Return the base URL (including scheme) for a tool shed host, or None.

    The main toolshed is recognized without a network round trip; for any
    other host we probe https then http.  Results are cached because the same
    shed host recurs across the tools of a workflow.
    """
    # Compare against the main shed's host exactly: the previous substring
    # test (`fqdn in URL`) would falsely match partial hosts like "g2.bx.psu.edu".
    if tool_shed_fqdn == urlparse(MAIN_TOOLSHED_URL).netloc:
        return MAIN_TOOLSHED_URL
    # guess if tool shed is served over https or http
    for scheme in ("https", "http"):
        candidate_url = f"{scheme}://{tool_shed_fqdn}"
        try:
            # Bound the probe so an unresponsive shed cannot hang planemo.
            r = requests.get(candidate_url, timeout=30)
        except requests.exceptions.RequestException:
            # Connection-level failure: fall through to the next scheme.
            continue
        if r.status_code == 200:
            return candidate_url
    warn(f"Could not connect to {tool_shed_fqdn}")
    return None


def get_toolshed_url_for_tool_id(tool_id: str) -> Optional[str]:
    """Derive the tool shed base URL from a fully qualified tool id.

    Returns None for ids with no "/repos" segment (built-in tools).
    """
    shed_fqdn, separator, _remainder = tool_id.partition("/repos")
    if not separator:
        return None
    return guess_tool_shed_url(tool_shed_fqdn=shed_fqdn)


def load_shed_repos(runnable):
Expand Down Expand Up @@ -62,6 +94,12 @@ def load_shed_repos(runnable):
if repository:
repository["tool_panel_section_label"] = "Tools from workflows"
tools.append(repository)
for repo in tools:
tool_shed = repo.get("tool_shed")
if tool_shed:
tool_shed_url = guess_tool_shed_url(tool_shed)
if tool_shed_url:
repo["tool_shed_url"] = tool_shed_url
return tools


Expand Down Expand Up @@ -134,20 +172,25 @@ def _raw_dict(path, importer=None):
return workflow


def find_tool_ids(path):
tool_ids = set()
workflow = _raw_dict(path)

def register_tool_ids(tool_ids, workflow):
for step in workflow["steps"].values():
if step.get("subworkflow"):
register_tool_ids(tool_ids, step["subworkflow"])
elif step.get("tool_id"):
tool_ids.add(step["tool_id"])
def get_tool_ids_for_workflow(wf_dict: Dict[str, Any], tool_ids: Optional[List[str]] = None) -> List[str]:
    """Collect the unique tool ids of a workflow dict, recursing into subworkflows.

    ``tool_ids`` is the shared accumulator used by recursive calls; order of
    first appearance is preserved and duplicates are dropped.
    """
    collected = tool_ids if tool_ids is not None else []
    raw_steps = wf_dict["steps"]
    step_iter = raw_steps.values() if isinstance(raw_steps, dict) else raw_steps
    for step in step_iter:
        step_type = step.get("type", "tool")
        run_class = step.get("run", {}).get("class")
        if step_type == "tool" and run_class != "GalaxyWorkflow":
            collected.append(step["tool_id"])
        elif step_type == "subworkflow":  # GA SWF
            get_tool_ids_for_workflow(step["subworkflow"], tool_ids=collected)
        elif run_class == "GalaxyWorkflow":  # gxformat2 SWF
            get_tool_ids_for_workflow(step["run"], tool_ids=collected)
    return list(dict.fromkeys(collected))

register_tool_ids(tool_ids, workflow)

return list(tool_ids)
def find_tool_ids(path):
    """Return the unique tool ids referenced by the workflow file at ``path``."""
    wf_dict = _raw_dict(path)
    return get_tool_ids_for_workflow(wf_dict)


WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label", "optional"])
Expand Down
34 changes: 25 additions & 9 deletions planemo/workflow_lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from planemo.galaxy.workflows import (
input_labels,
MAIN_TOOLSHED_URL,
output_labels,
required_input_labels,
)
Expand All @@ -51,8 +52,6 @@
POTENTIAL_WORKFLOW_FILES = re.compile(r"^.*(\.yml|\.yaml|\.ga)$")
DOCKSTORE_REGISTRY_CONF_VERSION = "1.2"

MAIN_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu"


class WorkflowLintContext(LintContext):
# Setup training topic for linting - probably should pass this through
Expand Down Expand Up @@ -479,27 +478,44 @@ def find_repos_from_tool_id(tool_id: str, ts: ToolShedInstance) -> Tuple[str, Di
"""
Return a string which indicates what failed and dict with all revisions for a given tool id
"""
if not tool_id.startswith(MAIN_TOOLSHED_URL[8:]):
if "/repos" not in tool_id:
return ("", {}) # assume a built in tool
*_, owner, name, _tool_id, _version = tool_id.split("/")

try:
repos = ts.repositories._get(params={"tool_ids": tool_id})
except Exception:
return (f"The ToolShed returned an error when searching for the most recent version of {tool_id}", {})
repo = ts.repositories.get_repositories(name, owner)[0]
repos = ts.repositories._get(url=f'{ts.repositories._make_url()}/{repo["id"]}/metadata')
except Exception as e:
return (f"The ToolShed returned an error when searching for the most recent version of {tool_id}: {e}", {})
if len(repos) == 0:
return (f"The tool {tool_id} is not in the toolshed (may have been tagged as invalid).", {})
else:
return ("", repos)


def assert_valid_tool_id_in_tool_shed(tool_id: str, ts: ToolShedInstance) -> Optional[str]:
    """Return an error message if ``tool_id`` is not found in tool shed ``ts``.

    Built-in tool ids (no "/repos" segment) are not validated.  Returns None
    when the id is valid or not applicable.
    """
    if "/repos" not in tool_id:
        return None
    warning_msg, repos = find_repos_from_tool_id(tool_id, ts)
    if warning_msg:
        return warning_msg
    known_guids = {
        tool.get("guid")
        for repo in repos.values()
        for tool in repo.get("tools", [])
    }
    if tool_id in known_guids:
        return None
    return f"The tool {tool_id} is not in the toolshed (may have been tagged as invalid)."


def _lint_tool_ids(path: str, lint_context: WorkflowLintContext) -> None:
def _lint_tool_ids_steps(lint_context: WorkflowLintContext, wf_dict: Dict, ts: ToolShedInstance) -> bool:
"""Returns whether a single tool_id was invalid"""
failed = False
steps = wf_dict.get("steps", {})
for step in steps.values():
if step.get("type", "tool") == "tool" and not step.get("run", {}).get("class") == "GalaxyWorkflow":
warning_msg, _ = find_repos_from_tool_id(step["tool_id"], ts)
if warning_msg != "":
warning_msg = assert_valid_tool_id_in_tool_shed(step["tool_id"], ts)
if warning_msg:
lint_context.error(warning_msg)
failed = True
elif step.get("type") == "subworkflow": # GA SWF
Expand All @@ -519,5 +535,5 @@ def _lint_tool_ids_steps(lint_context: WorkflowLintContext, wf_dict: Dict, ts: T
ts = toolshed.ToolShedInstance(url=MAIN_TOOLSHED_URL)
failed = _lint_tool_ids_steps(lint_context, workflow_dict, ts)
if not failed:
lint_context.valid("All tools_id appear to be valid.")
lint_context.valid("All tool ids appear to be valid.")
return None
Loading

0 comments on commit fcabbd6

Please sign in to comment.