Skip to content

Commit

Permalink
Merge pull request #14 from biocore/rechunk_on_concat
Browse files Browse the repository at this point in the history
PERF: rechunk on concat
  • Loading branch information
wasade authored Jul 9, 2024
2 parents 02ae1a5 + 3dc3de6 commit 02366fd
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 5 deletions.
8 changes: 4 additions & 4 deletions micov/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _parse_bed_cov(data, feature_drop, feature_keep, lazy):

frame = pl.read_csv(data.read(), separator='\t',
new_columns=BED_COV_SCHEMA.columns,
dtypes=BED_COV_SCHEMA.dtypes_dict,
schema_overrides=BED_COV_SCHEMA.dtypes_dict,
has_header=False, skip_rows=skip_rows).lazy()

if feature_drop is not None:
Expand Down Expand Up @@ -130,7 +130,7 @@ def _parse_qiita_coverages(tgz, compress_size=50_000_000, sample_keep=None,

def _single_df(coverages):
if len(coverages) > 1:
df = pl.concat(coverages)
df = pl.concat(coverages, rechunk=True)
elif len(coverages) == 0:
raise ValueError("No coverages")
else:
Expand Down Expand Up @@ -315,13 +315,13 @@ def compress_from_stream(sam, bufsize=100_000_000, disable_compression=False):
current_df = compress_f(pl.concat([current_df, next_df]))
buf = data.readlines(bufsize)

return current_df
return current_df.rechunk()


def parse_coverage(data, features_to_keep):
cov_df = pl.read_csv(data.read(), separator='\t',
new_columns=GENOME_COVERAGE_SCHEMA.columns,
dtypes=GENOME_COVERAGE_SCHEMA.dtypes_dict).lazy()
schema_overrides=GENOME_COVERAGE_SCHEMA.dtypes_dict).lazy()

if features_to_keep is not None:
cov_df = cov_df.filter(pl.col(COLUMN_GENOME_ID).is_in(feature_keep))
Expand Down
2 changes: 1 addition & 1 deletion micov/_per_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ def compress_per_sample(coverage, lengths):
if len(sample_contig_coverage) == 0:
return None
else:
return pl.concat(sample_contig_coverage)
return pl.concat(sample_contig_coverage, rechunk=True)
5 changes: 5 additions & 0 deletions micov/tests/test_cov.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def test_compress(self):
['G123', 101, 110],
['G456', 200, 300],
['G456', 400, 500]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
data = pl.DataFrame([['G123', 11, 50],
['G123', 20, 30],
Expand All @@ -25,6 +26,7 @@ def test_compress(self):
['G123', 51, 89],
['G123', 101, 110],
['G456', 400, 500]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = compress(data).sort(COLUMN_GENOME_ID)
plt.assert_frame_equal(obs, exp)
Expand All @@ -34,16 +36,19 @@ def test_coverage_percent(self):
['G456', 200, 299],
['G123', 90, 100],
['G456', 400, 500]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
lengths = pl.DataFrame([['G123', 100],
['G456', 1000],
['G789', 500]],
orient='row',
schema=GENOME_LENGTH_SCHEMA.dtypes_flat)

g123_covered = (50 - 11) + (100 - 90)
g456_covered = (299 - 200) + (500 - 400)
exp = pl.DataFrame([['G123', g123_covered, 100, (g123_covered / 100) * 100],
['G456', g456_covered, 1000, (g456_covered / 1000) * 100]],
orient='row',
schema=GENOME_COVERAGE_SCHEMA.dtypes_flat)

obs = coverage_percent(data, lengths).sort(COLUMN_GENOME_ID).collect()
Expand Down
16 changes: 16 additions & 0 deletions micov/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def test_write_qiita_cov(self):
lengths = pl.DataFrame([['GXXX', 600],
['GYYY', 1100],
['GZZZ', 2000]],
orient='row',
schema=GENOME_LENGTH_SCHEMA.dtypes_flat)

write_qiita_cov(self.name, paths, lengths)
Expand All @@ -101,11 +102,13 @@ def test_write_qiita_cov(self):
['GYYY', 100, 400],
['GYYY', 500, 1000],
['GZZZ', 200, 400]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)

exp_cov_percent = pl.DataFrame([['GXXX', 400, 600, (400 / 600) * 100],
['GYYY', 800, 1100, (800 / 1100) * 100],
['GZZZ', 200, 2000, (200 / 2000) * 100]],
orient='row',
schema=GENOME_COVERAGE_SCHEMA.dtypes_flat)

obs_artifact_cov = obs_artifact_cov.sort([COLUMN_GENOME_ID, COLUMN_START])
Expand All @@ -124,6 +127,7 @@ def test_parse_qiita_coverages(self):
['G123', 1000, 10000],
['G456', 5, 20],
['G789', 1, 100]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
# always compress
obs = parse_qiita_coverages(self.name)
Expand All @@ -137,6 +141,7 @@ def test_parse_qiita_coverages_always_compress(self):
['G123', 1000, 10000],
['G456', 5, 20],
['G789', 1, 100]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
# always compress
obs = parse_qiita_coverages(self.name, compress_size=0)
Expand All @@ -152,6 +157,7 @@ def test_parse_qiita_coverages_never_compress(self):
['G456', 5, 20],
['G789', 1, 100],
['G789', 2, 40]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name, compress_size=None)
obs = obs.sort([COLUMN_GENOME_ID, COLUMN_START])
Expand All @@ -163,6 +169,7 @@ def test_parse_qiita_coverages_keep(self):
['G123', 300, 400],
['G456', 5, 20],
['G789', 1, 100]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
sample_keep={'sample_a', 'sample_b'})
Expand All @@ -175,6 +182,7 @@ def test_parse_qiita_coverages_drop(self):
['G123', 300, 400],
['G456', 5, 20],
['G789', 1, 100]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
sample_drop={'sample_c', })
Expand All @@ -186,6 +194,7 @@ def test_parse_qiita_coverages_keep_drop(self):
['G123', 100, 200],
['G456', 5, 20],
['G789', 2, 40]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
sample_drop={'sample_c', },
Expand All @@ -198,6 +207,7 @@ def test_parse_qiita_coverages_keep_feature(self):
['G123', 100, 200],
['G123', 300, 400],
['G123', 1000, 10000]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
feature_keep={'G123', })
Expand All @@ -207,6 +217,7 @@ def test_parse_qiita_coverages_keep_feature(self):
def test_parse_qiita_coverages_drop_feature(self):
exp = pl.DataFrame([['G456', 5, 20],
['G789', 2, 40]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = parse_qiita_coverages(self.name,
sample_drop={'sample_c', },
Expand Down Expand Up @@ -236,6 +247,7 @@ def test_parse_genome_lengths_good(self):
exp = pl.DataFrame([['a', 10],
['b', 20],
['c', 30]],
orient='row',
schema=[COLUMN_GENOME_ID, COLUMN_LENGTH])
obs = parse_genome_lengths(self.name)
plt.assert_frame_equal(obs, exp)
Expand All @@ -251,6 +263,7 @@ def test_parse_genome_lengths_noheader(self):
exp = pl.DataFrame([['a', 10],
['b', 20],
['c', 30]],
orient='row',
schema=[COLUMN_GENOME_ID, COLUMN_LENGTH])
obs = parse_genome_lengths(self.name)
plt.assert_frame_equal(obs, exp)
Expand Down Expand Up @@ -303,6 +316,7 @@ def test_compress_from_stream(self):
['X', 90, 150],
['Y', 10, 60],
['Y', 100, 150]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = compress_from_stream(data, bufsize=2)
plt.assert_frame_equal(obs.sort([COLUMN_GENOME_ID, ]), exp)
Expand All @@ -323,6 +337,7 @@ def test_compress_from_stream_disable_compression(self):
['X', 100, 150],
['Y', 10, 60],
['Y', 100, 150]],
orient='row',
schema=BED_COV_SCHEMA.dtypes_flat)
obs = compress_from_stream(data, bufsize=2, disable_compression=True)
plt.assert_frame_equal(obs.sort([COLUMN_GENOME_ID, COLUMN_START]), exp)
Expand All @@ -339,6 +354,7 @@ def test_parse_sam_to_df(self):
exp = pl.DataFrame([['A', 0, 'X', 1, '50M', 51],
['B', 0, 'Y', 10, '50M', 60],
['C', 0, 'X', 100, '50M', 150]],
orient='row',
schema=SAM_SUBSET_SCHEMA_PARSED.dtypes_flat)
obs = parse_sam_to_df(data)
plt.assert_frame_equal(obs, exp)
Expand Down
3 changes: 3 additions & 0 deletions micov/tests/test_per_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Tests(unittest.TestCase):
def test_compress_per_sample(self):
lengths = pl.DataFrame([['A', 400],
['B', 500]],
orient='row',
schema=GENOME_LENGTH_SCHEMA.dtypes_flat)
df = pl.DataFrame([['A', 10, 100, 'S1'],
['A', 10, 20, 'S1'],
Expand All @@ -22,13 +23,15 @@ def test_compress_per_sample(self):
['A', 50, 150, 'S2'],
['A', 200, 300, 'S1'],
['A', 201, 299, 'S1']],
orient='row',
schema=BED_COV_SAMPLEID_SCHEMA.dtypes_flat)
s1_a = ((110 - 10) + (300 - 200))
s1_b = ((150 - 50) + (250 - 200))
s2_a = (150 - 50)
exp = pl.DataFrame([['A', s1_a, 400, (s1_a / 400) * 100, 'S1'],
['A', s2_a, 400, (s2_a / 400) * 100, 'S2'],
['B', s1_b, 500, (s1_b / 500) * 100, 'S1']],
orient='row',
schema=GENOME_COVERAGE_WITH_SAMPLEID_SCHEMA.dtypes_flat)
obs = compress_per_sample(df, lengths).sort([COLUMN_GENOME_ID,
COLUMN_SAMPLE_ID]).collect()
Expand Down

0 comments on commit 02366fd

Please sign in to comment.