-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdownsample_reads.py
132 lines (113 loc) · 4.12 KB
/
downsample_reads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python
### Imports ###
import numpy as np
import pandas as pd
from collections import Counter
from pathlib import Path
from typing import Optional, Sequence
### Functions ###
def downsample_reads_single(
series: pd.Series,
sparsity: float,
rng: np.random.Generator,
min_chunk: int = 10000,
) -> pd.Series:
"""
Sparsifies a pandas Series to a desired level by removing gene reads
from the distribution of genes until the sparsity level is reached.
Parameters
----------
series : pd.Series
pandas Series with gene expression
sparsity : float
Desired level of sparsity in percentage points
rng : np.random.Generator
numpy random Generator object to be used
min_chunk : int, optional
minimal number of reads to remove in a single step,
by default 10000
Returns
-------
pd.Series
pandas Series with sparsified gene expression
"""
target_n = int((1.0 - sparsity) * len(series))
orig_counts = Counter({k: v for k,v in series.items()})
probs = series.values / series.sum()
counts = Counter()
while len(counts) < target_n:
to_gen = target_n - len(counts)
selection = rng.choice(series.index, max(to_gen, min_chunk), p=probs)
temp = counts.copy()
temp.update(selection)
for k,v in temp.items():
if v > orig_counts[k]:
temp[k] = orig_counts[k]
if len(temp) < target_n:
counts = temp
else:
index = 0
while len(counts) < target_n:
counts[selection[index]] += 1
index += 1
rem_counts = orig_counts - counts
probs = np.array([v for v in rem_counts.values()]) / rem_counts.total()
sparse_series = pd.Series(counts.values(), index=counts.keys()
).reindex_like(series).fillna(0).astype(int)
return sparse_series
def downsample_reads(
expression: Path,
output: Sequence[Path],
sparsity: float,
n_repeats: int,
random_seed: Optional[int] = None,
) -> None:
"""
Sparsifies gene expression for multiple samples in multiple
replicates by removing gene reads from the distribution of genes
until the desired sparsity level is reached.
Parameters
----------
expression : Path
Path to the file containing the expression data
output : Sequence[Path]
Paths to the files where the sparse expression data will be
saved
sparsity : float
Desired level of sparsity in percentage points
n_repeats : int
Number of repeats to generate
random_seed : Optional[int], optional
Seed for the random number generator or None to use the system
entropy, by default None
"""
rng = np.random.default_rng(random_seed)
df = pd.read_csv(expression, sep='\t', index_col=0, header=None)
for attempt in range(n_repeats):
temp = {}
for pos,col in enumerate(df.columns):
temp[pos] = downsample_reads_single(df[col], 0.01 * sparsity, rng)
df_temp = pd.DataFrame({i: temp[i] for i in range(df.shape[1])})
df_temp.to_csv(output[attempt], sep='\t', header=False)
### Main body ###
if __name__ == '__main__':
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument('-e', '--expression', dest='expression',
help='file to load the gene expression from', metavar='FILE')
parser.add_argument('-o', '--output', dest='output', nargs='+',
help='files to save the sparse gene expression into', metavar='FILE')
parser.add_argument('-s', '--sparsity', dest='sparsity', type=float,
help='level of sparsity in percentage points', metavar='FLOAT')
parser.add_argument('-nr', '--n-repeats', dest='n_repeats', type=int,
help='number of repeats', metavar='INT')
parser.add_argument('-rs', '--random-seed', dest='random_seed', type=int,
help='random seed', metavar='INT', default=None)
args = parser.parse_args()
downsample_reads(
args.expression,
args.output,
args.sparsity,
args.n_repeats,
args.random_seed,
)