|
| 1 | +"""BAM parsing and panel construction.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +from collections import defaultdict |
| 6 | +from pathlib import Path |
| 7 | +from typing import Any |
| 8 | + |
| 9 | +from tview.models import Panel |
| 10 | + |
| 11 | +# -- CIGAR operations -------------------------------------------------- |
| 12 | +CIGAR_MATCH = 0 # M |
| 13 | +CIGAR_INS = 1 # I |
| 14 | +CIGAR_DEL = 2 # D |
| 15 | +CIGAR_REF_SKIP = 3 # N |
| 16 | +CIGAR_SOFT_CLIP = 4 # S |
| 17 | +CIGAR_SEQ_MATCH = 7 # = |
| 18 | +CIGAR_SEQ_MISMATCH = 8 # X |
| 19 | + |
| 20 | + |
| 21 | +def build_read_row( |
| 22 | + read: Any, |
| 23 | + ref_start: int, |
| 24 | + ref_end: int, |
| 25 | +) -> tuple[dict[int, str], dict[int, list[str]]]: |
| 26 | + """Extract aligned bases and insertions from a single pysam read. |
| 27 | +
|
| 28 | + Walks the CIGAR string to map query bases onto reference positions, |
| 29 | + collecting insertions keyed by their anchor reference position. |
| 30 | +
|
| 31 | + Args: |
| 32 | + read: A pysam.AlignedSegment with cigartuples and query_sequence. |
| 33 | + ref_start: 0-based start of the reference window (inclusive). |
| 34 | + ref_end: 0-based end of the reference window (exclusive). |
| 35 | +
|
| 36 | + Returns: |
| 37 | + A tuple of (aligned, inserts) where aligned maps ref positions to |
| 38 | + bases and inserts maps ref positions to lists of inserted bases. |
| 39 | + """ |
| 40 | + aligned: dict[int, str] = {} |
| 41 | + inserts: dict[int, list[str]] = defaultdict(list) |
| 42 | + qpos, rpos = 0, read.reference_start |
| 43 | + for op, length in read.cigartuples: |
| 44 | + if op in (CIGAR_MATCH, CIGAR_SEQ_MATCH, CIGAR_SEQ_MISMATCH): |
| 45 | + for _ in range(length): |
| 46 | + if ref_start <= rpos < ref_end: |
| 47 | + aligned[rpos] = read.query_sequence[qpos].upper() |
| 48 | + qpos += 1 |
| 49 | + rpos += 1 |
| 50 | + elif op == CIGAR_INS: |
| 51 | + anchor = rpos - 1 |
| 52 | + if ref_start <= anchor < ref_end: |
| 53 | + for j in range(length): |
| 54 | + inserts[anchor].append(read.query_sequence[qpos + j].upper()) |
| 55 | + qpos += length |
| 56 | + elif op == CIGAR_DEL: |
| 57 | + for _ in range(length): |
| 58 | + if ref_start <= rpos < ref_end: |
| 59 | + aligned[rpos] = "-" |
| 60 | + rpos += 1 |
| 61 | + elif op == CIGAR_REF_SKIP: |
| 62 | + rpos += length |
| 63 | + elif op == CIGAR_SOFT_CLIP: |
| 64 | + qpos += length |
| 65 | + return aligned, inserts |
| 66 | + |
| 67 | + |
| 68 | +def bam_panel(bam_path: str | Path, ref_path: str | Path, region: str) -> Panel: |
| 69 | + """Build a Panel from a BAM file with reference FASTA and genomic region. |
| 70 | +
|
| 71 | + Reads are sorted by start position and strand. Insertion columns are |
| 72 | + expanded so all reads align on a common grid. |
| 73 | +
|
| 74 | + Args: |
| 75 | + bam_path: Path to the indexed BAM file. |
| 76 | + ref_path: Path to the reference FASTA (must be indexed). |
| 77 | + region: Genomic region string in "chrom:start-end" format (0-based start). |
| 78 | +
|
| 79 | + Returns: |
| 80 | + A Panel with reference row, read rows, insertion columns, and tick labels. |
| 81 | + """ |
| 82 | + import pysam |
| 83 | + |
| 84 | + chrom, rest = region.split(":") |
| 85 | + start, end = [int(x) for x in rest.replace(",", "").split("-")] |
| 86 | + |
| 87 | + with pysam.FastaFile(ref_path) as fasta: |
| 88 | + ref_seq = fasta.fetch(chrom, start, end).upper() |
| 89 | + |
| 90 | + with pysam.AlignmentFile(bam_path, "rb") as samfile: |
| 91 | + reads = [ |
| 92 | + r |
| 93 | + for r in samfile.fetch(chrom, start, end) |
| 94 | + if not r.is_unmapped and r.cigartuples |
| 95 | + ] |
| 96 | + reads.sort(key=lambda r: (r.reference_start, r.is_reverse)) |
| 97 | + |
| 98 | + # Find max insertion at each ref position |
| 99 | + max_ins: dict[int, int] = defaultdict(int) |
| 100 | + read_data = [] |
| 101 | + for read in reads: |
| 102 | + aligned, inserts = build_read_row(read, start, end) |
| 103 | + read_data.append((read, aligned, inserts)) |
| 104 | + for rpos, bases in inserts.items(): |
| 105 | + max_ins[rpos] = max(max_ins[rpos], len(bases)) |
| 106 | + |
| 107 | + # Build column map |
| 108 | + col_map: dict[int, int] = {} |
| 109 | + ins_col_set: set[int] = set() |
| 110 | + col = 0 |
| 111 | + for rpos in range(start, end): |
| 112 | + col_map[rpos] = col |
| 113 | + col += 1 |
| 114 | + n_ins = max_ins.get(rpos, 0) |
| 115 | + for j in range(n_ins): |
| 116 | + ins_col_set.add(col + j) |
| 117 | + col += n_ins |
| 118 | + total_cols = col |
| 119 | + |
| 120 | + # Build ref row with '-' in insertion columns |
| 121 | + ref_row: list[str] = [] |
| 122 | + for rpos in range(start, end): |
| 123 | + ref_row.append(ref_seq[rpos - start]) |
| 124 | + for _ in range(max_ins.get(rpos, 0)): |
| 125 | + ref_row.append("-") |
| 126 | + |
| 127 | + # Build sequence rows |
| 128 | + seq_rows: list[tuple[str, list[str], bool]] = [] |
| 129 | + for read, aligned, inserts in read_data: |
| 130 | + row = [" "] * total_cols |
| 131 | + for rpos in range(start, end): |
| 132 | + c = col_map[rpos] |
| 133 | + if rpos in aligned: |
| 134 | + row[c] = aligned[rpos] |
| 135 | + if rpos in aligned or rpos in inserts: |
| 136 | + n_ins = max_ins.get(rpos, 0) |
| 137 | + read_ins = inserts.get(rpos, []) |
| 138 | + for j in range(n_ins): |
| 139 | + if j < len(read_ins): |
| 140 | + row[c + 1 + j] = read_ins[j] |
| 141 | + else: |
| 142 | + row[c + 1 + j] = "-" |
| 143 | + seq_rows.append((read.query_name, row, read.is_reverse)) |
| 144 | + |
| 145 | + # Column labels: 1-based relative, ticks at 1, 10, 20... |
| 146 | + ref_width = end - start |
| 147 | + tick_1based = [1] + list(range(10, ref_width + 1, 10)) |
| 148 | + col_labels = [ |
| 149 | + (col_map[start + p - 1], str(p)) for p in tick_1based if (start + p - 1) < end |
| 150 | + ] |
| 151 | + |
| 152 | + label = Path(bam_path).stem |
| 153 | + return Panel(label, ref_row, seq_rows, total_cols, col_labels, ins_col_set) |
0 commit comments