Skip to content

Commit

Permalink
issue #1171 - compress downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
davmlaw authored and TheMadBug committed Sep 19, 2024
1 parent 5414bd1 commit 6aede1c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 29 deletions.
68 changes: 40 additions & 28 deletions analysis/tasks/analysis_grid_export_tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import gzip
import logging
import os
import uuid
import zipfile
from typing import Optional

import celery
Expand Down Expand Up @@ -39,41 +41,60 @@ def get_grid_downloadable_file_params_hash(pk, export_type):
return sha256sum_str(f"{pk}-{export_type}")


def _write_cached_generated_file(cgf: CachedGeneratedFile, total_records, filename, file_iterator):
def update_cgf_progress_iterator(iterator, cgf_id, total_records, update_size):
    """Yield records from ``iterator`` unchanged, periodically recording
    fractional progress (0.0-1.0) on the CachedGeneratedFile row ``cgf_id``.

    Progress is written to the DB every ``update_size`` records; when the
    iterator is exhausted the row is marked complete
    (progress=1, task_status='SUCCESS').
    """
    # Coerce to int so the modulus below can actually hit 0, and clamp to >= 1
    # so a tiny/zero update_size cannot raise ZeroDivisionError in `i % update_size`.
    update_size = max(1, int(update_size))
    cgf_qs = CachedGeneratedFile.objects.filter(id=cgf_id)
    cgf_qs.update(progress=0)

    for i, record in enumerate(iterator):
        if i % update_size == 0:
            # Guard against total_records == 0 (empty export) -> ZeroDivisionError
            progress = i / total_records if total_records else 0
            cgf_qs.update(progress=progress)
        yield record
    cgf_qs.update(progress=1, task_status='SUCCESS')


def _write_node_to_cached_generated_file(cgf, analysis, node, name, export_type):
    """Export ``node``'s grid rows to a file on disk and record the result on ``cgf``.

    VCF exports are gzip-compressed as they are written; CSV exports are written
    first and then zipped (zipfile needs the source file on disk). On success,
    ``cgf.filename``, ``task_status`` and ``generate_end`` are set; on failure the
    exception text is stored and ``task_status`` becomes FAILURE. ``cgf`` is saved
    in both cases.
    """
    basename = "_".join([name_from_filename(name), "annotated", f"v{analysis.annotation_version.pk}",
                         str(analysis.genome_build)])
    request = FakeRequest(user=admin_bot())
    basename, file_iterator = node_grid_get_export_iterator(request, node, export_type, basename=basename)
    open_func = open
    if export_type == 'vcf':
        open_func = gzip.open  # compress VCF on the fly
        basename += ".gz"

    total_records = node.count
    update_size = max(1000, total_records / 100)  # 1% or every 1k records
    update_progress_iterator = update_cgf_progress_iterator(file_iterator(), cgf.pk, total_records, update_size)

    media_root_filename = os.path.join(settings.GENERATED_DIR, cgf.generator, str(cgf.pk), basename)
    logging.info("Starting to write %s", media_root_filename)
    try:
        mk_path_for_file(media_root_filename)
        with open_func(media_root_filename, "wt") as f:
            for line in update_progress_iterator:
                f.write(line)  # Already has newline

        if export_type == 'csv':
            # Write CSV to Zip (requires the file to be there already).
            # ZIP_DEFLATED is required for actual compression — ZipFile's
            # default is ZIP_STORED, which archives without compressing.
            original_filename = media_root_filename
            zip_file_path = media_root_filename + ".zip"
            with zipfile.ZipFile(zip_file_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
                zipf.write(original_filename, arcname=os.path.basename(original_filename))
            os.unlink(original_filename)
            media_root_filename = zip_file_path
        cgf.filename = media_root_filename
        cgf.task_status = "SUCCESS"
        cgf.generate_end = timezone.now()
        logging.info("Wrote %s", media_root_filename)
    except Exception as e:
        logging.error("Failed to write %s: %s", media_root_filename, e)
        cgf.exception = str(e)
        cgf.task_status = "FAILURE"
    cgf.save()


def update_cgf_progress_iterator(iterator, cgf_id, total_records, update_size):
    """Pass records through unchanged while writing fractional progress to the
    CachedGeneratedFile row identified by ``cgf_id``; mark the row SUCCESS once
    the underlying iterator is exhausted."""
    # Integer step so the modulus test below can actually evaluate to 0.
    step = int(update_size)
    cgf_qs = CachedGeneratedFile.objects.filter(id=cgf_id)
    cgf_qs.update(progress=0)

    seen = 0
    for record in iterator:
        if seen % step == 0:
            cgf_qs.update(progress=seen / total_records)
        seen += 1
        yield record
    cgf_qs.update(progress=1, task_status='SUCCESS')


@celery.shared_task
def export_cohort_to_downloadable_file(cohort_id, export_type):
# This should have been created in analysis.views.views_grid.cohort_grid_export
Expand All @@ -85,12 +106,7 @@ def export_cohort_to_downloadable_file(cohort_id, export_type):
analysis_template = AnalysisTemplate.get_template_from_setting("ANALYSIS_TEMPLATES_AUTO_COHORT_EXPORT")
analysis = get_cohort_analysis(cohort, analysis_template)
node = analysis.analysisnode_set.get_subclass(output_node=True) # Should only be 1
basename = "_".join([name_from_filename(cohort.name), "annotated", f"v{analysis.annotation_version.pk}",
str(cohort.genome_build)])

request = FakeRequest(user=admin_bot())
filename, file_iterator = node_grid_get_export_iterator(request, node, export_type, basename=basename)
_write_cached_generated_file(cgf, node.count, filename, file_iterator)
_write_node_to_cached_generated_file(cgf, analysis, node, cohort.name, export_type)


@celery.shared_task
Expand All @@ -104,8 +120,4 @@ def export_sample_to_downloadable_file(sample_id, export_type):
analysis_template = AnalysisTemplate.get_template_from_setting("ANALYSIS_TEMPLATES_AUTO_SAMPLE")
analysis = get_sample_analysis(sample, analysis_template)
node = SampleNode.objects.get(analysis=analysis, output_node=True) # Should only be 1
basename = "_".join([name_from_filename(sample.name), "annotated", f"v{analysis.annotation_version.pk}",
str(sample.genome_build)])
request = FakeRequest(user=admin_bot())
filename, file_iterator = node_grid_get_export_iterator(request, node, export_type, basename=basename)
_write_cached_generated_file(cgf, node.count, filename, file_iterator)
_write_node_to_cached_generated_file(cgf, analysis, node, sample.name, export_type)
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ class AnnotatedFileDownload {
spinner.append(progressIndicator);

function updateProgress(progress) {
console.log("updateProgress");
let percent = Math.floor(100 * progress);
progressIndicator.empty();
progressIndicator.append(`${percent}% complete`);
Expand Down

0 comments on commit 6aede1c

Please sign in to comment.