diff --git a/compiler/ast_to_ir.py b/compiler/ast_to_ir.py index 8d6f755a4..c1e753fa3 100644 --- a/compiler/ast_to_ir.py +++ b/compiler/ast_to_ir.py @@ -8,6 +8,8 @@ from util import * from parse import from_ast_objects_to_shell +from custom_error import * + ## TODO: Separate the ir stuff to the bare minimum and ## try to move this to the shell_ast folder. @@ -159,7 +161,7 @@ def combine_pipe(ast_nodes): else: ## If any part of the pipe is not an IR, the compilation must fail. log("Node: {} is not pure".format(ast_nodes[0])) - raise Exception("Not pure node in pipe") + raise UnparallelizableError("Node: {} is not a pure node in pipe".format(ast_nodes[0])) ## Combine the rest of the nodes for ast_node in ast_nodes[1:]: @@ -168,7 +170,7 @@ def combine_pipe(ast_nodes): else: ## If any part of the pipe is not an IR, the compilation must fail. log("Node: {} is not pure".format(ast_nodes)) - raise Exception("Not pure node in pipe") + raise UnparallelizableError("This specific node: {} is not a pure node in pipe".format(ast_node)) return [combined_nodes] diff --git a/compiler/custom_error.py b/compiler/custom_error.py new file mode 100644 index 000000000..eedb6f738 --- /dev/null +++ b/compiler/custom_error.py @@ -0,0 +1,5 @@ +class UnparallelizableError(Exception): + pass + +class AdjLineNotImplementedError(Exception): + pass \ No newline at end of file diff --git a/compiler/ir.py b/compiler/ir.py index c1534494a..b7319cc23 100644 --- a/compiler/ir.py +++ b/compiler/ir.py @@ -38,6 +38,7 @@ from shell_ast.ast_util import * from util import * +from custom_error import * import config @@ -242,11 +243,11 @@ def compile_command_to_DFG(fileIdGen, command, options, redirections=None): command_invocation ) if io_info is None: - raise Exception( + raise UnparallelizableError( f"InputOutputInformation for {format_arg_chars(command)} not provided so considered side-effectful." ) if io_info.has_other_outputs(): - raise Exception( + raise UnparallelizableError( f"Command {format_arg_chars(command)} has outputs other than streaming." ) para_info: ParallelizabilityInfo = ( @@ -840,7 +841,7 @@ def apply_parallelization_to_node( node_id, parallelizer, fileIdGen, fan_out ) else: - raise Exception("Splitter not yet implemented") + raise UnparallelizableError("Splitter not yet implemented for command: {}".format(self.get_node(node_id=node_id).cmd_invocation_with_io_vars.cmd_name)) def apply_round_robin_parallelization_to_node( self, node_id, parallelizer, fileIdGen, fan_out, r_split_batch_size @@ -849,11 +850,11 @@ def apply_round_robin_parallelization_to_node( # currently, this cannot be done since splitter etc. would be added... aggregator_spec = parallelizer.get_aggregator_spec() if aggregator_spec.is_aggregator_spec_adj_lines_merge(): - raise Exception("adj_lines_merge not yet implemented in PaSh") + raise AdjLineNotImplementedError("adj_lines_merge not yet implemented in PaSh") elif aggregator_spec.is_aggregator_spec_adj_lines_seq(): - raise Exception("adj_lines_seq not yet implemented in PaSh") + raise AdjLineNotImplementedError("adj_lines_seq not yet implemented in PaSh") elif aggregator_spec.is_aggregator_spec_adj_lines_func(): - raise Exception("adj_lines_func not yet implemented in PaSh") + raise AdjLineNotImplementedError("adj_lines_func not yet implemented in PaSh") # END of what to move node = self.get_node(node_id) @@ -1192,7 +1193,7 @@ def introduce_aggregators_for_consec_chunks( fileIdGen, ) else: - raise Exception("aggregator kind not yet implemented") + raise UnparallelizableError("aggregator kind not yet implemented for command: {}".format(original_cmd_invocation_with_io_vars.cmd_name)) else: # we got auxiliary information assert parallelizer.core_aggregator_spec.is_aggregator_spec_custom_2_ary() map_in_aggregator_ids = in_aggregator_ids diff --git a/compiler/pash_compiler.py b/compiler/pash_compiler.py index c4fc7282e..a949457ba 100644 --- a/compiler/pash_compiler.py +++ b/compiler/pash_compiler.py @@ -4,6 +4,7 @@ from datetime import datetime from sh_expand import env_vars_util +from sh_expand.expand import ExpansionError import config from ir import * @@ -11,6 +12,7 @@ from ir_to_ast import to_shell from pash_graphviz import maybe_generate_graphviz from util import * +from custom_error import * from definitions.ir.aggregator_node import * @@ -92,9 +94,17 @@ def compile_ir(ir_filename, compiled_script_file, args, compiler_config): ret = compile_optimize_output_script( ir_filename, compiled_script_file, args, compiler_config ) + except ExpansionError as e: + log("WARNING: Exception caught because some region(s) are not expandable and therefore unparallelizable:", e) + except UnparallelizableError as e: + log("WARNING: Exception caught because some region(s) are unparallelizable:", e) + # log(traceback.format_exc()) # uncomment for exact trace report (PaSh user should see informative messages for unparellizable regions) + except (AdjLineNotImplementedError, NotImplementedError) as e: + log("WARNING: Exception caught because some part is not implemented:", e) + log(traceback.format_exc()) except Exception as e: log("WARNING: Exception caught:", e) - # traceback.print_exc() + log(traceback.format_exc()) return ret @@ -142,7 +152,7 @@ def compile_optimize_output_script( ret = optimized_ast_or_ir else: - raise Exception("Script failed to compile!") + raise UnparallelizableError("Script failed to compile!") return ret