Skip to content

Commit

Permalink
feat(ingest): improve memory usage of format_ncbi_metadata by processing and writing data line by line (#3421)
Browse files Browse the repository at this point in the history

* improve memory consumption: write output after formatting each line to not store file in memory, do not use pandas

* Fix escape character issues: define escape characters where output is later used
  • Loading branch information
anna-parker authored Dec 11, 2024
1 parent bc64942 commit 9323a18
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
5 changes: 3 additions & 2 deletions ingest/scripts/filter_out_depositions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import csv
import logging
from dataclasses import dataclass

Expand Down Expand Up @@ -69,7 +70,7 @@ def filter_out_depositions(
relevant_config = {key: full_config.get(key, []) for key in Config.__annotations__}
config = Config(**relevant_config)
logger.info(f"Config: {config}")
df = pd.read_csv(input_metadata_tsv, sep="\t", dtype=str, keep_default_na=False)
df = pd.read_csv(input_metadata_tsv, sep="\t", dtype=str, keep_default_na=False, quoting=csv.QUOTE_NONE, escapechar="\\")
original_count = len(df)
with open(exclude_insdc_accessions, encoding="utf-8") as f:
loculus_insdc_accessions: set = {line.strip().split(".")[0] for line in f} # Remove version
Expand All @@ -82,7 +83,7 @@ def filter_out_depositions(
] # Filter out all versions of an accession
filtered_df = filtered_df[~filtered_df["biosampleAccession"].isin(loculus_biosample_accessions)]
logger.info(f"Filtered out {(original_count - len(filtered_df))} sequences.")
filtered_df.to_csv(output_metadata_tsv, sep="\t", index=False)
filtered_df.to_csv(output_metadata_tsv, sep="\t", index=False, quoting=csv.QUOTE_NONE, escapechar="\\")


if __name__ == "__main__":
Expand Down
34 changes: 22 additions & 12 deletions ingest/scripts/format_ncbi_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dataclasses import dataclass

import click
import pandas as pd
import yaml

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -135,26 +134,37 @@ def extract_fields(row, ncbi_mappings: NCBIMappings) -> dict:


def jsonl_to_tsv(jsonl_file: str, tsv_file: str, ncbi_mappings: NCBIMappings) -> None:
    """Convert an NCBI metadata JSONL file to a TSV, streaming one record at a time.

    Each input line is a JSON object; fields are extracted according to
    *ncbi_mappings* and written immediately, so the full dataset is never
    held in memory (this replaced an earlier pandas-based implementation).

    Args:
        jsonl_file: Path to the input JSONL file (one JSON object per line).
        tsv_file: Path of the TSV file to write.
        ncbi_mappings: Field-mapping configuration; its mapping keys define
            the output column order.

    Raises:
        json.JSONDecodeError: If an input line is not valid JSON.
        OSError: If either file cannot be opened.
    """
    # Column order: scalar mappings, list mappings, flattened dict mappings,
    # then any unmapped ("unknown") fields.
    headers = (
        list(ncbi_mappings.string_to_string_mappings.keys())
        + list(ncbi_mappings.string_to_list_mappings.keys())
        + [key for val in ncbi_mappings.string_to_dict_mappings.values() for key in val]
        + list(ncbi_mappings.unknown_mappings)
    )
    with (
        open(jsonl_file, encoding="utf-8") as infile,
        open(tsv_file, "w", newline="", encoding="utf-8") as file,
    ):
        # QUOTE_NONE + backslash escaping must match the settings used by the
        # downstream readers of this TSV, or embedded tabs/quotes will corrupt rows.
        writer = csv.DictWriter(
            file,
            fieldnames=headers,
            delimiter="\t",
            quoting=csv.QUOTE_NONE,
            escapechar="\\",
        )
        writer.writeheader()
        for line in infile:
            row = json.loads(line.strip())
            extracted = extract_fields(row, ncbi_mappings)
            # Author names need INSDC -> Loculus reformatting; the accession is
            # passed along for error reporting.
            extracted["ncbiSubmitterNames"] = reformat_authors_from_genbank_to_loculus(
                extracted["ncbiSubmitterNames"], extracted["genbankAccession"]
            )
            # Render floats without a decimal part to match pandas'
            # float_format="%.0f" behavior from the previous implementation.
            formatted_row = {
                key: f"{value:.0f}" if isinstance(value, float) else value
                for key, value in extracted.items()
            }
            writer.writerow(formatted_row)


@click.command()
Expand Down
3 changes: 2 additions & 1 deletion ingest/scripts/prepare_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# Add transformations that can be applied to certain fields
# Like separation of country into country and division

import csv
import hashlib
import json
import logging
Expand Down Expand Up @@ -62,7 +63,7 @@ def main(
logger.debug(config)

logger.info(f"Reading metadata from {input}")
df = pd.read_csv(input, sep="\t", dtype=str, keep_default_na=False)
df = pd.read_csv(input, sep="\t", dtype=str, keep_default_na=False, quoting=csv.QUOTE_NONE, escapechar="\\")
metadata: list[dict[str, str]] = df.to_dict(orient="records")

sequence_hashes: dict[str, str] = {
Expand Down

0 comments on commit 9323a18

Please sign in to comment.