diff --git a/docs/operators/index.md b/docs/operators/index.md index cefe3842..ac11cac8 100644 --- a/docs/operators/index.md +++ b/docs/operators/index.md @@ -86,6 +86,20 @@ jupyter lab --SchedulerApp.job_files_manager_class=jupyter_scheduler.job_files_m For more information on writing a custom implementation, please see the {doc}`developer's guide `. +### Example: Capturing side effect files + +The default scheduler and execution manager classes do not capture +**side effect files**, files that are created as a side effect of executing +cells in a notebook. The `ArchivingScheduler` and `ArchivingExecutionManager` +classes do capture side effect files. If you intend to run notebooks that produce +side effect files, you can use these classes by running: + +``` +jupyter lab \ + --SchedulerApp.scheduler_class=jupyter_scheduler.scheduler.ArchivingScheduler \ + --Scheduler.execution_manager_class=jupyter_scheduler.executors.ArchivingExecutionManager +``` + ## UI configuration You can configure the Jupyter Scheduler UI by installing a lab extension that both: diff --git a/jupyter_scheduler/executors.py b/jupyter_scheduler/executors.py index 34befd4f..47b50249 100644 --- a/jupyter_scheduler/executors.py +++ b/jupyter_scheduler/executors.py @@ -1,4 +1,6 @@ import io +import os +import shutil import tarfile import traceback from abc import ABC, abstractmethod @@ -174,12 +176,12 @@ def validate(cls, input_path: str) -> bool: class ArchivingExecutionManager(DefaultExecutionManager): - """Execution manager that archives the output - files to a compressed tar file. + """Execution manager that archives all output files in and under the + output directory into a single archive file Notes ----- - Should be used along with :class:`~jupyter_scheduler.scheduler.ArchiveDownloadingScheduler` + Should be used along with :class:`~jupyter_scheduler.scheduler.ArchivingScheduler` as the `scheduler_class` during jupyter server start. """ @@ -197,27 +199,41 @@ def execute(self): store_widget_state=True, ) + # Get the directory of the input file + local_staging_dir = os.path.dirname(self.staging_paths["input"]) + # Directory where side-effect files are written + run_dir = os.path.join(local_staging_dir, "files") + os.mkdir(run_dir) + try: - ep.preprocess(nb) + ep.preprocess(nb, {"metadata": {"path": run_dir}}) except CellExecutionError as e: pass finally: + # Create all desired output files, other than "input" and "tar.gz" + for output_format in job.output_formats: + if output_format == "input" or output_format == "tar.gz": + pass + else: + cls = nbconvert.get_exporter(output_format) + output, resources = cls().from_notebook_node(nb) + f = open(self.staging_paths[output_format], "wb") + f.write(bytes(output, "utf-8")) + f.close() + + # Create an archive file of the staging directory for this run + # and everything under it fh = io.BytesIO() with tarfile.open(fileobj=fh, mode="w:gz") as tar: - output_formats = job.output_formats + ["input"] - for output_format in output_formats: - if output_format == "input": - with open(self.staging_paths["input"]) as f: - output = f.read() - else: - cls = nbconvert.get_exporter(output_format) - output, resources = cls().from_notebook_node(nb) - data = bytes(output, "utf-8") - source_f = io.BytesIO(initial_bytes=data) - info = tarfile.TarInfo(self.staging_paths[output_format]) - info.size = len(data) - tar.addfile(info, source_f) + for root, dirs, files in os.walk(local_staging_dir): + for file in files: + # This flattens the directory structure, so that in the tar + # file, output files and side-effect files are side-by-side + tar.add(os.path.join(root, file), file) archive_filepath = self.staging_paths["tar.gz"] with fsspec.open(archive_filepath, "wb") as f: f.write(fh.getvalue()) + + # Clean up the side-effect files in the run directory + shutil.rmtree(run_dir) diff --git a/jupyter_scheduler/job_files_manager.py b/jupyter_scheduler/job_files_manager.py index b4f0928f..203092f8 100644 --- a/jupyter_scheduler/job_files_manager.py +++ b/jupyter_scheduler/job_files_manager.py @@ -68,16 +68,10 @@ def generate_filepaths(self): def download_tar(self, archive_format: str = "tar"): archive_filepath = self.staging_paths[archive_format] read_mode = "r:gz" if archive_format == "tar.gz" else "tar" + with fsspec.open(archive_filepath) as f: with tarfile.open(fileobj=f, mode=read_mode) as tar: - filepaths = self.generate_filepaths() - for input_filepath, output_filepath in filepaths: - try: - input_file = tar.extractfile(member=input_filepath) - with fsspec.open(output_filepath, mode="wb") as output_file: - output_file.write(input_file.read()) - except Exception as e: - pass + tar.extractall(self.output_dir, filter="data") def download(self): if not self.staging_paths: diff --git a/jupyter_scheduler/scheduler.py b/jupyter_scheduler/scheduler.py index 18a318c2..ff08b681 100644 --- a/jupyter_scheduler/scheduler.py +++ b/jupyter_scheduler/scheduler.py @@ -686,7 +686,7 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) -> class ArchivingScheduler(Scheduler): - """Scheduler that adds archive path to staging paths.""" + """Scheduler that captures all files in output directory in an archive.""" execution_manager_class = TType( klass="jupyter_scheduler.executors.ExecutionManager", @@ -705,12 +705,16 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) -> filename = create_output_filename( model.input_filename, model.create_time, output_format ) - staging_paths[output_format] = filename + # Use the staging directory to capture output files + staging_paths[output_format] = os.path.join(self.staging_path, id, filename) - output_format = "tar.gz" - filename = create_output_filename(model.input_filename, model.create_time, output_format) - staging_paths[output_format] = os.path.join(self.staging_path, model.job_id, filename) - staging_paths["input"] = os.path.join(self.staging_path, model.job_id, model.input_filename) + # Create an output archive file + staging_paths["tar.gz"] = os.path.join( + self.staging_path, + id, + create_output_filename(model.input_filename, model.create_time, "tar.gz"), + ) + staging_paths["input"] = os.path.join(self.staging_path, id, model.input_filename) return staging_paths