Commit
Archiving all-files scheduler (jupyter-server#388)
* Fix typo in comment

* WIP: Adds new scheduler

* writes individual files

* WIP: Write zip file

* WIP: Trying to get zip file to be written only on scheduled job runs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* WIP: Removes zip type, incremental work for archiving work dir

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Create tar.gz in staging subdir

* Capture side effect files in staging dir

* Extracts files

* Add filter

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update jupyter_scheduler/job_files_manager.py

Co-authored-by: david qiu <[email protected]>

* Simplifies cleanup logic

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Updates docs, deletes old Archiving*, renames AllFilesArchiving

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: david qiu <[email protected]>
3 people authored Aug 11, 2023
1 parent 1e6f460 commit b897aa5
Showing 4 changed files with 59 additions and 31 deletions.
14 changes: 14 additions & 0 deletions docs/operators/index.md
@@ -86,6 +86,20 @@ jupyter lab --SchedulerApp.job_files_manager_class=jupyter_scheduler.job_files_m

For more information on writing a custom implementation, please see the {doc}`developer's guide </developers/index>`.

### Example: Capturing side effect files

The default scheduler and execution manager classes do not capture
**side effect files**, files that are created as a side effect of executing
cells in a notebook. The `ArchivingScheduler` and `ArchivingExecutionManager`
classes do capture side effect files. If you intend to run notebooks that produce
side effect files, you can use these classes by running:

```
jupyter lab \
--SchedulerApp.scheduler_class=jupyter_scheduler.scheduler.ArchivingScheduler \
--Scheduler.execution_manager_class=jupyter_scheduler.executors.ArchivingExecutionManager
```
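
For illustration, a notebook cell like the following (the file name and contents are hypothetical) writes a file into the notebook's working directory as a side effect of execution; with the archiving classes above enabled, that file is captured in the job's `tar.gz` archive alongside the notebook outputs:

```
# Hypothetical notebook cell: writing results.csv is a side effect of execution.
# With ArchivingScheduler and ArchivingExecutionManager enabled, this file is
# included in the job's output archive; the default classes would not keep it.
import csv

with open("results.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["metric", "value"])
    writer.writerow(["accuracy", 0.97])
```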

## UI configuration

You can configure the Jupyter Scheduler UI by installing a lab extension that both:
50 changes: 33 additions & 17 deletions jupyter_scheduler/executors.py
@@ -1,4 +1,6 @@
import io
import os
import shutil
import tarfile
import traceback
from abc import ABC, abstractmethod
@@ -174,12 +176,12 @@ def validate(cls, input_path: str) -> bool:


class ArchivingExecutionManager(DefaultExecutionManager):
"""Execution manager that archives the output
files to a compressed tar file.
"""Execution manager that archives all output files in and under the
output directory into a single archive file
Notes
-----
Should be used along with :class:`~jupyter_scheduler.scheduler.ArchiveDownloadingScheduler`
Should be used along with :class:`~jupyter_scheduler.scheduler.ArchivingScheduler`
as the `scheduler_class` during jupyter server start.
"""

@@ -197,27 +199,41 @@ def execute(self):
            store_widget_state=True,
        )

+        # Get the directory of the input file
+        local_staging_dir = os.path.dirname(self.staging_paths["input"])
+        # Directory where side-effect files are written
+        run_dir = os.path.join(local_staging_dir, "files")
+        os.mkdir(run_dir)

        try:
-            ep.preprocess(nb)
+            ep.preprocess(nb, {"metadata": {"path": run_dir}})
        except CellExecutionError as e:
            pass
        finally:
+            # Create all desired output files, other than "input" and "tar.gz"
+            for output_format in job.output_formats:
+                if output_format == "input" or output_format == "tar.gz":
+                    pass
+                else:
+                    cls = nbconvert.get_exporter(output_format)
+                    output, resources = cls().from_notebook_node(nb)
+                    f = open(self.staging_paths[output_format], "wb")
+                    f.write(bytes(output, "utf-8"))
+                    f.close()

+            # Create an archive file of the staging directory for this run
+            # and everything under it
            fh = io.BytesIO()
            with tarfile.open(fileobj=fh, mode="w:gz") as tar:
-                output_formats = job.output_formats + ["input"]
-                for output_format in output_formats:
-                    if output_format == "input":
-                        with open(self.staging_paths["input"]) as f:
-                            output = f.read()
-                    else:
-                        cls = nbconvert.get_exporter(output_format)
-                        output, resources = cls().from_notebook_node(nb)
-                    data = bytes(output, "utf-8")
-                    source_f = io.BytesIO(initial_bytes=data)
-                    info = tarfile.TarInfo(self.staging_paths[output_format])
-                    info.size = len(data)
-                    tar.addfile(info, source_f)
+                for root, dirs, files in os.walk(local_staging_dir):
+                    for file in files:
+                        # This flattens the directory structure, so that in the tar
+                        # file, output files and side-effect files are side-by-side
+                        tar.add(os.path.join(root, file), file)

            archive_filepath = self.staging_paths["tar.gz"]
            with fsspec.open(archive_filepath, "wb") as f:
                f.write(fh.getvalue())

+            # Clean up the side-effect files in the run directory
+            shutil.rmtree(run_dir)
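
A rough standalone sketch of the flattening behaviour described in the comments above (not part of the changed code; the directory layout and file names are invented): because each file is added under its base name, output files and side-effect files end up side by side at the archive root.

```
# Sketch only: mimics how ArchivingExecutionManager flattens the staging
# directory when building the archive. "staging/job-123" is a made-up path.
import io
import os
import tarfile

fh = io.BytesIO()
with tarfile.open(fileobj=fh, mode="w:gz") as tar:
    for root, dirs, files in os.walk("staging/job-123"):
        for file in files:
            # arcname=file drops the directory part, so "files/plot.png" and
            # "helloworld.ipynb" both land at the root of the archive
            tar.add(os.path.join(root, file), arcname=file)

with tarfile.open(fileobj=io.BytesIO(fh.getvalue()), mode="r:gz") as tar:
    print(tar.getnames())  # e.g. ['helloworld.ipynb', 'plot.png']
```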
10 changes: 2 additions & 8 deletions jupyter_scheduler/job_files_manager.py
@@ -68,16 +68,10 @@ def generate_filepaths(self):
    def download_tar(self, archive_format: str = "tar"):
        archive_filepath = self.staging_paths[archive_format]
        read_mode = "r:gz" if archive_format == "tar.gz" else "tar"

        with fsspec.open(archive_filepath) as f:
            with tarfile.open(fileobj=f, mode=read_mode) as tar:
-                filepaths = self.generate_filepaths()
-                for input_filepath, output_filepath in filepaths:
-                    try:
-                        input_file = tar.extractfile(member=input_filepath)
-                        with fsspec.open(output_filepath, mode="wb") as output_file:
-                            output_file.write(input_file.read())
-                    except Exception as e:
-                        pass
+                tar.extractall(self.output_dir, filter="data")

    def download(self):
        if not self.staging_paths:
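
The rewritten `download_tar` above replaces the per-member extraction loop with `tarfile`'s built-in extraction filters (PEP 706): `filter="data"` rejects archive members that would escape the destination directory and other unsafe entries. The keyword is only available on Python versions that include the extraction-filter backport. A minimal sketch of the same call, using a hypothetical archive and destination:

```
# Sketch: safe extraction with the "data" filter, as used in download_tar above.
# "job-output.tar.gz" and "downloads" are made-up paths.
import tarfile

with tarfile.open("job-output.tar.gz", mode="r:gz") as tar:
    # The "data" filter blocks members that would extract outside the
    # destination directory, along with other unsafe member types.
    tar.extractall("downloads", filter="data")
```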
16 changes: 10 additions & 6 deletions jupyter_scheduler/scheduler.py
@@ -686,7 +686,7 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->


class ArchivingScheduler(Scheduler):
-    """Scheduler that adds archive path to staging paths."""
+    """Scheduler that captures all files in output directory in an archive."""

    execution_manager_class = TType(
        klass="jupyter_scheduler.executors.ExecutionManager",
@@ -705,12 +705,16 @@ def get_staging_paths(self, model: Union[DescribeJob, DescribeJobDefinition]) ->
            filename = create_output_filename(
                model.input_filename, model.create_time, output_format
            )
-            staging_paths[output_format] = filename
+            # Use the staging directory to capture output files
+            staging_paths[output_format] = os.path.join(self.staging_path, id, filename)

-        output_format = "tar.gz"
-        filename = create_output_filename(model.input_filename, model.create_time, output_format)
-        staging_paths[output_format] = os.path.join(self.staging_path, model.job_id, filename)
-        staging_paths["input"] = os.path.join(self.staging_path, model.job_id, model.input_filename)
+        # Create an output archive file
+        staging_paths["tar.gz"] = os.path.join(
+            self.staging_path,
+            id,
+            create_output_filename(model.input_filename, model.create_time, "tar.gz"),
+        )
+        staging_paths["input"] = os.path.join(self.staging_path, id, model.input_filename)

        return staging_paths
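
With this change, every staging path for a job (or job definition) lives under a per-id subdirectory of the scheduler's staging path. For a hypothetical job, the returned mapping would look roughly like this (the id, file names, and output formats are invented for illustration):

```
# Illustrative only: actual values depend on staging_path, the job id, and
# the filename pattern produced by create_output_filename.
{
    "ipynb": "/path/to/staging/5f912ab7/helloworld-2023-08-11.ipynb",
    "html": "/path/to/staging/5f912ab7/helloworld-2023-08-11.html",
    "tar.gz": "/path/to/staging/5f912ab7/helloworld-2023-08-11.tar.gz",
    "input": "/path/to/staging/5f912ab7/helloworld.ipynb",
}
```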

