Track sra_fork #97

Draft
wants to merge 13 commits into master
2 changes: 1 addition & 1 deletion binchicken.yml
@@ -11,7 +11,7 @@ dependencies:
- bird_tool_utils_python=0.4.*
- extern=0.4.*
- ruamel.yaml=0.17.*
- polars=0.20.*
- polars=0.20.4
- pigz=2.3.*
- pyarrow=12.0.*
- parallel=20230522
49 changes: 37 additions & 12 deletions binchicken/workflow/coassemble.smk
@@ -22,6 +22,11 @@ qc_reads_2 = {read: output_dir + f"/qc/{read}_2.fastq.gz" for read in config["re
def get_mem_mb(wildcards, threads):
return 8 * 1000 * threads

def get_runtime(base_hours):
def runtime_func(wildcards, attempt):
return f"{attempt * base_hours}h"
return runtime_func

def get_genomes(wildcards, version=None):
version = version if version else wildcards.version
if version == "":
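For context (not part of this diff): a minimal sketch of how the new get_runtime helper behaves. Snakemake evaluates a callable resource with the current retry attempt, so each restart of a rule requests proportionally more walltime.

def get_runtime(base_hours):
    def runtime_func(wildcards, attempt):
        # attempt starts at 1, so a second attempt asks for double the base hours
        return f"{attempt * base_hours}h"
    return runtime_func

runtime = get_runtime(base_hours=12)
print(runtime(None, attempt=1))  # 12h
print(runtime(None, attempt=2))  # 24h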
@@ -472,7 +477,7 @@ rule qc_reads:
params:
quality_cutoff = 15,
unqualified_percent_limit = 40,
min_length = 80,
min_length = 70,
threads: 16
resources:
mem_mb=get_mem_mb,
@@ -503,6 +508,7 @@ rule collect_genomes:
appraise_unbinned = output_dir + "/appraise/unbinned.otu_table.tsv",
output:
temp(output_dir + "/mapping/{read}_reference.fna"),
threads: 1
params:
genomes = config["genomes"],
sample = "{read}",
@@ -522,7 +528,7 @@ rule map_reads:
threads: 16
resources:
mem_mb=get_mem_mb,
runtime = "12h",
runtime = get_runtime(base_hours = 12),
log:
logs_dir + "/mapping/{read}_coverm.log",
benchmark:
@@ -552,7 +558,7 @@ rule filter_bam_files:
threads: 16
resources:
mem_mb=get_mem_mb,
runtime = "4h",
runtime = get_runtime(base_hours = 4),
log:
logs_dir + "/mapping/{read}_filter.log",
benchmark:
@@ -579,7 +585,7 @@ rule bam_to_fastq:
threads: 16
resources:
mem_mb=get_mem_mb,
runtime = "4h",
runtime = get_runtime(base_hours = 4),
log:
logs_dir + "/mapping/{read}_fastq.log",
conda:
@@ -642,6 +648,25 @@ rule aviary_commands:
#########################################
### Run Aviary commands (alternative) ###
#########################################
def get_assemble_threads(wildcards, attempt):
# Attempt 1 with 32, 2 with 64, then 32 with Megahit
current_threads = 64 if attempt == 2 else 32
threads = min(int(config["aviary_threads"]), current_threads)

return threads

def get_assemble_memory(wildcards, attempt, unit="GB"):
# Attempt 1 with 250GB, 2 with 500GB, then 250GB with Megahit
current_mem = 500 if attempt == 2 else 250
mem = min(int(config["aviary_memory"]), current_mem)
mult = 1000 if unit == "MB" else 1

return mem * mult

def get_assemble_assembler(wildcards, attempt):
# Attempt 1/2 with Metaspades, then Megahit
return "" if attempt < 3 else "--use-megahit"

rule aviary_assemble:
input:
output_dir + "/mapping/done" if config["assemble_unmapped"] else output_dir + "/qc/done" if config["run_qc"] else [],
@@ -657,13 +682,13 @@ rule aviary_assemble:
drytouch = "&& touch "+output_dir+"/coassemble/{coassembly}/assemble/assembly/final_contigs.fasta" if config["aviary_dryrun"] else "",
conda_prefix = config["conda_prefix"] if config["conda_prefix"] else ".",
tmpdir = config["tmpdir"],
threads:
threads = config["aviary_threads"]
threads: lambda wildcards, attempt: get_assemble_threads(wildcards, attempt)
resources:
mem_mb = int(config["aviary_memory"])*1000,
mem_gb = int(config["aviary_memory"]),
mem_mb = lambda wildcards, attempt: get_assemble_memory(wildcards, attempt, unit="MB"),
mem_gb = get_assemble_memory,
runtime = "96h",
assembler = lambda wildcards, attempt: "" if attempt == 1 else "--use-megahit",
assembler = get_assemble_assembler,
queue = "microbiome",
log:
logs_dir + "/aviary/{coassembly}_assemble.log"
conda:
@@ -711,7 +736,7 @@ rule aviary_recover:
tmpdir = config["tmpdir"],
localrule: True
threads:
int(config["aviary_threads"])//2
1
resources:
mem_mb = int(config["aviary_memory"])*1000//2,
mem_gb = int(config["aviary_memory"])//2,
@@ -733,8 +758,8 @@ rule aviary_recover:
"-2 {params.reads_2} "
"--output {params.output} "
"{params.fast} "
"-n {threads} "
"-t {threads} "
"-n 32 "
"-t 32 "
"-m {resources.mem_gb} "
"--skip-qc "
"{params.snakemake_profile} "
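Taken together, the attempt-keyed helpers introduced above give aviary_assemble a capped escalation schedule: attempt 1 runs metaSPAdes at up to 32 threads and 250 GB, attempt 2 doubles both, and attempt 3 falls back to Megahit at the base resources. A self-contained sketch of that schedule (config values here are hypothetical, not from the workflow):

config = {"aviary_threads": 64, "aviary_memory": 500}  # hypothetical values

def get_assemble_threads(wildcards, attempt):
    # attempt 2 gets more threads; attempts 1 and 3 use the base allocation
    return min(int(config["aviary_threads"]), 64 if attempt == 2 else 32)

def get_assemble_memory(wildcards, attempt, unit="GB"):
    mem = min(int(config["aviary_memory"]), 500 if attempt == 2 else 250)
    return mem * (1000 if unit == "MB" else 1)

def get_assemble_assembler(wildcards, attempt):
    # metaSPAdes (default) for the first two attempts, then Megahit
    return "" if attempt < 3 else "--use-megahit"

for attempt in (1, 2, 3):
    print(attempt, get_assemble_threads(None, attempt),
          get_assemble_memory(None, attempt),
          get_assemble_assembler(None, attempt) or "metaspades")
# 1 32 250 metaspades
# 2 64 500 metaspades
# 3 32 250 --use-megahit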
15 changes: 14 additions & 1 deletion binchicken/workflow/env/aviary.yml
@@ -2,4 +2,17 @@ channels:
- conda-forge
- bioconda
dependencies:
- aviary=0.8.*
- python >=3.8,<=3.11
- snakemake >=7.0.0,<=7.32.3
- ruamel.yaml >=0.15.99
- numpy
- pandas
- biopython
- mamba >=0.8.2
- pigz =2.6
- parallel
- bbmap
- pip
- git
- pip:
- git+https://github.com/AroneyS/aviary.git@d9b0fb5
1 change: 1 addition & 0 deletions binchicken/workflow/scripts/cluster_graph.py
@@ -319,6 +319,7 @@ def filter_max_coassembly_size(df, MAX_COASSEMBLY_SIZE):

if elusive_edges.height > 10**4:
min_cluster_targets = 10
elusive_edges = elusive_edges.filter(pl.col("target_ids").str.count_matches(",") > min_cluster_targets - 1)
else:
min_cluster_targets = 1

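The new filter keeps only edges whose comma-separated target_ids list spans more than min_cluster_targets targets (an edge with N targets has N-1 commas). A small illustrative example of the same expression, assuming a target_ids layout like the one used in this script:

import polars as pl

elusive_edges = pl.DataFrame({
    "samples": ["sample_1,sample_2", "sample_3,sample_4"],
    "target_ids": ["0,1,2,3,4,5,6,7,8,9,10", "0,1"],
})

min_cluster_targets = 10
# 10 commas (11 targets) passes the filter; 1 comma (2 targets) is dropped
filtered = elusive_edges.filter(
    pl.col("target_ids").str.count_matches(",") > min_cluster_targets - 1
)
print(filtered)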
111 changes: 73 additions & 38 deletions binchicken/workflow/scripts/evaluate.py
@@ -6,14 +6,47 @@
import polars as pl
import os

OUTPUT_COLUMNS={
SINGLEM_COLUMNS = {
"gene": str,
"sample": str,
"sequence": str,
"num_hits": int,
"coverage": float,
"taxonomy": str,
}

TARGET_COLUMNS = SINGLEM_COLUMNS | {
"target": int,
}
APPRAISE_COLUMNS = SINGLEM_COLUMNS | {
"found_in": str,
}

CLUSTER_COLUMNS = {
"samples": str,
"length": int,
"total_targets": int,
"total_size": float,
"recover_samples": str,
"coassembly": str,
}
EDGE_COLUMNS = {
"style": str,
"cluster_size": int,
"samples": str,
"target_ids": str,
}

OUTPUT_COLUMNS = {
"coassembly": str,
"gene": str,
"sequence": str,
"genome": str,
"target": str,
"found_in": str,
"source_samples": str,
"source_num_hits": int,
"source_coverage": float,
"taxonomy": str,
}
SUMMARY_COLUMNS = {
@@ -34,18 +67,6 @@ def evaluate(target_otu_table, binned_otu_table, elusive_clusters, elusive_edges
empty_output = pl.DataFrame(schema=OUTPUT_COLUMNS)
return empty_output, empty_output, pl.DataFrame(schema=SUMMARY_COLUMNS)

# Load otu table of target sequences and get unique id for each sequence (to match sequences to target id)
relevant_target_otu_table = (
target_otu_table
.select([
"gene", "sequence",
pl.first("target").over(["gene", "sequence"]).cast(str),
pl.first("taxonomy").over(["gene", "sequence"]),
])
.unique()
.drop_nulls()
)

# Load elusive clusters and edges (to match targets to coassemblies, with duplicates)
sample_coassemblies = (
elusive_clusters
@@ -77,12 +98,6 @@ def evaluate(target_otu_table, binned_otu_table, elusive_clusters, elusive_edges
.explode("target")
)

coassembly_edges = (
elusive_edges
.select(["target", "coassembly"])
.unique()
)

# Create otu table with original sequence, samples present, cluster id, target id and associated coassemblies
sample_edges = (
elusive_edges
@@ -91,43 +106,56 @@ def evaluate(target_otu_table, binned_otu_table, elusive_clusters, elusive_edges
source_samples = pl.col("samples")
.flatten()
.unique()
.sort()
.str.concat(",")
)
.explode("source_samples")
)

elusive_otu_table = (
coassembly_edges
.join(relevant_target_otu_table, on="target", how="left")
.select(
"gene", "sequence", "coassembly", "taxonomy",
target_otu_table
.with_columns(
pl.col("target").cast(str),
pl.lit(None).cast(str).alias("found_in"),
"target",
source_samples =
pl.when(pl.col("sample").is_in(sample_coassemblies.get_column("samples")))
.then(pl.col("sample"))
.otherwise(pl.col("sample").str.replace(r"(_|\.)1$", "")),
)
.join(sample_edges, how="inner", on=["target", "source_samples"])
.group_by("gene", "sequence", "coassembly", "taxonomy", "found_in", "target")
.agg(
pl.col("source_samples").sort().str.concat(","),
source_num_hits = pl.sum("num_hits"),
source_coverage = pl.sum("coverage"),
)
.join(sample_edges, on=["coassembly", "target"], how="left")
)

# Add binned otu table to above with target NA
nontarget_otu_table = (
pl.concat([
binned_otu_table,
binned_otu_table
.with_columns(target = pl.lit(None).cast(str)),
target_otu_table
.join(elusive_otu_table, on=["gene", "sequence"], how="anti")
.drop("target")
.with_columns(pl.lit(None).cast(str).alias("found_in"))
.with_columns(
found_in = pl.lit(None).cast(str),
target = pl.lit("None"),
),
])
.select([
pl.col("sample").str.replace(r"_1$", "").str.replace(r"\.1$", ""),
"gene", "sequence", "taxonomy", "found_in"
"gene", "sequence", "taxonomy", "found_in", "num_hits", "coverage", "target"
])
.join(sample_coassemblies, left_on="sample", right_on="samples", how="left")
.drop_nulls("coassembly")
.group_by(["gene", "sequence", "coassembly"])
.agg([
pl.first("taxonomy"),
pl.first("found_in"),
pl.lit(None).cast(str).alias("target"),
pl.col("sample").unique().sort().str.concat(",").alias("source_samples")
pl.first("target"),
pl.col("sample").unique().sort().str.concat(",").alias("source_samples"),
pl.sum("num_hits").alias("source_num_hits"),
pl.sum("coverage").alias("source_coverage"),
])
.unique()
)
Expand All @@ -151,7 +179,8 @@ def evaluate(target_otu_table, binned_otu_table, elusive_clusters, elusive_edges
haystack_otu_table, on=["coassembly", "gene", "sequence"], how="outer_coalesce", suffix="old"
)
.select(
"coassembly", "gene", "sequence", "genome", "target", "found_in", "source_samples",
"coassembly", "gene", "sequence", "genome", "target", "found_in",
"source_samples", "source_num_hits", "source_coverage",
pl.when(pl.col("taxonomy").is_null())
.then(pl.col("taxonomyold"))
.otherwise(pl.col("taxonomy"))
@@ -169,6 +198,9 @@ def evaluate(target_otu_table, binned_otu_table, elusive_clusters, elusive_edges
.filter(
~pl.all_horizontal(pl.col(["target", "found_in", "source_samples"]).is_null())
)
.with_columns(
target = pl.when(pl.col("target") == "None").then(pl.lit(None)).otherwise(pl.col("target"))
)
)

unmatched = (
@@ -189,6 +221,9 @@ def summarise_stats(matches, combined_otu_table, recovered_bins):
.filter(
pl.col("genome").is_not_null()
)
.with_columns(
target = pl.when(pl.col("target") == "None").then(pl.lit(None)).otherwise(pl.col("target"))
)
)

summary = (
@@ -302,11 +337,11 @@ def summarise_stats(matches, combined_otu_table, recovered_bins):
novel_hits_path = snakemake.output.novel_hits
summary_stats_path = snakemake.output.summary_stats

target_otu_table = pl.read_csv(target_path, separator="\t")
binned_otu_table = pl.read_csv(binned_path, separator="\t")
elusive_clusters = pl.read_csv(elusive_clusters_path, separator="\t")
elusive_edges = pl.read_csv(elusive_edges_path, separator="\t")
recovered_otu_table = pl.read_csv(recovered_otu_table_path, separator="\t")
target_otu_table = pl.read_csv(target_path, separator="\t", dtypes=TARGET_COLUMNS)
binned_otu_table = pl.read_csv(binned_path, separator="\t", dtypes=APPRAISE_COLUMNS)
elusive_clusters = pl.read_csv(elusive_clusters_path, separator="\t", dtypes=CLUSTER_COLUMNS)
elusive_edges = pl.read_csv(elusive_edges_path, separator="\t", dtypes=EDGE_COLUMNS)
recovered_otu_table = pl.read_csv(recovered_otu_table_path, separator="\t", dtypes=SINGLEM_COLUMNS)

matches, unmatched, summary = evaluate(target_otu_table, binned_otu_table, elusive_clusters, elusive_edges, recovered_otu_table, recovered_bins)
# Export hits matching elusive targets
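The *_COLUMNS dicts defined at the top of evaluate.py are passed to pl.read_csv as explicit dtypes so each TSV is parsed with stable column types rather than per-file inference. A minimal sketch of the same pattern against the polars 0.20 API pinned in binchicken.yml (file path and contents hypothetical):

import polars as pl

SINGLEM_COLUMNS = {
    "gene": str,
    "sample": str,
    "sequence": str,
    "num_hits": int,
    "coverage": float,
    "taxonomy": str,
}

# Explicit dtypes stop polars from, e.g., inferring an all-numeric sample
# column as integers when one input table happens to contain only digits.
otu_table = pl.read_csv(
    "recovered.otu_table.tsv",  # hypothetical path
    separator="\t",
    dtypes=SINGLEM_COLUMNS,
)
print(otu_table.schema)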