Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF: rechunk on concat #14

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions micov/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _parse_bed_cov(data, feature_drop, feature_keep, lazy):

frame = pl.read_csv(data.read(), separator='\t',
new_columns=BED_COV_SCHEMA.columns,
- dtypes=BED_COV_SCHEMA.dtypes_dict,
+ schema_overrides=BED_COV_SCHEMA.dtypes_dict,
has_header=False, skip_rows=skip_rows).lazy()

if feature_drop is not None:
Expand Down Expand Up @@ -130,7 +130,7 @@ def _parse_qiita_coverages(tgz, compress_size=50_000_000, sample_keep=None,

def _single_df(coverages):
if len(coverages) > 1:
- df = pl.concat(coverages)
+ df = pl.concat(coverages, rechunk=True)
elif len(coverages) == 0:
raise ValueError("No coverages")
else:
Expand Down Expand Up @@ -315,13 +315,13 @@ def compress_from_stream(sam, bufsize=100_000_000, disable_compression=False):
current_df = compress_f(pl.concat([current_df, next_df]))
buf = data.readlines(bufsize)

- return current_df
+ return current_df.rechunk()


def parse_coverage(data, features_to_keep):
cov_df = pl.read_csv(data.read(), separator='\t',
new_columns=GENOME_COVERAGE_SCHEMA.columns,
- dtypes=GENOME_COVERAGE_SCHEMA.dtypes_dict).lazy()
+ schema_overrides=GENOME_COVERAGE_SCHEMA.dtypes_dict).lazy()

if features_to_keep is not None:
cov_df = cov_df.filter(pl.col(COLUMN_GENOME_ID).is_in(feature_keep))
Expand Down
2 changes: 1 addition & 1 deletion micov/_per_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ def compress_per_sample(coverage, lengths):
if len(sample_contig_coverage) == 0:
return None
else:
- return pl.concat(sample_contig_coverage)
+ return pl.concat(sample_contig_coverage, rechunk=True)
5 changes: 5 additions & 0 deletions micov/tests/test_cov.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def test_compress(self):
['G123', 101, 110],
['G456', 200, 300],
['G456', 400, 500]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
data = pl.DataFrame([['G123', 11, 50],
['G123', 20, 30],
Expand All @@ -25,6 +26,7 @@ def test_compress(self):
['G123', 51, 89],
['G123', 101, 110],
['G456', 400, 500]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = compress(data).sort(COLUMN_GENOME_ID)
plt.assert_frame_equal(obs, exp)
Expand All @@ -34,16 +36,19 @@ def test_coverage_percent(self):
['G456', 200, 299],
['G123', 90, 100],
['G456', 400, 500]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
lengths = pl.DataFrame([['G123', 100],
['G456', 1000],
['G789', 500]],
orient='row',
schema=GENOME_LENGTH_SCHEMA.dtypes_flat)

g123_covered = (50 - 11) + (100 - 90)
g456_covered = (299 - 200) + (500 - 400)
exp = pl.DataFrame([['G123', g123_covered, 100, (g123_covered / 100) * 100],
['G456', g456_covered, 1000, (g456_covered / 1000) * 100]],
orient='row',
schema=GENOME_COVERAGE_SCHEMA.dtypes_flat)

obs = coverage_percent(data, lengths).sort(COLUMN_GENOME_ID).collect()
Expand Down
16 changes: 16 additions & 0 deletions micov/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def test_write_qiita_cov(self):
lengths = pl.DataFrame([['GXXX', 600],
['GYYY', 1100],
['GZZZ', 2000]],
orient='row',
schema=GENOME_LENGTH_SCHEMA.dtypes_flat)

write_qiita_cov(self.name, paths, lengths)
Expand All @@ -101,11 +102,13 @@ def test_write_qiita_cov(self):
['GYYY', 100, 400],
['GYYY', 500, 1000],
['GZZZ', 200, 400]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)

exp_cov_percent = pl.DataFrame([['GXXX', 400, 600, (400 / 600) * 100],
['GYYY', 800, 1100, (800 / 1100) * 100],
['GZZZ', 200, 2000, (200 / 2000) * 100]],
orient='row',
schema=GENOME_COVERAGE_SCHEMA.dtypes_flat)

obs_artifact_cov = obs_artifact_cov.sort([COLUMN_GENOME_ID, COLUMN_START])
Expand All @@ -124,6 +127,7 @@ def test_parse_qiita_coverages(self):
['G123', 1000, 10000],
['G456', 5, 20],
['G789', 1, 100]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
# always compress
obs = parse_qiita_coverages(self.name)
Expand All @@ -137,6 +141,7 @@ def test_parse_qiita_coverages_always_compress(self):
['G123', 1000, 10000],
['G456', 5, 20],
['G789', 1, 100]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
# always compress
obs = parse_qiita_coverages(self.name, compress_size=0)
Expand All @@ -152,6 +157,7 @@ def test_parse_qiita_coverages_never_compress(self):
['G456', 5, 20],
['G789', 1, 100],
['G789', 2, 40]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name, compress_size=None)
obs = obs.sort([COLUMN_GENOME_ID, COLUMN_START])
Expand All @@ -163,6 +169,7 @@ def test_parse_qiita_coverages_keep(self):
['G123', 300, 400],
['G456', 5, 20],
['G789', 1, 100]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
sample_keep={'sample_a', 'sample_b'})
Expand All @@ -175,6 +182,7 @@ def test_parse_qiita_coverages_drop(self):
['G123', 300, 400],
['G456', 5, 20],
['G789', 1, 100]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
sample_drop={'sample_c', })
Expand All @@ -186,6 +194,7 @@ def test_parse_qiita_coverages_keep_drop(self):
['G123', 100, 200],
['G456', 5, 20],
['G789', 2, 40]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
sample_drop={'sample_c', },
Expand All @@ -198,6 +207,7 @@ def test_parse_qiita_coverages_keep_feature(self):
['G123', 100, 200],
['G123', 300, 400],
['G123', 1000, 10000]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
feature_keep={'G123', })
Expand All @@ -207,6 +217,7 @@ def test_parse_qiita_coverages_keep_feature(self):
def test_parse_qiita_coverages_drop_feature(self):
exp = pl.DataFrame([['G456', 5, 20],
['G789', 2, 40]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
sample_drop={'sample_c', },
Expand Down Expand Up @@ -236,6 +247,7 @@ def test_parse_genome_lengths_good(self):
exp = pl.DataFrame([['a', 10],
['b', 20],
['c', 30]],
orient='row',
schema=[COLUMN_GENOME_ID, COLUMN_LENGTH])
obs = parse_genome_lengths(self.name)
plt.assert_frame_equal(obs, exp)
Expand All @@ -251,6 +263,7 @@ def test_parse_genome_lengths_noheader(self):
exp = pl.DataFrame([['a', 10],
['b', 20],
['c', 30]],
orient='row',
schema=[COLUMN_GENOME_ID, COLUMN_LENGTH])
obs = parse_genome_lengths(self.name)
plt.assert_frame_equal(obs, exp)
Expand Down Expand Up @@ -303,6 +316,7 @@ def test_compress_from_stream(self):
['X', 90, 150],
['Y', 10, 60],
['Y', 100, 150]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = compress_from_stream(data, bufsize=2)
plt.assert_frame_equal(obs.sort([COLUMN_GENOME_ID, ]), exp)
Expand All @@ -323,6 +337,7 @@ def test_compress_from_stream_disable_compression(self):
['X', 100, 150],
['Y', 10, 60],
['Y', 100, 150]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = compress_from_stream(data, bufsize=2, disable_compression=True)
plt.assert_frame_equal(obs.sort([COLUMN_GENOME_ID, COLUMN_START]), exp)
Expand All @@ -339,6 +354,7 @@ def test_parse_sam_to_df(self):
exp = pl.DataFrame([['A', 0, 'X', 1, '50M', 51],
['B', 0, 'Y', 10, '50M', 60],
['C', 0, 'X', 100, '50M', 150]],
orient='row',
schema=SAM_SUBSET_SCHEMA_PARSED.dtypes_flat)
obs = parse_sam_to_df(data)
plt.assert_frame_equal(obs, exp)
Expand Down
3 changes: 3 additions & 0 deletions micov/tests/test_per_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Tests(unittest.TestCase):
def test_compress_per_sample(self):
lengths = pl.DataFrame([['A', 400],
['B', 500]],
orient='row',
schema=GENOME_LENGTH_SCHEMA.dtypes_flat)
df = pl.DataFrame([['A', 10, 100, 'S1'],
['A', 10, 20, 'S1'],
Expand All @@ -22,13 +23,15 @@ def test_compress_per_sample(self):
['A', 50, 150, 'S2'],
['A', 200, 300, 'S1'],
['A', 201, 299, 'S1']],
orient='row',
schema=BED_COV_SAMPLEID_SCHEMA.dtypes_flat)
s1_a = ((110 - 10) + (300 - 200))
s1_b = ((150 - 50) + (250 - 200))
s2_a = (150 - 50)
exp = pl.DataFrame([['A', s1_a, 400, (s1_a / 400) * 100, 'S1'],
['A', s2_a, 400, (s2_a / 400) * 100, 'S2'],
['B', s1_b, 500, (s1_b / 500) * 100, 'S1']],
orient='row',
schema=GENOME_COVERAGE_WITH_SAMPLEID_SCHEMA.dtypes_flat)
obs = compress_per_sample(df, lengths).sort([COLUMN_GENOME_ID,
COLUMN_SAMPLE_ID]).collect()
Expand Down
Loading