fix #3427 #3428

Merged: 1 commit, Aug 26, 2024
11 changes: 11 additions & 0 deletions qiita_db/software.py
@@ -1995,9 +1995,20 @@ def graph(self):
         qdb.sql_connection.TRN.add(sql, [self.id])
         db_edges = qdb.sql_connection.TRN.execute_fetchindex()

+        # track which nodes are used by the edges so that nodes without
+        # an edge are still returned as part of the graph
+        used_nodes = nodes.copy()
         for edge_id, p_id, c_id in db_edges:
             e = DefaultWorkflowEdge(edge_id)
             g.add_edge(nodes[p_id], nodes[c_id], connections=e)
+            if p_id in used_nodes:
+                del used_nodes[p_id]
+            if c_id in used_nodes:
+                del used_nodes[c_id]
+        # add the nodes that are not connected by any edge
+        for ms in used_nodes:
+            g.add_node(nodes[ms])

         return g

     @property
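An aside on the hunk above: networkx's add_edge registers only the nodes that participate in an edge, so workflow commands that are not connected to anything would otherwise be silently dropped from the returned graph. A minimal, self-contained sketch of the bookkeeping pattern (hypothetical node ids and labels, not taken from the Qiita schema):

import networkx as nx

# hypothetical map of node ids to objects; node 3 has no edges
nodes = {1: 'cmd_a', 2: 'cmd_b', 3: 'cmd_c'}
db_edges = [(1, 2)]

g = nx.DiGraph()
# copy the map and discard every node that an edge touches
not_connected = nodes.copy()
for p_id, c_id in db_edges:
    g.add_edge(nodes[p_id], nodes[c_id])  # implicitly adds 'cmd_a', 'cmd_b'
    not_connected.pop(p_id, None)
    not_connected.pop(c_id, None)

# without this step, 'cmd_c' would be missing from g.nodes
for node_id in not_connected:
    g.add_node(nodes[node_id])

assert set(g.nodes) == {'cmd_a', 'cmd_b', 'cmd_c'}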
62 changes: 41 additions & 21 deletions qiita_db/test/test_artifact.py
@@ -404,9 +404,8 @@ def test_descendants_with_jobs(self):
'"phred_offset": "auto"}')
params = qdb.software.Parameters.load(qdb.software.Command(1),
json_str=json_str)
user = qdb.user.User('[email protected]')
wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
user, params, name='Test WF')
qdb.user.User('[email protected]'), params, name='Test WF')
parent = list(wf.graph.nodes())[0]
wf.add(qdb.software.DefaultParameters(10),
connections={parent: {'demultiplexed': 'input_data'}})
@@ -699,6 +698,8 @@ def setUp(self):

         self._clean_up_files.extend([self.fwd, self.rev])

+        self.user = qdb.user.User('[email protected]')
+
     def tearDown(self):
         for f in self._clean_up_files:
             if exists(f):
@@ -1039,7 +1040,7 @@ def test_delete_in_construction_job(self):
'"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
'"phred_offset": ""}' % test.id)
qdb.processing_job.ProcessingJob.create(
qdb.user.User('[email protected]'),
self.user,
qdb.software.Parameters.load(qdb.software.Command(1),
json_str=json_str))
uploads_fp = join(qdb.util.get_mountpoint("uploads")[0][1],
@@ -1064,7 +1065,7 @@ def test_delete_error_running_job(self):
'"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
'"phred_offset": ""}' % test.id)
job = qdb.processing_job.ProcessingJob.create(
qdb.user.User('[email protected]'),
self.user,
qdb.software.Parameters.load(qdb.software.Command(1),
json_str=json_str))
job._set_status('running')
@@ -1147,7 +1148,7 @@ def test_delete_with_jobs(self):
'"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
'"phred_offset": ""}' % test.id)
job = qdb.processing_job.ProcessingJob.create(
qdb.user.User('[email protected]'),
self.user,
qdb.software.Parameters.load(qdb.software.Command(1),
json_str=json_str))
job._set_status('success')
@@ -1177,8 +1178,7 @@ def test_being_deleted_by(self):
         cmd = qiita_plugin.get_command('delete_artifact')
         params = qdb.software.Parameters.load(
             cmd, values_dict={'artifact': test.id})
-        job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'), params, True)
+        job = qdb.processing_job.ProcessingJob.create(self.user, params, True)
         job._set_status('running')

         # verify that there is a job and that it is the same as the one above
@@ -1189,8 +1189,7 @@ def test_being_deleted_by(self):
         self.assertIsNone(test.being_deleted_by)

         # now, let's actually remove it
-        job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'), params, True)
+        job = qdb.processing_job.ProcessingJob.create(self.user, params, True)
         job.submit()
         # wait for the job to finish
         wait_for_processing_job(job.id)
@@ -1207,7 +1206,7 @@ def test_delete_as_output_job(self):
         data = {'OTU table': {'filepaths': [(fp, 'biom')],
                               'artifact_type': 'BIOM'}}
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'),
+            self.user,
             qdb.software.Parameters.load(
                 qdb.software.Command.get_validator('BIOM'),
                 values_dict={'files': dumps({'biom': [fp]}),
@@ -1448,29 +1447,50 @@ def test_descendants_with_jobs(self):
                                             data_type="16S")
         self.assertEqual(len(a.analysis.artifacts), 3)
         # 3. add jobs connecting the new artifact to the other root
+        # - currently:
+        #   a -> job -> b
+        #   c
+        #   job1 connects b & c
+        #   job2 connects a & c
+        # - expected:
+        #   a --> job -> b
+        #         |-> job2 -> out
+        #         ^
+        #         |-----|---> job1 -> out
+        #   c ------------|
         cmd = qdb.software.Command.create(
             qdb.software.Software(1),
             "CommandWithMultipleInputs", "", {
-                'input_b': ['artifact:["BIOM"]', None],
-                'input_c': ['artifact:["BIOM"]', None]}, {'out': 'BIOM'})
-        params = qdb.software.Parameters.load(
-            cmd, values_dict={'input_b': a.children[0].id, 'input_c': c.id})
-        job1 = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'), params)
-        params = qdb.software.Parameters.load(
-            cmd, values_dict={'input_b': a.id, 'input_c': c.id})
-        job2 = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'), params)
+                'input_x': ['artifact:["BIOM"]', None],
+                'input_y': ['artifact:["BIOM"]', None]}, {'out': 'BIOM'})
+        params = qdb.software.Parameters.load(
+            cmd, values_dict={'input_x': a.children[0].id, 'input_y': c.id})
+        wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
+            self.user, params, name='Test WF')
+        job1 = list(wf.graph.nodes())[0]
+
+        cmd_dp = qdb.software.DefaultParameters.create("", cmd)
+        wf.add(cmd_dp, req_params={'input_x': a.id, 'input_y': c.id})
+        job2 = list(wf.graph.nodes())[1]
         jobs = [j[1] for e in a.descendants_with_jobs.edges
                 for j in e if j[0] == 'job']
         self.assertIn(job1, jobs)
         self.assertIn(job2, jobs)
+
+        # 4. add job3 that takes the outputs of job1 and job2 as its inputs
+        # - expected:
+        #   a --> job -> b
+        #         |-> job2 -> out -> job3 -> out
+        #         ^                   ^
+        #         |                   |
+        #         |                   |
+        #         |-----|---> job1 -> out
+        #   c ------------|
+        wf.add(cmd_dp, connections={
+            job1: {'out': 'input_x'}, job2: {'out': 'input_y'}})
+        job3 = list(wf.graph.nodes())[2]
+        jobs = [j[1] for e in a.descendants_with_jobs.edges
                for j in e if j[0] == 'job']
+        self.assertIn(job3, jobs)


 @qiita_test_checker()
 class ArtifactArchiveTests(TestCase):
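A note on the jobs comprehension used in test_descendants_with_jobs: it assumes each node of descendants_with_jobs is a (type, object) pair tagged 'artifact' or 'job' (an inference from the test itself, not something shown elsewhere in this diff). A runnable sketch of the filtering idiom with placeholder values:

# each edge is a pair of (type, obj) nodes
edges = [
    (('artifact', 'A'), ('job', 'job1')),
    (('job', 'job1'), ('artifact', 'B')),
    (('artifact', 'B'), ('job', 'job2')),
]

# keep the second element of every endpoint tagged 'job'
jobs = [j[1] for e in edges for j in e if j[0] == 'job']
assert jobs == ['job1', 'job1', 'job2']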
44 changes: 40 additions & 4 deletions qiita_pet/handlers/software.py
@@ -61,6 +61,7 @@ def _default_parameters_parsing(node):
         # getting the main default parameters
         nodes = []
         edges = []
+        at = w.artifact_type

         # first get edges as this will give us the main connected commands
         # and their order
@@ -72,18 +73,22 @@ def _default_parameters_parsing(node):
         # output_type: output_node_name}, ...}
         # for easy look up and merge of output_names
         main_nodes = dict()
+        not_used_nodes = {n.id: n for n in graph.nodes}
         for i, (x, y) in enumerate(graph.edges):
+            if x.id in not_used_nodes:
+                del not_used_nodes[x.id]
+            if y.id in not_used_nodes:
+                del not_used_nodes[y.id]
+            vals_x, input_x, output_x = _default_parameters_parsing(x)
+            vals_y, input_y, output_y = _default_parameters_parsing(y)

             connections = []
             for a, _, c in graph[x][y]['connections'].connections:
                 connections.append("%s | %s" % (a, c))

-            vals_x, input_x, output_x = _default_parameters_parsing(x)
-            vals_y, input_y, output_y = _default_parameters_parsing(y)
-
             if i == 0:
                 # we are in the first element so we can specifically select
                 # the type we are looking for
-                at = w.artifact_type
                 if at in input_x[0][1]:
                     input_x[0][1] = at
                 else:
@@ -144,6 +149,37 @@ def _default_parameters_parsing(node):

         wparams = w.parameters

+        # adding the nodes that have no edges
+        # as a first step, if not_used_nodes is not empty, confirm that
+        # nodes/edges are empty; in theory we should never hit this
+        if not_used_nodes and (nodes or edges):
+            raise ValueError(
+                'Error, please check your workflow configuration')
+
+        # note that this block is similar to, but not identical with, the
+        # loop above that adds the connected nodes
+        for i, (_, x) in enumerate(not_used_nodes.items()):
+            vals_x, input_x, output_x = _default_parameters_parsing(x)
+            if at in input_x[0][1]:
+                input_x[0][1] = at
+            else:
+                input_x[0][1] = '** WARNING, NOT DEFINED **'
+
+            name_x = vals_x[0]
+            if vals_x not in nodes:
+                nodes.append(vals_x)
+            for a, b in input_x:
+                if b in inputs:
+                    name = inputs[b]
+                else:
+                    name = 'input_%s_%s' % (name_x, b)
+                nodes.append([name, a, b])
+                edges.append([name, vals_x[0]])
+            for a, b in output_x:
+                name = 'output_%s_%s' % (name_x, b)
+                nodes.append([name, a, b])
+                edges.append([name_x, name])

         workflows.append(
             {'name': w.name, 'id': w.id, 'data_types': w.data_type,
              'description': w.description, 'active': w.active,
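The new loop mirrors how connected commands are expanded for the workflow graph view: each command becomes a node, and every input and output type becomes a pseudo-node linked to it by an edge. A standalone sketch of that expansion under assumed names (the real handler derives these values from _default_parameters_parsing and the inputs lookup):

def expand_command(name, input_types, output_types, nodes, edges):
    # the command itself is a node: [id, kind, label]
    nodes.append([name, 'command', name])
    for in_type in input_types:
        in_name = 'input_%s_%s' % (name, in_type)
        nodes.append([in_name, 'type', in_type])
        edges.append([in_name, name])   # the input feeds the command
    for out_type in output_types:
        out_name = 'output_%s_%s' % (name, out_type)
        nodes.append([out_name, 'type', out_type])
        edges.append([name, out_name])  # the command produces the output

nodes, edges = [], []
expand_command('deblur', ['per_sample_FASTQ'], ['BIOM'], nodes, edges)
assert edges == [['input_deblur_per_sample_FASTQ', 'deblur'],
                 ['deblur', 'output_deblur_BIOM']]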