Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement run/test/workflow_edit/autoupdate from non default tool sheds #1445

Merged
42 changes: 19 additions & 23 deletions planemo/autoupdate.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@

import requests
import yaml
from bioblend import toolshed
from bioblend.toolshed import ToolShedInstance
from galaxy.tool_util.deps import conda_util
from galaxy.tool_util.version import parse_version

import planemo.conda
from planemo.galaxy.workflows import (
get_tool_ids_for_workflow,
get_toolshed_url_for_tool_id,
MAIN_TOOLSHED_URL,
)
from planemo.io import (
error,
info,
)
from planemo.workflow_lint import (
find_repos_from_tool_id,
MAIN_TOOLSHED_URL,
)
from planemo.workflow_lint import find_repos_from_tool_id

if TYPE_CHECKING:
from planemo.cli import PlanemoCliContext
Expand Down Expand Up @@ -294,7 +295,7 @@ def get_newest_tool_id(tool_ids: List[str]) -> str:


def outdated_tools( # noqa: C901
ctx: "PlanemoCliContext", wf_dict: Dict[str, Any], ts: ToolShedInstance, tools_to_skip: List[str]
ctx: "PlanemoCliContext", wf_dict: Dict[str, Any], tools_to_skip: List[str]
) -> Dict[str, Dict[str, str]]:
"""
tools_to_skip should be a list of base tool ids.
Expand All @@ -305,8 +306,12 @@ def base_tool_id(tool_id: str) -> str:

def check_tool_step(tool_id: str) -> Dict[str, Dict[str, str]]:
"""
Return a dict with current and newest tool version, in case they don't match
Return a dict with current and newest tool version, in case they don't match.
"""
tool_shed_url = get_toolshed_url_for_tool_id(tool_id)
if not tool_shed_url:
return {}
ts = ToolShedInstance(tool_shed_url)
warning_msg, repos = find_repos_from_tool_id(tool_id, ts)
if warning_msg != "":
ctx.log(warning_msg)
Expand All @@ -328,20 +333,12 @@ def check_tool_step(tool_id: str) -> Dict[str, Dict[str, str]]:
return {}

def outdated_tools_rec(wf_dict: Dict[str, Any]) -> None:
steps = wf_dict["steps"].values() if isinstance(wf_dict["steps"], dict) else wf_dict["steps"]
for step in steps:
if step.get("type", "tool") == "tool" and not step.get("run", {}).get("class") == "GalaxyWorkflow":
tool_id = step["tool_id"]
base_id = base_tool_id(tool_id)
if base_id not in checked_tools:
outdated_tool_dict.update(check_tool_step(tool_id))
checked_tools.append(base_id)
elif step.get("type") == "subworkflow": # GA SWF
outdated_tools_rec(step["subworkflow"])
elif step.get("run", {}).get("class") == "GalaxyWorkflow": # gxformat2 SWF
outdated_tools_rec(step["run"])
else:
continue
tool_ids = get_tool_ids_for_workflow(wf_dict)
for tool_id in tool_ids:
base_id = base_tool_id(tool_id)
if base_id not in checked_tools:
outdated_tool_dict.update(check_tool_step(tool_id))
checked_tools.append(base_id)

outdated_tool_dict: Dict[str, Dict[str, str]] = {}
# Initialize the list of tools already checked with a copy of tools_to_skip
Expand All @@ -359,8 +356,7 @@ def get_tools_to_update(
with open(workflow.path) as f:
wf_dict = yaml.load(f, Loader=yaml.SafeLoader)

ts = toolshed.ToolShedInstance(url=MAIN_TOOLSHED_URL)
return outdated_tools(ctx, wf_dict, ts, tools_to_skip)
return outdated_tools(ctx, wf_dict, tools_to_skip)


def autoupdate_wf(ctx: "PlanemoCliContext", config: "LocalGalaxyConfig", wf: "Runnable") -> Dict[str, Any]:
Expand Down
73 changes: 55 additions & 18 deletions planemo/galaxy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
from planemo.config import OptionSource
from planemo.deps import ensure_dependency_resolvers_conf_configured
from planemo.docker import docker_host_args
from planemo.galaxy.workflows import remote_runnable_to_workflow_id
from planemo.galaxy.workflows import (
get_toolshed_url_for_tool_id,
remote_runnable_to_workflow_id,
)
from planemo.io import (
communicate,
kill_pid_file,
Expand All @@ -48,6 +51,7 @@
write_file,
)
from planemo.mulled import build_involucro_context
from planemo.runnable import RunnableType
from planemo.shed import tool_shed_url
from .api import (
DEFAULT_ADMIN_API_KEY,
Expand All @@ -60,6 +64,7 @@
find_tool_ids,
import_workflow,
install_shed_repos,
MAIN_TOOLSHED_URL,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -258,7 +263,7 @@ def config_join(*args):
shed_tool_path = kwds.get("shed_tool_path") or config_join("shed_tools")
_ensure_directory(shed_tool_path)

sheds_config_path = _configure_sheds_config_file(ctx, config_directory, **kwds)
sheds_config_path = _configure_sheds_config_file(ctx, config_directory, runnables, **kwds)
port = _get_port(kwds)
properties = _shared_galaxy_properties(config_directory, kwds, for_tests=for_tests)
_handle_container_resolution(ctx, kwds, properties)
Expand Down Expand Up @@ -389,7 +394,7 @@ def config_join(*args):
shed_tool_path = kwds.get("shed_tool_path") or config_join("shed_tools")
_ensure_directory(shed_tool_path)

sheds_config_path = _configure_sheds_config_file(ctx, config_directory, **kwds)
sheds_config_path = _configure_sheds_config_file(ctx, config_directory, runnables, **kwds)

database_location = config_join("galaxy.sqlite")
master_api_key = _get_master_api_key(kwds)
Expand Down Expand Up @@ -568,19 +573,27 @@ def _all_tool_paths(
all_tool_paths = {r.path for r in runnables if r.has_tools and not r.data_manager_conf_path}
extra_tools = _expand_paths(galaxy_root, extra_tools=extra_tools)
all_tool_paths.update(extra_tools)
for runnable in runnables:
if runnable.type.name == "galaxy_workflow":
tool_ids = find_tool_ids(runnable.path)
for tool_id in tool_ids:
tool_paths = DISTRO_TOOLS_ID_TO_PATH.get(tool_id)
if tool_paths:
if isinstance(tool_paths, str):
tool_paths = [tool_paths]
all_tool_paths.update(tool_paths)
for tool_id in get_tool_ids_for_runnables(runnables):
tool_paths = DISTRO_TOOLS_ID_TO_PATH.get(tool_id)
if tool_paths:
if isinstance(tool_paths, str):
tool_paths = [tool_paths]
all_tool_paths.update(tool_paths)

return all_tool_paths


def get_workflow_runnables(runnables: List["Runnable"]) -> List["Runnable"]:
    """Filter *runnables* down to Galaxy workflow runnables that are backed by a file path."""
    workflow_runnables: List["Runnable"] = []
    for runnable in runnables:
        if runnable.type == RunnableType.galaxy_workflow and runnable.has_path:
            workflow_runnables.append(runnable)
    return workflow_runnables


def get_tool_ids_for_runnables(runnables) -> List[str]:
    """Collect the tool ids used across all workflow runnables.

    Duplicates are dropped while preserving first-seen order.
    """
    all_ids: List[str] = []
    for workflow_runnable in get_workflow_runnables(runnables):
        all_ids += find_tool_ids(workflow_runnable.path)
    return list(dict.fromkeys(all_ids))


def _shared_galaxy_properties(config_directory, kwds, for_tests):
"""Setup properties useful for local and Docker Galaxy instances.

Expand Down Expand Up @@ -1201,12 +1214,36 @@ def _search_tool_path_for(path, target, extra_paths=None):
return None


def _configure_sheds_config_file(ctx, config_directory, **kwds):
if "shed_target" not in kwds:
kwds = kwds.copy()
kwds["shed_target"] = "toolshed"
shed_target_url = tool_shed_url(ctx, **kwds)
contents = _sub(TOOL_SHEDS_CONF, {"shed_target_url": shed_target_url})
def get_tool_sheds_conf_for_runnables(runnables: Optional[List["Runnable"]]) -> Optional[str]:
    """Build a ``tool_sheds_conf.xml`` string covering the sheds used by *runnables*.

    Returns None when there are no runnables to inspect.
    """
    if not runnables:
        return None
    return get_shed_tools_conf_string_for_tool_ids(get_tool_ids_for_runnables(runnables))


def get_shed_tools_conf_string_for_tool_ids(tool_ids: List[str]) -> str:
    """Return ``tool_sheds_conf.xml`` contents listing every tool shed referenced by *tool_ids*.

    The main toolshed is always included. Sheds whose URL cannot be resolved
    (``get_toolshed_url_for_tool_id`` returned None) are silently omitted.
    """
    # Resolve each shed URL once; drop unresolvable (None) entries in the same pass.
    tool_shed_urls = {
        url
        for url in (get_toolshed_url_for_tool_id(tool_id) for tool_id in tool_ids if tool_id)
        if url is not None
    }
    # always add main toolshed
    tool_shed_urls.add(MAIN_TOOLSHED_URL)
    # sort tool_shed_urls from shortest to longest, as https://github.com/galaxyproject/galaxy/blob/c7cb47a1b18ccd5b39075a705bbd2f34572755fe/lib/galaxy/util/tool_shed/tool_shed_registry.py#L106-L118
    # has a bug where a toolshed that is an exact substring of another registered toolshed would wrongly be selected.
    tool_shed_lines = "".join(
        f'<tool_shed name="{shed_url.split("://")[-1]}" url="{shed_url}" />'
        for shed_url in sorted(tool_shed_urls, key=len)
    )
    return Template("<tool_sheds>${tool_shed_lines}</tool_sheds>").substitute(tool_shed_lines=tool_shed_lines)


def _configure_sheds_config_file(ctx, config_directory, runnables, **kwds):
    """Write ``tool_sheds_conf.xml`` into *config_directory* and return its path.

    Sheds referenced by the runnables' workflows take priority; otherwise the
    configured shed target (defaulting to the main toolshed) is used.
    """
    # Find tool sheds to add to config
    contents = get_tool_sheds_conf_for_runnables(runnables)
    if not contents:
        effective_kwds = dict(kwds)
        effective_kwds.setdefault("shed_target", "toolshed")
        shed_target_url = tool_shed_url(ctx, **effective_kwds)
        contents = _sub(TOOL_SHEDS_CONF, {"shed_target_url": shed_target_url})
    conf_path = os.path.join(config_directory, "tool_sheds_conf.xml")
    write_file(conf_path, contents)
    return conf_path
Expand Down
67 changes: 55 additions & 12 deletions planemo/galaxy/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import json
import os
from collections import namedtuple
from functools import lru_cache
from typing import (
Any,
Callable,
Dict,
List,
Optional,
)
from urllib.parse import urlparse

import requests
import yaml
from ephemeris import (
generate_tool_list_from_ga_workflow_files,
Expand All @@ -35,6 +38,35 @@
FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories."
GALAXY_WORKFLOWS_PREFIX = "gxid://workflows/"
GALAXY_WORKFLOW_INSTANCE_PREFIX = "gxid://workflow-instance/"
MAIN_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu"


@lru_cache(maxsize=None)
def guess_tool_shed_url(tool_shed_fqdn: str) -> Optional[str]:
    """Return the base URL (scheme included) for a tool shed FQDN, or None if unreachable.

    The main toolshed is special-cased and returned without a network probe.
    Other sheds are probed over https first, then http. Results are cached,
    so each shed is probed at most once per process.
    """
    # NOTE(review): loose substring match — assumes any fqdn contained in
    # MAIN_TOOLSHED_URL refers to the main toolshed; confirm callers only pass
    # fqdns extracted from tool ids.
    if tool_shed_fqdn in MAIN_TOOLSHED_URL:
        return MAIN_TOOLSHED_URL
    # guess if tool shed is served over https or http
    for scheme in ("https", "http"):
        candidate_url = f"{scheme}://{tool_shed_fqdn}"
        try:
            # Timeout so an unresponsive shed cannot hang the process forever.
            response = requests.get(candidate_url, timeout=30)
        except requests.exceptions.RequestException:
            # Connection-level failure (DNS, refused, TLS, timeout): try the
            # next scheme instead of propagating the exception.
            continue
        if response.status_code == 200:
            return candidate_url
    warn(f"Could not connect to {tool_shed_fqdn}")
    return None


def get_toolshed_url_for_tool_id(tool_id: str) -> Optional[str]:
    """Return the URL of the tool shed hosting *tool_id*, or None for built-in tools.

    Shed-hosted tool ids look like ``<shed_fqdn>/repos/<owner>/<name>/<id>/<version>``.
    """
    shed_fqdn, separator, _ = tool_id.partition("/repos")
    if not separator:
        # No "/repos" marker: not a shed-qualified tool id.
        return None
    return guess_tool_shed_url(tool_shed_fqdn=shed_fqdn)


def load_shed_repos(runnable):
Expand Down Expand Up @@ -62,6 +94,12 @@ def load_shed_repos(runnable):
if repository:
repository["tool_panel_section_label"] = "Tools from workflows"
tools.append(repository)
for repo in tools:
tool_shed = repo.get("tool_shed")
if tool_shed:
tool_shed_url = guess_tool_shed_url(tool_shed)
if tool_shed_url:
repo["tool_shed_url"] = tool_shed_url
return tools


Expand Down Expand Up @@ -134,20 +172,25 @@ def _raw_dict(path, importer=None):
return workflow


def find_tool_ids(path):
tool_ids = set()
workflow = _raw_dict(path)

def register_tool_ids(tool_ids, workflow):
for step in workflow["steps"].values():
if step.get("subworkflow"):
register_tool_ids(tool_ids, step["subworkflow"])
elif step.get("tool_id"):
tool_ids.add(step["tool_id"])
def get_tool_ids_for_workflow(wf_dict: Dict[str, Any], tool_ids: Optional[List[str]] = None) -> List[str]:
    """Return the unique tool ids used by a workflow dict, recursing into subworkflows.

    Handles both native (.ga) workflows, whose ``steps`` is a dict, and
    gxformat2 workflows, whose ``steps`` is a list. First-seen order of the
    tool ids is preserved.

    :param wf_dict: parsed workflow dictionary.
    :param tool_ids: accumulator shared across recursive calls; callers
        normally omit it.
    """
    tool_ids = [] if tool_ids is None else tool_ids
    raw_steps = wf_dict["steps"]
    steps = raw_steps.values() if isinstance(raw_steps, dict) else raw_steps
    for step in steps:
        run_class = step.get("run", {}).get("class")
        if step.get("type", "tool") == "tool" and run_class != "GalaxyWorkflow":
            tool_ids.append(step["tool_id"])
        elif step.get("type") == "subworkflow":  # GA SWF
            get_tool_ids_for_workflow(step["subworkflow"], tool_ids=tool_ids)
        elif run_class == "GalaxyWorkflow":  # gxformat2 SWF
            get_tool_ids_for_workflow(step["run"], tool_ids=tool_ids)
    # De-duplicate while preserving first-seen order.
    return list(dict.fromkeys(tool_ids))

register_tool_ids(tool_ids, workflow)

return list(tool_ids)
def find_tool_ids(path):
    """Parse the workflow file at *path* and return its unique tool ids."""
    return get_tool_ids_for_workflow(_raw_dict(path))


WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label", "optional"])
Expand Down
34 changes: 25 additions & 9 deletions planemo/workflow_lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from planemo.galaxy.workflows import (
input_labels,
MAIN_TOOLSHED_URL,
output_labels,
required_input_labels,
)
Expand All @@ -51,8 +52,6 @@
POTENTIAL_WORKFLOW_FILES = re.compile(r"^.*(\.yml|\.yaml|\.ga)$")
DOCKSTORE_REGISTRY_CONF_VERSION = "1.2"

MAIN_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu"


class WorkflowLintContext(LintContext):
# Setup training topic for linting - probably should pass this through
Expand Down Expand Up @@ -479,27 +478,44 @@ def find_repos_from_tool_id(tool_id: str, ts: ToolShedInstance) -> Tuple[str, Di
"""
Return a string which indicates what failed and dict with all revisions for a given tool id
"""
if not tool_id.startswith(MAIN_TOOLSHED_URL[8:]):
if "/repos" not in tool_id:
return ("", {}) # assume a built in tool
*_, owner, name, _tool_id, _version = tool_id.split("/")

try:
repos = ts.repositories._get(params={"tool_ids": tool_id})
except Exception:
return (f"The ToolShed returned an error when searching for the most recent version of {tool_id}", {})
repo = ts.repositories.get_repositories(name, owner)[0]
repos = ts.repositories._get(url=f'{ts.repositories._make_url()}/{repo["id"]}/metadata')
except Exception as e:
return (f"The ToolShed returned an error when searching for the most recent version of {tool_id}: {e}", {})
if len(repos) == 0:
return (f"The tool {tool_id} is not in the toolshed (may have been tagged as invalid).", {})
else:
return ("", repos)


def assert_valid_tool_id_in_tool_shed(tool_id: str, ts: ToolShedInstance) -> Optional[str]:
    """Return None when *tool_id* is a valid tool in shed *ts*, else a warning message."""
    if "/repos" not in tool_id:
        # Not shed-qualified: treat as a built-in tool, nothing to validate.
        return None
    warning_msg, repos = find_repos_from_tool_id(tool_id, ts)
    if warning_msg:
        return warning_msg
    all_guids = (
        tool.get("guid") for repo in repos.values() for tool in repo.get("tools", [])
    )
    if tool_id in all_guids:
        return None
    return f"The tool {tool_id} is not in the toolshed (may have been tagged as invalid)."


def _lint_tool_ids(path: str, lint_context: WorkflowLintContext) -> None:
def _lint_tool_ids_steps(lint_context: WorkflowLintContext, wf_dict: Dict, ts: ToolShedInstance) -> bool:
"""Returns whether a single tool_id was invalid"""
failed = False
steps = wf_dict.get("steps", {})
for step in steps.values():
if step.get("type", "tool") == "tool" and not step.get("run", {}).get("class") == "GalaxyWorkflow":
warning_msg, _ = find_repos_from_tool_id(step["tool_id"], ts)
if warning_msg != "":
warning_msg = assert_valid_tool_id_in_tool_shed(step["tool_id"], ts)
if warning_msg:
lint_context.error(warning_msg)
failed = True
elif step.get("type") == "subworkflow": # GA SWF
Expand All @@ -519,5 +535,5 @@ def _lint_tool_ids_steps(lint_context: WorkflowLintContext, wf_dict: Dict, ts: T
ts = toolshed.ToolShedInstance(url=MAIN_TOOLSHED_URL)
failed = _lint_tool_ids_steps(lint_context, workflow_dict, ts)
if not failed:
lint_context.valid("All tools_id appear to be valid.")
lint_context.valid("All tool ids appear to be valid.")
return None
Loading