
Commit

Full code documentation
axdanbol committed Dec 11, 2023
1 parent 15d7edf commit 1ec2474
Showing 39 changed files with 686 additions and 151 deletions.
7 changes: 7 additions & 0 deletions src/cellxgene/README.md
@@ -0,0 +1,7 @@
## Environment configuration options

#### Required
None

#### Optional
- `CELLXGENE_API_ENDPOINT` - API endpoint for assets
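
For illustration, a minimal sketch of how an optional endpoint variable like this is typically resolved at runtime. The fallback URL below is a placeholder, not the project's actual default; the real lookup happens in the Node downloader.

```python
import os

# Placeholder fallback for illustration; the project's real default endpoint is defined in the source.
DEFAULT_ENDPOINT = "https://example.org/cellxgene-api/"

endpoint = os.environ.get("CELLXGENE_API_ENDPOINT", DEFAULT_ENDPOINT)
print(f"Using asset endpoint: {endpoint}")
```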
64 changes: 55 additions & 9 deletions src/cellxgene/downloader.js
@@ -1,8 +1,8 @@
import { execFile as callbackExecFile } from 'node:child_process';
import { promisify } from 'node:util';
import { spawn } from 'node:child_process';
import { execFile as callbackExecFile, spawn } from 'node:child_process';
import { writeFile } from 'node:fs/promises';
import { join } from 'node:path';
import { promisify } from 'node:util';

import { Dataset } from '../dataset/dataset.js';
import { Cache } from '../util/cache.js';
import { concurrentMap } from '../util/concurrent-map.js';
@@ -49,6 +49,7 @@ export class Downloader {
this.extractScriptFile = 'extract_dataset_multi.py';
/** @type {string} */
this.extractScriptFilePath = getSrcFilePath(config, 'cellxgene', this.extractScriptFile);
/** @type {string} */
this.extractMetdataScriptFile = 'extract_donor_metadata.py';
/** @type {string} */
this.extractMetdataScriptFilePath = getSrcFilePath(config, 'cellxgene', this.extractMetdataScriptFile);
@@ -76,6 +77,9 @@
const { stdout } = await execFile('python3', [this.extractMetdataScriptFilePath, dataset.dataFilePath]);

dataset.donor_id = dataset.id.split('$')[0];
dataset.organ_id = dataset.organ ? `http://purl.obolibrary.org/obo/UBERON_${dataset.organ.split(':')[1]}` : '';
dataset.block_id = `${dataset.id}_Block`;
dataset.dataset_id = dataset.id;

// Parse sex line. Format: `sex: X\n`
const sex_match = /sex:(.+)\n/i.exec(stdout);
@@ -88,21 +92,28 @@
// Parse ethnicity line. Format: `ethnicity: X\n`
const ethnicity_match = /ethnicity:(.+)\n/i.exec(stdout);
dataset.donor_race = ethnicity_match?.[1].trim() ?? '';

dataset.organ_id = dataset.organ ? `http://purl.obolibrary.org/obo/UBERON_${dataset.organ.split(':')[1]}` : '';

dataset.block_id = `${dataset.id}_Block`;
dataset.dataset_id = dataset.id;
}

/**
* Downloads and caches a collection's metadata
*
* @param {string} collection Collection id
* @returns Collection metadata
*/
async downloadCollection(collection) {
const url = new URL(`${COLLECTIONS_PATH}${collection}`, this.endpoint);
return this.collectionCache.setDefaultFn(collection, () =>
logEvent('CellXGene:DownloadCollection', collection, () => downloadCollectionMetadata(url))
);
}

async downloadAssets(/** @type {any[]} */ assets) {
/**
* Downloads and caches asset files
*
* @param {{id: string, dataset: string}[]} assets Assets to download
* @returns Array of file paths to the assets
*/
async downloadAssets(assets) {
const maxConcurrency = this.config.get(MAX_CONCURRENCY, DEFAULT_MAX_CONCURRENCY);
const downloadAsset = ({ id, dataset }) =>
this.assetCache.setDefaultFn(id, async () => {
Expand All @@ -128,6 +139,12 @@ export class Downloader {
});
}

/**
* Attaches metadata from collections to each dataset
*
* @param {Dataset[]} datasets Datasets
* @returns Datasets with proper metadata
*/
async attachMetadata(datasets) {
const maxConcurrency = this.config.get(MAX_CONCURRENCY, DEFAULT_MAX_CONCURRENCY);
const attach = async (dataset) => {
@@ -166,6 +183,12 @@
return datasets.filter(({ tissueId }) => !!tissueId);
}

/**
* Finds and adds the organ associated with each dataset's tissue
*
* @param {Dataset[]} datasets Datasets to determine organ for
* @returns The datasets
*/
async lookupOrgan(datasets) {
const tissueIds = datasets.map(({ tissueId }) => tissueId).filter((id) => UBERON_ID_REGEX.test(id));
const uniqueTissueIds = Array.from(new Set(tissueIds));
@@ -182,6 +205,13 @@
return datasets;
}

/**
* Extracts all datasets from a collection
*
* @param {string} collection Collection id
* @param {{id: string, dataset: string}[]} assets Collection assets
* @param {string[]} assetFiles Asset file paths
*/
async extractDatasets(collection, assets, assetFiles) {
return this.extractCache.setDefaultFn(collection, async () => {
const datasets = this.datasetsByCollection.get(collection);
@@ -196,6 +226,14 @@
});
}

/**
* Extracts dataset h5ad files from a set of asset h5ad files.
* Forwards stdout and stderr from the extract python process.
*
* @param {string} extractInfoFilePath Path to extract info file
* @param {string} tempExtractDirPath Path to a temp directory
* @returns Promise resolving when the extraction is done
*/
async runExtractDatasetsScript(extractInfoFilePath, tempExtractDirPath) {
return new Promise((resolve, reject) => {
const process = spawn(
@@ -223,6 +261,14 @@
});
}

/**
* Serializes extraction information as a JSON formatted document
*
* @param {Dataset[]} datasets Datasets
* @param {any[]} assets Assets
* @param {string[]} assetFiles Paths to asset files
* @returns JSON serialized string
*/
serializeExtractInfo(datasets, assets, assetFiles) {
const content = {
assets,
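
The metadata step above shells out to `extract_donor_metadata.py` and scrapes its stdout with the `sex:` / `age:` / `ethnicity:` regular expressions shown in the diff. Below is a minimal, self-contained sketch of that line-oriented contract; the values are hypothetical, not real script output.

```python
import re

# Hypothetical stdout from extract_donor_metadata.py; real values come from the h5ad's obs columns.
stdout = "sex: female\nage: 54\nethnicity: European\n"


def parse_field(name: str, text: str) -> str:
    """Mimic the downloader's case-insensitive regex parsing of one key: value line."""
    match = re.search(rf"{name}:(.+)\n", text, re.IGNORECASE)
    return match.group(1).strip() if match else ""


donor_sex = parse_field("sex", stdout)
donor_age = parse_field("age", stdout)
donor_race = parse_field("ethnicity", stdout)
print(donor_sex, donor_age, donor_race)  # -> female 54 European
```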
103 changes: 103 additions & 0 deletions src/cellxgene/extract_dataset_multi.py
@@ -37,6 +37,14 @@ class ExtractInfo(t.TypedDict):

@contextlib.contextmanager
def _log_event(format: str, *args):
"""Logs the start, end, and any error from code run inside a with statement.
The event is always passed before the additional arguments to the logger.
Args:
format (str): Formatting string passed to the logger
*args (Any): Additional arguments passed to the logger
"""
try:
_logger.info(format, "Start", *args)
yield None
@@ -47,6 +55,22 @@ def _log_event(format: str, *args):


def _get_env_list(key: str, default: StrList) -> StrList:
"""Parses an environment variable into a list.
The parser splits the value using the separators ",", ";", and spaces.
Multiple consecutive separators are treated as a single one.
Args:
key (str): Environment variable name
default (StrList): Default value returned when the environment variable is not set
Returns:
StrList: The parsed list
Examples:
"a,b;c d" -> ["a", "b", "c", "d"]
"a,;,b" -> ["a", "b"]
"""
raw = os.environ.get(key)
return re.split(r"[\s,;]+", raw) if raw else default

@@ -56,12 +80,31 @@ def _get_env_list(key: str, default: StrList) -> StrList:


class AssetSplitter:
"""Splits an asset h5ad into multiple smaller file each containing
the rows matching specific column value combinations.
Attributes:
id (str): Id of asset
file (Path): Path to asset file
column_candidates (t.List[StrList]): Possible column names to search for in the asset file
"""

def __init__(self, id: str, file: Path, *column_candidates: StrList):
self.id = id
self.file = file
self.column_candidates = column_candidates

def split(self, splits: t.Iterable[StrList], out_dir: Path):
"""Splits the asset file.
Args:
splits (t.Iterable[StrList]): Combinations of values used to select each split
out_dir (Path): Output directory to store split files in
Returns:
t.List[t.Optional[Path]]: A path for each split referencing the smaller h5ad,
None for splits that would result in empty h5ad files.
"""
with _log_event("CellXGene:AssetSplitter:%s - %s", self.id):
with _log_event("CellXGene:AssetSplitter:DataLoading:%s - %s", self.file):
matrix = anndata.read_h5ad(self.file)
@@ -72,6 +115,17 @@ def split(self, splits: t.Iterable[StrList], out_dir: Path):
]

def __split_single(self, matrix: AnnData, columns: StrList, values: StrList, out_dir: Path):
"""Perform a single split.
Args:
matrix (AnnData): In memory h5ad data
columns (StrList): Columns to match split values on
values (StrList): Split values to filter rows on
out_dir (Path): Output directory
Returns:
t.Optional[Path]: The path to the resulting h5ad file or None if the subset is empty
"""
with _log_event(f"CellXGene:AssetSplitter:Splitting:{':'.join(values)}:%s"):
file_name = "-".join(values) + ".h5ad"
out_path = out_dir / self.id / file_name
@@ -92,12 +146,32 @@ def __split_single(self, matrix: AnnData, columns: StrList, values: StrList, out_dir: Path):
raise

def get_columns(self, matrix: AnnData):
"""Find columns to use during splitting from a set of candidates
Args:
matrix (AnnData): In memory h5ad data
Returns:
t.List[str]: The matching columns
"""
return [
self.find_column(matrix, candidates)
for candidates in self.column_candidates
]

def find_column(self, matrix: AnnData, candidates: StrList):
"""Find a column which exists within the data from a list of candidate columns.
Args:
matrix (AnnData): In memory h5ad data
candidates (StrList): Candidate column names
Raises:
ValueError: If none of the candidates exists in the data object.
Returns:
str: The first matching column
"""
for column in candidates:
if column in matrix.obs.columns:
return column
@@ -106,6 +180,16 @@ def find_column(self, matrix: AnnData, candidates: StrList):
raise ValueError(msg)

def create_mask(self, matrix: AnnData, columns: StrList, values: StrList):
"""Create a mask which selects the rows for whose column values matches the provided values.
Args:
matrix (AnnData): In memory h5ad data
columns (StrList): Columns to match on
values (StrList): Values to match against
Returns:
Any: A mask for subsetting the data matrix
"""
result = None
for column, value in zip(columns, values):
mask = matrix.obs[column] == value
@@ -114,13 +198,24 @@ def create_mask(self, matrix: AnnData, columns: StrList, values: StrList):


class AssetCombiner:
"""Read and combine multiple smaller h5ad into a single h5ad.
Attributes:
id (str): Dataset id
out_file (Path): Path to write combined h5ad to
sources (StrList): Ids of each asset from which the smaller h5ad was extracted
files (t.List[t.Optional[Path]]): Path to each small h5ad or None when it should be skipped
"""

def __init__(self, id: str, out_file: Path, sources: StrList, files: t.List[t.Optional[Path]]):
self.id = id
self.out_file = out_file
self.sources = sources
self.files = files

def combine(self):
"""Combine split asset pieces into a single h5ad.
"""
with _log_event("CellXGene:AssetCombiner:%s - %s", self.id):
if not _FORCE and self.out_file.exists():
return
@@ -145,6 +240,14 @@ def combine(self):


def main(args: argparse.Namespace):
"""Splits and recombines assets into a h5ad for each dataset.
The assets and datasets are read from an info json file.
The info file must be in the format of ExtractInfo.
Args:
args (argparse.Namespace): CLI arguments, must contain "info" and "tmp_dir"
"""
with open(args.info) as info_file:
info: ExtractInfo = json.load(info_file)

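
Taken together, the splitter and combiner documented above amount to: build a boolean mask per column/value pair, subset the matrix, keep each non-empty piece, and concatenate the pieces belonging to a dataset. Here is a minimal standalone sketch of that approach using the anndata API; the file names, columns, and values are hypothetical, and the caching, logging, and error handling from the real script are omitted.

```python
from pathlib import Path
from typing import List

import anndata


def split_and_combine(asset_file: Path, columns: List[str], splits: List[List[str]], out_file: Path) -> None:
    """Subset an h5ad by column/value combinations, then recombine the non-empty pieces."""
    matrix = anndata.read_h5ad(asset_file)
    pieces = []
    for values in splits:
        # Rows must match every (column, value) pair in this split.
        mask = None
        for column, value in zip(columns, values):
            column_mask = matrix.obs[column] == value
            mask = column_mask if mask is None else (mask & column_mask)
        subset = matrix[mask]
        if subset.n_obs > 0:
            pieces.append(subset.copy())
    if pieces:
        anndata.concat(pieces).write_h5ad(out_file)


# Hypothetical usage:
# split_and_combine(Path("asset.h5ad"), ["donor_id", "tissue"], [["donor_1", "kidney"]], Path("dataset.h5ad"))
```

In the actual script, the splits and output paths come from the ExtractInfo file produced by the downloader, and the combiner also records which source asset each piece came from.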
6 changes: 6 additions & 0 deletions src/cellxgene/extract_donor_metadata.py
@@ -9,6 +9,12 @@


def main(args: argparse.Namespace):
"""Print information from a h5ad file.
Printed values include "sex", "age", and "ethnicity".
Args:
args (argparse.Namespace): CLI arguments, must contain "file"
"""
data = anndata.read_h5ad(args.file)
print("sex:", data.obs[SEX_COLUMN][0])
print("age:", data.obs[AGE_COLUMN][0])