Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iss632 after rebase on future #721

Merged
merged 7 commits into from
Jun 12, 2024
6 changes: 4 additions & 2 deletions compiler/ast_to_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from util import *
from parse import from_ast_objects_to_shell

from custom_error import *

## TODO: Separate the ir stuff to the bare minimum and
## try to move this to the shell_ast folder.

Expand Down Expand Up @@ -159,7 +161,7 @@ def combine_pipe(ast_nodes):
else:
## If any part of the pipe is not an IR, the compilation must fail.
log("Node: {} is not pure".format(ast_nodes[0]))
raise Exception("Not pure node in pipe")
raise UnparallelizableError("Node: {} is not a pure node in pipe".format(ast_nodes[0]))
angelhof marked this conversation as resolved.
Show resolved Hide resolved

## Combine the rest of the nodes
for ast_node in ast_nodes[1:]:
Expand All @@ -168,7 +170,7 @@ def combine_pipe(ast_nodes):
else:
## If any part of the pipe is not an IR, the compilation must fail.
log("Node: {} is not pure".format(ast_nodes))
raise Exception("Not pure node in pipe")
raise UnparallelizableError("This specific node: {} is not a pure node in pipe".format(ast_node))
angelhof marked this conversation as resolved.
Show resolved Hide resolved

return [combined_nodes]

Expand Down
5 changes: 5 additions & 0 deletions compiler/custom_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class UnparallelizableError(Exception):
pass

class AdjLineNotImplementedError(Exception):
pass
15 changes: 8 additions & 7 deletions compiler/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from shell_ast.ast_util import *
from util import *
from custom_error import *

import config

Expand Down Expand Up @@ -242,11 +243,11 @@ def compile_command_to_DFG(fileIdGen, command, options, redirections=None):
command_invocation
)
if io_info is None:
raise Exception(
raise UnparallelizableError(
f"InputOutputInformation for {format_arg_chars(command)} not provided so considered side-effectful."
)
if io_info.has_other_outputs():
raise Exception(
raise UnparallelizableError(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Konstantinos (from old branch): Maybe also print what is the other output?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I may need some help with determining what the other outputs to pass it along in the exception. I see that in io_info.has_other_outputs(), it is going through a for-loop checking if each entry is a "other output" has_other_outputs() function (line 182); I'm not sure if there's already a function in the class that returns these "other outputs", but if not, would it be reasonable to change this function or add another function like this to return a list of those outputs in the class?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if there is such a function, but if not, you could certainly make a new one that returns a list of those outputs!

f"Command {format_arg_chars(command)} has outputs other than streaming."
)
para_info: ParallelizabilityInfo = (
Expand Down Expand Up @@ -840,7 +841,7 @@ def apply_parallelization_to_node(
node_id, parallelizer, fileIdGen, fan_out
)
else:
raise Exception("Splitter not yet implemented")
raise UnparallelizableError("Splitter not yet implemented for command: {}".format(self.get_node(node_id=node_id).cmd_invocation_with_io_vars.cmd_name))
angelhof marked this conversation as resolved.
Show resolved Hide resolved

def apply_round_robin_parallelization_to_node(
self, node_id, parallelizer, fileIdGen, fan_out, r_split_batch_size
Expand All @@ -849,11 +850,11 @@ def apply_round_robin_parallelization_to_node(
# currently, this cannot be done since splitter etc. would be added...
aggregator_spec = parallelizer.get_aggregator_spec()
if aggregator_spec.is_aggregator_spec_adj_lines_merge():
raise Exception("adj_lines_merge not yet implemented in PaSh")
angelhof marked this conversation as resolved.
Show resolved Hide resolved
raise AdjLineNotImplementedError("adj_lines_merge not yet implemented in PaSh")
elif aggregator_spec.is_aggregator_spec_adj_lines_seq():
raise Exception("adj_lines_seq not yet implemented in PaSh")
raise AdjLineNotImplementedError("adj_lines_seq not yet implemented in PaSh")
elif aggregator_spec.is_aggregator_spec_adj_lines_func():
raise Exception("adj_lines_func not yet implemented in PaSh")
raise AdjLineNotImplementedError("adj_lines_func not yet implemented in PaSh")
# END of what to move

node = self.get_node(node_id)
Expand Down Expand Up @@ -1192,7 +1193,7 @@ def introduce_aggregators_for_consec_chunks(
fileIdGen,
)
else:
raise Exception("aggregator kind not yet implemented")
angelhof marked this conversation as resolved.
Show resolved Hide resolved
raise UnparallelizableError("aggregator kind not yet implemented for command: {}".format(original_cmd_invocation_with_io_vars.cmd_name))
else: # we got auxiliary information
assert parallelizer.core_aggregator_spec.is_aggregator_spec_custom_2_ary()
map_in_aggregator_ids = in_aggregator_ids
Expand Down
14 changes: 12 additions & 2 deletions compiler/pash_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
from datetime import datetime

from sh_expand import env_vars_util
from sh_expand.expand import ExpansionError

import config
from ir import *
from ast_to_ir import compile_asts
from ir_to_ast import to_shell
from pash_graphviz import maybe_generate_graphviz
from util import *
from custom_error import *

from definitions.ir.aggregator_node import *

Expand Down Expand Up @@ -92,9 +94,17 @@ def compile_ir(ir_filename, compiled_script_file, args, compiler_config):
ret = compile_optimize_output_script(
ir_filename, compiled_script_file, args, compiler_config
)
except ExpansionError as e:
log("WARNING: Exception caught because some region(s) are not expandable and therefore unparallelizable:", e)
except UnparallelizableError as e:
log("WARNING: Exception caught because some region(s) are unparallelizable:", e)
# log(traceback.format_exc()) # uncomment for exact trace report (PaSh user should see informative messages for unparellizable regions)
angelhof marked this conversation as resolved.
Show resolved Hide resolved
except (AdjLineNotImplementedError, NotImplementedError) as e:
log("WARNING: Exception caught because some part is not implemented:", e)
log(traceback.format_exc())
except Exception as e:
log("WARNING: Exception caught:", e)
# traceback.print_exc()
log(traceback.format_exc())
angelhof marked this conversation as resolved.
Show resolved Hide resolved

return ret

Expand Down Expand Up @@ -142,7 +152,7 @@ def compile_optimize_output_script(

ret = optimized_ast_or_ir
else:
raise Exception("Script failed to compile!")
raise UnparallelizableError("Script failed to compile!")

return ret

Expand Down
Loading