Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 725 #731

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions compiler/pash_compilation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def __init__(self):

## A map that keeps mappings between proc_id and (input_ir, width, exec_time)
self.process_id_input_ir_map = {}
self.parallelized_flag = False
## This is a map from input IRs, i.e., locations in the code, to a list of process_ids
self.input_ir_to_process_id_map = {}

Expand Down Expand Up @@ -354,6 +355,7 @@ def compile_and_add(self, compiled_script_file, var_file, input_ir_file):
else:
self.running_procs += 1


## Get the time before we start executing (roughly) to determine how much time this command execution will take
command_exec_start_time = datetime.now()
self.process_id_input_ir_map[process_id].set_start_exec_time(
Expand Down Expand Up @@ -420,6 +422,8 @@ def handle_exit(self, input_cmd):
exec_time = (command_finish_exec_time - command_start_exec_time) / timedelta(
milliseconds=1
)
command_start_exec_time = self.process_id_input_ir_map[process_id].get_start_exec_time()
exec_time = (command_finish_exec_time - command_start_exec_time) / timedelta(milliseconds=1)
log("Process:", process_id, "exited. Exec time was:", exec_time)
self.handle_time_measurement(process_id, exec_time)
self.remove_process(process_id)
Expand Down Expand Up @@ -509,6 +513,16 @@ def run(self):
self.parse_and_run_cmd(input_cmd)

self.connection_manager.close()
if not self.parallelized_flag:
log("No parts of the input script were parallelized. Ensure commands are annotated for parallelization.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These messages need to be logged no matter the debug level, so they should be given a different level (the way it is now, they will only be printed if we have -d 1). Also it would be good to prefix them with [PaSh Warning]. Here is some wordsmithing to make them a bit clearer too:

  • [PaSh Warning] No region of the script was parallelized. Maybe you are missing relevant annotations? Use -d 1 for more info.
  • [PaSh Warning] Some script regions were parallelized but their execution times were negligible (<1s). If your script takes a long time maybe annotations are missing from relevant regions. Use -d 1 for more info.

elif all(
proc_info.exec_time is not None and proc_info.exec_time < 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we checked that this ever actually passes? I am skeptical about the `all(... proc_info.exec_time is not None ...)` check. Is there any way one of those is None, making this become false?

for proc_info in self.process_id_input_ir_map.values()
):
log("Some script fragments were parallelized, but their execution times were negligible.")
log("Consider optimizing your script to include longer-running tasks.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this message, this is not really an optimization that we are asking them to do, but rather making sure that they have annotations for the long-running parts that they care about.

else:
log("Parallelization completed successfully.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary, should be deleted.

shutdown()


Expand Down
13 changes: 10 additions & 3 deletions compiler/pash_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def main_body():
config.load_config(args.config_path)

runtime_config = config.config["distr_planner"]
config_parallelization_success = False

## Read any shell variables files if present
vars_dict = env_vars_util.read_vars_file(args.var_file, config.BASH_VERSION)
Expand Down Expand Up @@ -94,6 +95,8 @@ def compile_ir(ir_filename, compiled_script_file, args, compiler_config):
ret = compile_optimize_output_script(
ir_filename, compiled_script_file, args, compiler_config
)
if ret is None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this means that there were no parallelization opportunities for the whole script, but only for this region. Also, I think it might be subsumed by the exception handling. Is this code ever called?

log("[PaSh Info] No parallelization opportunities detected for this region.")
except ExpansionError as e:
log("WARNING: Exception caught because some region(s) are not expandable and therefore unparallelizable:", e)
raise NotAllRegionParallelizableError()
Expand All @@ -104,9 +107,9 @@ def compile_ir(ir_filename, compiled_script_file, args, compiler_config):
except (AdjLineNotImplementedError, NotImplementedError) as e:
log("WARNING: Exception caught because some part is not implemented:", e)
log(traceback.format_exc())
except Exception as e:
log("WARNING: Exception caught:", e)
log(traceback.format_exc())
except Exception:
log("[PaSh Warning] Compilation failed due to an unexpected error.", level=0)
log(traceback.format_exc(), level=1)

return ret

Expand Down Expand Up @@ -234,6 +237,8 @@ def optimize_irs(asts_and_irs, args, compiler_config):
if isinstance(ast_or_ir, IR):
## Assert that the graph that was returned from compilation is valid
assert ast_or_ir.valid()
if ast_or_ir.valid():
config_parallelization_success = True

# log(ir_node)
# with cProfile.Profile() as pr:
Expand Down Expand Up @@ -279,6 +284,8 @@ def choose_and_apply_parallelizing_transformations(
graph, fan_out, batch_size, r_split_batch_size
):
parallelizer_map = choose_parallelizing_transformations(graph)
if any(parallelizer_mapp.values()):
config.parallelization_success = True
apply_parallelizing_transformations(
graph, parallelizer_map, fan_out, batch_size, r_split_batch_size
)
Expand Down
Loading
Loading