Skip to content

Commit 324b470

Merge pull request #35 from biocore/binning-poke
Binning poke
2 parents ed42db1 + c6d6b9b commit 324b470

6 files changed: +134 −175 lines changed


micov/__init__.py

Lines changed: 0 additions & 7 deletions
@@ -1,8 +1 @@
 """micov: microbiome coverage."""
-
-# note: currently for use with duckdb. we cannot easily enforce threads for polars
-# as a specific environment variable must be set prior to the first import. it's
-# doable but will need some engineeering to do it correctly.'And, polars does not
-# currently have a way to limit memory use.
-THREADS = 1
-MEMORY = 8  # gb
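
Note: the THREADS and MEMORY constants removed above are superseded by per-instance arguments on View (see micov/_view.py below). A minimal sketch of the equivalent DuckDB connection configuration, assuming the new defaults of threads=1 and memory="8gb":

import duckdb

# Hedged sketch: mirrors the config keys View.__init__ passes after this change.
threads = 1
memory = "8gb"
con = duckdb.connect(":memory:", config={"threads": threads, "memory_limit": memory})
print(con.sql("SELECT current_setting('threads'), current_setting('memory_limit')"))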

micov/_quant.py

Lines changed: 28 additions & 43 deletions
@@ -3,7 +3,12 @@
 import numpy as np
 import polars as pl

-from ._constants import COLUMN_GENOME_ID, COLUMN_SAMPLE_ID
+from ._constants import (
+    COLUMN_GENOME_ID,
+    COLUMN_SAMPLE_ID,
+    COLUMN_START_DTYPE,
+    COLUMN_STOP_DTYPE,
+)

 warnings.simplefilter("ignore", category=pl.exceptions.PerformanceWarning)

@@ -26,21 +31,23 @@ def create_bin_list(genome_length, bin_num):
         pl.Series("a", [0, genome_length], strict=False)
         .hist(bin_count=bin_num)
         .lazy()
-        .select(pl.col("breakpoint").alias("bin_stop"))
+        .select(pl.col("breakpoint").round().cast(COLUMN_STOP_DTYPE).alias("bin_stop"))
         .with_row_index("bin_idx", offset=1)
     )
     bin_list_pos_start = (
         pl.Series("a", [0, genome_length], strict=False)
         .hist(bin_count=bin_num)
         .lazy()
-        .select(pl.col("breakpoint").alias("bin_start"))
+        .select(
+            pl.col("breakpoint").round().cast(COLUMN_START_DTYPE).alias("bin_start")
+        )
         .with_row_index("bin_idx", offset=2)
     )
     bin_list = (
         bin_list_pos_start.join(bin_list_pos_stop, on="bin_idx", how="right")
         .fill_null(0)
         .select([pl.col("bin_idx"), pl.col("bin_start"), pl.col("bin_stop")])
-        .with_columns(pl.col("bin_idx").cast(pl.Int64))
+        .with_columns(pl.col("bin_idx").cast(COLUMN_START_DTYPE))
     )
     # setting the bin_stop of the last bin to be exactly the genome length + 1
     bin_list = bin_list.with_columns(
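
For reference, a small self-contained sketch of what the updated breakpoint handling in create_bin_list produces, assuming COLUMN_STOP_DTYPE is an unsigned integer dtype (pl.UInt64 stands in for it here); the "breakpoint" column name follows the polars version the diff targets:

import polars as pl

genome_length = 1_000
bin_num = 10

# Histogram breakpoints over [0, genome_length] become the bin_stop values;
# round() plus an integer cast replaces the raw float breakpoints.
bin_stops = (
    pl.Series("a", [0, genome_length], strict=False)
    .hist(bin_count=bin_num)
    .lazy()
    .select(pl.col("breakpoint").round().cast(pl.UInt64).alias("bin_stop"))
    .with_row_index("bin_idx", offset=1)
)
print(bin_stops.collect())  # bin_idx 1..10 with bin_stop 100, 200, ..., 1000
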
@@ -52,51 +59,30 @@ def create_bin_list(genome_length, bin_num):
     return bin_list


-def pos_to_bins(pos, variable, bin_num):
-    genome_length = pos.select("length").limit(1).collect().item()
+def pos_to_bins(pos, variable, bin_num, genome_length):
+    # genome_length = pos.select("length").limit(1).collect().item()
     bin_list = create_bin_list(genome_length, bin_num)

     # get start_bin_idx and stop_bin_idx
     bin_edges = [0.0] + bin_list.select(  # noqa: RUF005
         pl.col("bin_stop")
     ).collect().to_series().to_list()
-    cut_start = (
-        pos.select(pl.col("start"))
-        .collect()
-        .to_series()
-        .cut(
-            bin_edges,
-            labels=np.arange(len(bin_edges) + 1).astype(str),
-            left_closed=True,
-        )
-        .cast(pl.Int64)
-        .alias("start_bin_idx")
-    )
-    cut_stop = (
-        pos.select(pl.col("stop"))
-        .collect()
-        .to_series()
-        .cut(
-            bin_edges,
-            labels=np.arange(len(bin_edges) + 1).astype(str),
-            left_closed=False,
-        )
-        .cast(pl.Int64)
-        .alias("stop_bin_idx")
-    )
-    pos = pos.with_columns([cut_start, cut_stop])
-
-    # update stop_bin_idx +1 for pl.arange and generate range of bins
-    pos = pos.with_columns((pl.col("stop_bin_idx") + 1).alias("stop_bin_idx_add1"))
+    labels = np.arange(len(bin_edges) + 1).astype(str)

-    # generate range of bins covered
-    pos = pos.with_columns(
-        pl.int_ranges("start_bin_idx", "stop_bin_idx_add1").alias("bin_idx")
-    ).drop("stop_bin_idx_add1")
-
-    # generate bin_df
-    df_bins = (
-        pos.explode("bin_idx")
+    return (
+        pos.with_columns(
+            pl.col("start")
+            .cut(bin_edges, labels=labels, left_closed=True)
+            .cast(COLUMN_START_DTYPE)
+            .alias("start_bin_idx"),
+            pl.col("stop")
+            .cut(bin_edges, labels=labels, left_closed=False)
+            .cast(COLUMN_STOP_DTYPE)
+            .alias("stop_bin_idx")
+            + 1,
+        )
+        .with_columns(pl.int_ranges("start_bin_idx", "stop_bin_idx").alias("bin_idx"))
+        .explode("bin_idx")
         .group_by(COLUMN_GENOME_ID, variable, "bin_idx")
         .agg(
             pl.col("start").len().alias("read_hits"),
@@ -106,4 +92,3 @@ def pos_to_bins(pos, variable, bin_num):
         .sort(by=["bin_idx", variable])
         .join(bin_list, how="left", on="bin_idx")
     )
-    return df_bins
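
A minimal, self-contained sketch of the binning idea the rewritten pos_to_bins expresses lazily: each read's [start, stop) interval is expanded into the bin indices it touches via pl.int_ranges plus explode, then aggregated per bin. Bin indices are computed arithmetically here for brevity; the real function derives edges from create_bin_list and assigns bins with Expr.cut, and the plain column names below (genome_id, sample_id) stand in for the repo's COLUMN_* constants.

import polars as pl

genome_length = 1_000
bin_num = 10
bin_size = genome_length / bin_num

reads = pl.LazyFrame(
    {
        "genome_id": ["G1", "G1", "G1"],
        "sample_id": ["S1", "S1", "S2"],
        "start": [5, 250, 990],
        "stop": [120, 260, 1000],
    }
)

per_bin = (
    reads.with_columns(
        # first bin a read touches, and one past the last bin it touches
        (pl.col("start") // bin_size).cast(pl.Int64).alias("start_bin_idx"),
        ((pl.col("stop") - 1) // bin_size).cast(pl.Int64).alias("stop_bin_idx") + 1,
    )
    # expand each read into every bin index it overlaps, then count hits per bin
    .with_columns(pl.int_ranges("start_bin_idx", "stop_bin_idx").alias("bin_idx"))
    .explode("bin_idx")
    .group_by("genome_id", "sample_id", "bin_idx")
    .agg(pl.col("start").len().alias("read_hits"))
    .sort("bin_idx")
)
print(per_bin.collect())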

micov/_view.py

Lines changed: 31 additions & 28 deletions
@@ -3,7 +3,6 @@
 import duckdb
 import polars as pl

-from micov import MEMORY, THREADS
 from micov._constants import (
     COLUMN_COVERED,
     COLUMN_GENOME_ID,
@@ -19,7 +18,9 @@
 class View:
     """View subsets of coverage data."""

-    def __init__(self, dbbase, sample_metadata, features_to_keep):
+    def __init__(
+        self, dbbase, sample_metadata, features_to_keep, threads=1, memory="8gb"
+    ):
         self.dbbase = dbbase
         self.sample_metadata = sample_metadata
         self.features_to_keep = features_to_keep
@@ -28,7 +29,7 @@ def __init__(self, dbbase, sample_metadata, features_to_keep):
         self.constrain_features = False

         self.con = duckdb.connect(
-            ":memory:", config={"threads": THREADS, "memory_limit": f"{MEMORY}gb"}
+            ":memory:", config={"threads": threads, "memory_limit": f"{memory}"}
         )
         self._init()

@@ -106,9 +107,9 @@ def _load_db(self):
         # TODO: replace with duckdb native per sample compression
         # Do we stream to parquet? this could be large
         positions_df = self.con.sql(f"""SELECT * FROM positions
-                             ORDER BY {COLUMN_SAMPLE_ID},
-                                      {COLUMN_GENOME_ID},
-                                      {COLUMN_START}""").pl()
+                                        ORDER BY {COLUMN_SAMPLE_ID},
+                                                 {COLUMN_GENOME_ID},
+                                                 {COLUMN_START}""").pl()

         if len(positions_df) == 0:
             msg = "No positions left after filtering."
@@ -135,7 +136,7 @@ def _load_db(self):
                              SELECT * FROM recomputed_coverage""")

             self.con.sql(f"""CREATE TABLE feature_metadata AS
-                             SELECT *
+                             SELECT *, {COLUMN_STOP} - {COLUMN_START} AS {COLUMN_LENGTH}
                              FROM feature_constraint fc
                              SEMI JOIN coverage cov USING ({COLUMN_GENOME_ID})""")

@@ -158,18 +159,19 @@ def _load_db(self):
                              JOIN metadata md
                                  ON pos.{COLUMN_SAMPLE_ID}=md.{COLUMN_SAMPLE_ID}""")
             self.con.sql(f"""CREATE VIEW genome_lengths AS
-                            SELECT {COLUMN_GENOME_ID},
-                                   FIRST({COLUMN_LENGTH}) AS {COLUMN_STOP}
-                            FROM coverage
-                            GROUP BY {COLUMN_GENOME_ID};
-                            CREATE TABLE feature_metadata AS
-                            SELECT fc.{COLUMN_GENOME_ID},
+                             SELECT {COLUMN_GENOME_ID},
+                                    FIRST({COLUMN_LENGTH}) AS {COLUMN_LENGTH}
+                             FROM coverage
+                             GROUP BY {COLUMN_GENOME_ID};
+                             CREATE TABLE feature_metadata AS
+                             SELECT f.{COLUMN_GENOME_ID},
                                     0::UINTEGER AS {COLUMN_START},
-                                   gl.{COLUMN_STOP}
-                            FROM feature_constraint fc
-                            JOIN genome_lengths gl
-                                ON fc.{COLUMN_GENOME_ID}=gl.{COLUMN_GENOME_ID}
-                            """)
+                                    g.{COLUMN_LENGTH} AS {COLUMN_STOP},
+                                    g.{COLUMN_LENGTH}
+                             FROM feature_constraint f
+                             JOIN genome_lengths g
+                                 ON f.{COLUMN_GENOME_ID}=g.{COLUMN_GENOME_ID}
+                             """)
         else:
             # limit the samples considered
             self.con.sql(f"""CREATE VIEW coverage AS
@@ -191,17 +193,18 @@ def _load_db(self):
             # use the existing length data from coverage to set the start/stop
             # positions in feature_metadata
             self.con.sql(f"""CREATE VIEW genome_lengths AS
-                            SELECT {COLUMN_GENOME_ID},
-                                   FIRST({COLUMN_LENGTH}) AS {COLUMN_STOP}
-                            FROM coverage
-                            GROUP BY {COLUMN_GENOME_ID};
-                            CREATE TABLE feature_metadata AS
-                            SELECT fc.{COLUMN_GENOME_ID},
+                             SELECT {COLUMN_GENOME_ID},
+                                    FIRST({COLUMN_LENGTH}) AS {COLUMN_LENGTH}
+                             FROM coverage
+                             GROUP BY {COLUMN_GENOME_ID};
+                             CREATE TABLE feature_metadata AS
+                             SELECT f.{COLUMN_GENOME_ID},
                                     0::UINTEGER AS {COLUMN_START},
-                                   gl.{COLUMN_STOP}
-                            FROM feature_constraint fc
-                            JOIN genome_lengths gl
-                                ON fc.{COLUMN_GENOME_ID}=gl.{COLUMN_GENOME_ID}
+                                    g.{COLUMN_LENGTH} AS {COLUMN_STOP},
+                                    g.{COLUMN_LENGTH}
+                             FROM feature_constraint f
+                             JOIN genome_lengths g
+                                 ON f.{COLUMN_GENOME_ID}=g.{COLUMN_GENOME_ID}
                              """)

     def metadata(self):
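
A hedged, self-contained illustration of the feature_metadata shape this change produces when lengths come from the coverage table: start fixed at 0, stop set to the genome length, and the length carried as its own column. The table and column names here are plain strings standing in for the repo's COLUMN_* constants, and the toy data is hypothetical.

import duckdb

con = duckdb.connect(":memory:", config={"threads": 1, "memory_limit": "8gb"})
# toy coverage rows: a genome_id may appear many times with the same length
con.sql("""CREATE TABLE coverage AS
           SELECT * FROM (VALUES ('G1', 150, 1000), ('G1', 80, 1000), ('G2', 90, 500))
                    AS t(genome_id, covered, length)""")
con.sql("""CREATE TABLE feature_constraint AS
           SELECT * FROM (VALUES ('G1'), ('G2')) AS t(genome_id)""")
con.sql("""CREATE VIEW genome_lengths AS
               SELECT genome_id,
                      FIRST(length) AS length
               FROM coverage
               GROUP BY genome_id""")
con.sql("""CREATE TABLE feature_metadata AS
               SELECT f.genome_id,
                      0::UINTEGER AS start,
                      g.length AS stop,
                      g.length
               FROM feature_constraint f
               JOIN genome_lengths g
                   ON f.genome_id=g.genome_id""")
print(con.sql("SELECT * FROM feature_metadata"))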
