From f0eb146d7050d18fbdbe0c8196858d03a219d058 Mon Sep 17 00:00:00 2001
From: Kyle Ferriter <kferrite@broadinstitute.org>
Date: Sun, 21 Apr 2024 11:28:17 +0200
Subject: [PATCH 01/10] Adding catvar_combiner script

---
 misc/catvar_combiner.py | 70 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)
 create mode 100644 misc/catvar_combiner.py

diff --git a/misc/catvar_combiner.py b/misc/catvar_combiner.py
new file mode 100644
index 0000000..ac4e8a4
--- /dev/null
+++ b/misc/catvar_combiner.py
@@ -0,0 +1,70 @@
+import enum
+import os
+import pathlib
+import gzip
+import csv
+import json
+import sys
+import time
+
+# increase csv field size limit
+csv.field_size_limit(sys.maxsize)
+
+directory = "buckets/clinvar-gk-pilot/2024-04-07/dev/catvar_output_v2/"
+file_names = os.listdir(directory)  # without directory path
+# print(file_names)
+
+# file_names = file_names[:1]
+
+output_keys_count = 0
+last_logged_output_count_time = time.time()
+last_logged_output_count_value = 0
+
+output_file_name = "combined-catvar_output.json"
+f0 = pathlib.Path(directory) / file_names[0]
+with gzip.open(output_file_name, "wt", compresslevel=9) as f_out:
+    f_out.write("{\n")
+
+    for file_idx, file_name in enumerate(file_names):
+        file_path = pathlib.Path(directory) / file_name
+        print(f"Reading {file_path} ({file_idx + 1}/{len(file_names)})...")
+        try:
+            with gzip.open(file_path, "rt") as f_in:
+                reader = csv.reader(f_in)
+                is_first_row = True
+                for i, row in enumerate(reader):
+                    assert (
+                        len(row) == 1
+                    ), f"row {i} of file {file_name} had more than 1 column! ({len(row)} columns) {row}"
+                    obj = json.loads(row[0])
+                    assert (
+                        len(obj) == 1
+                    ), f"row {i} of file {file_name} had more than 1 key! ({len(obj)} keys) {obj}"
+
+                    # Write key and value
+                    key, value = list(obj.items())[0]
+                    assert isinstance(
+                        key, str
+                    ), f"key {key} on line {i} of file {file_name} is not a string!"
+ + if not is_first_row: + f_out.write(",\n") + f_out.write(" ") + f_out.write(f'"{key}": ') + f_out.write(json.dumps(value)) + output_keys_count += 1 + is_first_row = False + now = time.time() + if now - last_logged_output_count_time > 5: + new_lines = output_keys_count - last_logged_output_count_value + print( + f"Output keys written: {output_keys_count} ({new_lines/5:.2f} lines/s)" + ) + last_logged_output_count_value = output_keys_count + last_logged_output_count_time = now + except Exception as e: + print(f"Exception while reading {file_name}: {e}") + raise e + f_out.write("}\n") + +print(f"Wrote {output_file_name} successfully!") From 68023219bf1613dd6f79ebff55618c2a8522440d Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Sun, 21 Apr 2024 11:30:17 +0200 Subject: [PATCH 02/10] Remove extra ndjson dep --- clinvar_gk_pilot/main.py | 10 +++++----- pyproject.toml | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 8c8cb61..1dc098f 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -5,7 +5,6 @@ import sys from typing import List -import ndjson from ga4gh.vrs.dataproxy import create_dataproxy from ga4gh.vrs.extras.translator import AlleleTranslator, CnvTranslator @@ -63,11 +62,12 @@ def download_to_local_file(filename: str) -> str: def process_as_json(input_file_name: str, output_file_name: str) -> None: - with gzip.GzipFile(input_file_name, "rb") as input, open( - output_file_name, "wt", encoding="utf-8" - ) as output: + with ( + gzip.GzipFile(input_file_name, "rb") as input, + open(output_file_name, "wt", encoding="utf-8") as output, + ): for line in input: - for clinvar_json in ndjson.loads(line.decode("utf-8")): + for clinvar_json in json.loads(line.decode("utf-8")): if clinvar_json.get("issue") is not None: result = None else: diff --git a/pyproject.toml b/pyproject.toml index 10e6bea..41bbd80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,8 +10,7 @@ classifiers = ["Programming Language :: Python :: 3"] dependencies = [ "google-cloud-storage~=2.13.0", #"ga4gh.vrs[extras] @ git+https://github.com/ga4gh/vrs-python@2.0.0-a6", - "ga4gh.vrs[extras] @ git+https://github.com/ga4gh/vrs-python@main-draft", - "ndjson~=0.3.1" + "ga4gh.vrs[extras] @ git+https://github.com/ga4gh/vrs-python@main-draft" ] dynamic = ["version"] From 60efd1fd444f3238f3425a1462c7942f6c44963e Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Sun, 21 Apr 2024 15:17:02 +0200 Subject: [PATCH 03/10] Genericize download_to_local_file and use local path mirror and check if exists when downloading --- clinvar_gk_pilot/gcs.py | 42 ++++++++++++++++++++++ clinvar_gk_pilot/main.py | 75 +++++++++++++++++++++------------------- 2 files changed, 82 insertions(+), 35 deletions(-) diff --git a/clinvar_gk_pilot/gcs.py b/clinvar_gk_pilot/gcs.py index 666d3da..26cc07f 100644 --- a/clinvar_gk_pilot/gcs.py +++ b/clinvar_gk_pilot/gcs.py @@ -2,6 +2,7 @@ import subprocess import threading import time +import os from pathlib import Path, PurePath import requests @@ -29,6 +30,47 @@ def parse_blob_uri(uri: str, client: storage.Client = None) -> storage.Blob: ) +def _local_file_path_for(blob_uri: str, root_dir: str = "buckets") -> str: + parsed_uri = parse_blob_uri(blob_uri) + relpath = f"{root_dir}/{parsed_uri.bucket.name}/{parsed_uri.name}" + return relpath + + +def already_downloaded(blob_uri: str) -> bool: + """ + Returns true if a file at the expected path (using 
_local_file_path_for) + exists locally and has the same size as the remote blob. + """ + expected_local_file = _local_file_path_for(blob_uri) + blob = parse_blob_uri(blob_uri) + # load the blob metadata from the server + blob.reload() + blob_bytes = blob.size + return ( + Path(expected_local_file).exists() + and Path(expected_local_file).stat().st_size == blob_bytes + ) + + +def download_to_local_file(blob_uri: str) -> str: + """ + Expects a blob_uri beginning with "gs://". + Downloads to a local file using _local_file_path_for to generate the local path. + """ + if not blob_uri.startswith("gs://"): + raise RuntimeError( + "Expecting a google cloud storage URI beginning with 'gs://'." + ) + blob = parse_blob_uri(blob_uri) + local_file_name = _local_file_path_for(blob_uri) + # Make parents + os.makedirs(os.path.dirname(local_file_name), exist_ok=True) + with open(local_file_name, "wb") as f: + logger.info(f"Downloading {blob_uri} to {local_file_name}") + blob.download_to_file(file_obj=f) + return local_file_name + + def copy_file_to_bucket( local_file_uri: str, remote_blob_uri: str, client: storage.Client = None ): diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 1dc098f..760e3ac 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -8,7 +8,7 @@ from ga4gh.vrs.dataproxy import create_dataproxy from ga4gh.vrs.extras.translator import AlleleTranslator, CnvTranslator -from clinvar_gk_pilot.gcs import parse_blob_uri +from clinvar_gk_pilot.gcs import parse_blob_uri, download_to_local_file from clinvar_gk_pilot.logger import logger # TODO - implement as separate strategy class for using vrs_python @@ -42,44 +42,46 @@ def parse_args(args: List[str]) -> dict: return vars(parser.parse_args(args)) -def download_to_local_file(filename: str) -> str: - """ - Expects a filename beginning with "gs://" and ending with ".json.gz". - Downloads and decompresses into string form. - # TODO - this likely will not work for large ClinVar release files - """ - if not filename.startswith("gs://"): - raise RuntimeError( - "Expecting a google cloud storage URI beginning with 'gs://'." - ) - if not filename.endswith(".json.gz"): - raise RuntimeError("Expecting a compressed filename ending with '.json.gz'.") - blob = parse_blob_uri(filename) - local_file_name = filename.split("/")[-1] - with open(local_file_name, "wb") as f: - blob.download_to_file(file_obj=f) - return local_file_name +# def download_to_local_file(filename: str) -> str: +# """ +# Expects a filename beginning with "gs://" and ending with ".json.gz". +# Downloads and decompresses into string form. +# # TODO - this likely will not work for large ClinVar release files +# """ +# if not filename.startswith("gs://"): +# raise RuntimeError( +# "Expecting a google cloud storage URI beginning with 'gs://'." 
+# ) +# if not filename.endswith(".json.gz"): +# raise RuntimeError("Expecting a compressed filename ending with '.json.gz'.") +# blob = parse_blob_uri(filename) + +# local_file_name = filename.split("/")[-1] +# with open(local_file_name, "wb") as f: +# blob.download_to_file(file_obj=f) +# return local_file_name def process_as_json(input_file_name: str, output_file_name: str) -> None: with ( gzip.GzipFile(input_file_name, "rb") as input, - open(output_file_name, "wt", encoding="utf-8") as output, + gzip.GzipFile(output_file_name, "wb") as output, ): for line in input: - for clinvar_json in json.loads(line.decode("utf-8")): - if clinvar_json.get("issue") is not None: - result = None - else: - cls = clinvar_json["vrs_class"] - if cls == "Allele": - result = allele(clinvar_json) - elif cls == "CopyNumberChange": - result = copy_number_change(clinvar_json) - elif cls == "CopyNumberCount": - result = copy_number_count(clinvar_json) - content = {"in": clinvar_json, "out": result} - output.write(str(json.dumps(content) + "\n")) + clinvar_json = json.loads(line.decode("utf-8")) + if clinvar_json.get("issue") is not None: + result = None + else: + cls = clinvar_json["vrs_class"] + if cls == "Allele": + result = allele(clinvar_json) + elif cls == "CopyNumberChange": + result = copy_number_change(clinvar_json) + elif cls == "CopyNumberCount": + result = copy_number_count(clinvar_json) + content = {"in": clinvar_json, "out": result} + output.write(json.dumps(content).encode("utf-8")) + output.write("\n".encode("utf-8")) def allele(clinvar_json: dict) -> dict: @@ -122,16 +124,19 @@ def copy_number_change(clinvar_json: dict) -> dict: return {"errors": str(e)} -def main(argv=sys.argv): +def main(argv=sys.argv[1:]): """ Process the --filename argument (expected as 'gs://..../filename.json.gz') and returns contents in file 'output-filename.ndjson' """ filename = parse_args(argv)["filename"] local_file_name = download_to_local_file(filename) - outfile = str("output-" + local_file_name.replace(".json.gz", "") + ".ndjson") + outfile = str("output-" + local_file_name) process_as_json(local_file_name, outfile) if __name__ == "__main__": - main(["--filename", "gs://clinvar-gk-pilot/2024-04-07/dev/vi.json.gz"]) + if len(sys.argv) == 1: + main(["--filename", "gs://clinvar-gk-pilot/2024-04-07/dev/vi.json.gz"]) + else: + main(sys.argv[1:]) From 49f9c5f0088066bbf25b0a4bf1f625b35c04f1cb Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Sun, 21 Apr 2024 15:53:32 +0200 Subject: [PATCH 04/10] Change output file location to a separate outputs directory --- clinvar_gk_pilot/gcs.py | 2 +- clinvar_gk_pilot/main.py | 28 ++++++---------------------- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/clinvar_gk_pilot/gcs.py b/clinvar_gk_pilot/gcs.py index 26cc07f..727a92c 100644 --- a/clinvar_gk_pilot/gcs.py +++ b/clinvar_gk_pilot/gcs.py @@ -1,8 +1,8 @@ +import os import queue import subprocess import threading import time -import os from pathlib import Path, PurePath import requests diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 760e3ac..bdb5a61 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -2,13 +2,14 @@ import gzip import json import os +import pathlib import sys from typing import List from ga4gh.vrs.dataproxy import create_dataproxy from ga4gh.vrs.extras.translator import AlleleTranslator, CnvTranslator -from clinvar_gk_pilot.gcs import parse_blob_uri, download_to_local_file +from clinvar_gk_pilot.gcs import download_to_local_file 
from clinvar_gk_pilot.logger import logger # TODO - implement as separate strategy class for using vrs_python @@ -42,26 +43,6 @@ def parse_args(args: List[str]) -> dict: return vars(parser.parse_args(args)) -# def download_to_local_file(filename: str) -> str: -# """ -# Expects a filename beginning with "gs://" and ending with ".json.gz". -# Downloads and decompresses into string form. -# # TODO - this likely will not work for large ClinVar release files -# """ -# if not filename.startswith("gs://"): -# raise RuntimeError( -# "Expecting a google cloud storage URI beginning with 'gs://'." -# ) -# if not filename.endswith(".json.gz"): -# raise RuntimeError("Expecting a compressed filename ending with '.json.gz'.") -# blob = parse_blob_uri(filename) - -# local_file_name = filename.split("/")[-1] -# with open(local_file_name, "wb") as f: -# blob.download_to_file(file_obj=f) -# return local_file_name - - def process_as_json(input_file_name: str, output_file_name: str) -> None: with ( gzip.GzipFile(input_file_name, "rb") as input, @@ -131,7 +112,10 @@ def main(argv=sys.argv[1:]): """ filename = parse_args(argv)["filename"] local_file_name = download_to_local_file(filename) - outfile = str("output-" + local_file_name) + + outfile = str(pathlib.Path("output") / local_file_name) + # Make parents + os.makedirs(os.path.dirname(outfile), exist_ok=True) process_as_json(local_file_name, outfile) From 663b36dc4cd5f69c53221893218940e20dc82bd3 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Tue, 30 Apr 2024 11:33:40 -0400 Subject: [PATCH 05/10] Add partitioner step and multiprocessing call on each partitioned input file. Add task.py although not used --- clinvar_gk_pilot/main.py | 149 ++++++++++++++++++++++++++++++++------- clinvar_gk_pilot/task.py | 90 +++++++++++++++++++++++ 2 files changed, 212 insertions(+), 27 deletions(-) create mode 100644 clinvar_gk_pilot/task.py diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index bdb5a61..5361862 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -1,17 +1,26 @@ import argparse +import contextlib import gzip +import itertools import json +import multiprocessing import os import pathlib import sys + from typing import List from ga4gh.vrs.dataproxy import create_dataproxy from ga4gh.vrs.extras.translator import AlleleTranslator, CnvTranslator -from clinvar_gk_pilot.gcs import download_to_local_file +from clinvar_gk_pilot.gcs import ( + _local_file_path_for, + already_downloaded, + download_to_local_file, +) from clinvar_gk_pilot.logger import logger + # TODO - implement as separate strategy class for using vrs_python # vs. another for anyvar vs. another for variation_normalizer # Encapsulate translators and data_proxy in strategy class @@ -40,44 +49,101 @@ def parse_args(args: List[str]) -> dict: """ parser = argparse.ArgumentParser() parser.add_argument("--filename", required=True, help="Filename to read") + parser.add_argument( + "--parallelism", type=int, default=1, help="Number of worker threads" + ) return vars(parser.parse_args(args)) -def process_as_json(input_file_name: str, output_file_name: str) -> None: +def process_line(line: str) -> str: + """ + Takes a line of JSON, processes it, and returns the result as a JSON string. 
+ """ + clinvar_json = json.loads(line) + result = None + if clinvar_json.get("issue") is None: + cls = clinvar_json["vrs_class"] + if cls == "Allele": + result = allele(clinvar_json) + elif cls == "CopyNumberChange": + result = copy_number_change(clinvar_json) + elif cls == "CopyNumberCount": + result = copy_number_count(clinvar_json) + content = {"in": clinvar_json, "out": result} + return json.dumps(content) + + +def partition_into_blocks(iterable, block_size=5): + iterator = iter(iterable) + while True: + block = list(itertools.islice(iterator, block_size)) + if not block: + break + yield block + + +def worker(file_name_gz: str, output_file_name: str) -> None: + """ + Takes an input file (a GZIP file of newline delimited), runs `process_line` + on each line, and writes the output to a new GZIP file called `output_file_name`. + """ with ( - gzip.GzipFile(input_file_name, "rb") as input, - gzip.GzipFile(output_file_name, "wb") as output, + gzip.open(file_name_gz, "rt", encoding="utf-8") as input_file, + gzip.open(output_file_name, "wt", encoding="utf-8") as output_file, ): - for line in input: - clinvar_json = json.loads(line.decode("utf-8")) - if clinvar_json.get("issue") is not None: - result = None - else: - cls = clinvar_json["vrs_class"] - if cls == "Allele": - result = allele(clinvar_json) - elif cls == "CopyNumberChange": - result = copy_number_change(clinvar_json) - elif cls == "CopyNumberCount": - result = copy_number_count(clinvar_json) - content = {"in": clinvar_json, "out": result} - output.write(json.dumps(content).encode("utf-8")) - output.write("\n".encode("utf-8")) + for line in input_file: + output_file.write(process_line(line)) + output_file.write("\n") + + +def process_as_json_single_thread(input_file_name: str, output_file_name: str) -> None: + worker(input_file_name, output_file_name) + print(f"Output written to {output_file_name}") + + +def process_as_json(input_file_name: str, output_file_name: str) -> None: + part_input_file_names = partition_file_lines_gz(input_file_name, 8) + + part_output_file_names = [f"{ofn}.out" for ofn in part_input_file_names] + + workers = [] + # Start a worker per file name + for part_ifn, part_ofn in zip(part_input_file_names, part_output_file_names): + w = multiprocessing.Process(target=worker, args=(part_ifn, part_ofn)) + w.start() + workers.append(w) + + # Wait for all workers to finish + for w in workers: + w.join() + + with gzip.open(output_file_name, "wt", encoding="utf-8") as f_out: + for part_ofn in part_output_file_names: + print(f"Writing output from {part_ofn} to {output_file_name}") + line_counter = 0 + with gzip.open(part_ofn, "rt", encoding="utf-8") as f_in: + for line in f_in: + f_out.write(line) + f_out.write("\n") + line_counter += 1 + print(f"Lines written: {line_counter}") + + print(f"Output written to {output_file_name}") def allele(clinvar_json: dict) -> dict: try: - tlr = allele_translators.get(clinvar_json.get("assembly_version", "38")) + tlr = allele_translators[clinvar_json.get("assembly_version", "38")] vrs = tlr.translate_from(var=clinvar_json["source"], fmt=clinvar_json["fmt"]) return vrs.model_dump(exclude_none=True) except Exception as e: - logger.error(f"Exception raised in 'allele' processing: {clinvar_json}") + logger.error(f"Exception raised in 'allele' processing: {clinvar_json}: {e}") return {"errors": str(e)} def copy_number_count(clinvar_json: dict) -> dict: try: - tlr = cnv_translators.get(clinvar_json.get("assembly_version", "38")) + tlr = cnv_translators[clinvar_json.get("assembly_version", 
"38")] kwargs = {"copies": clinvar_json["absolute_copies"]} vrs = tlr.translate_from( var=clinvar_json["source"], fmt=clinvar_json["fmt"], **kwargs @@ -85,14 +151,14 @@ def copy_number_count(clinvar_json: dict) -> dict: return vrs.model_dump(exclude_none=True) except Exception as e: logger.error( - f"Exception raised in 'copy_number_count' processing: {clinvar_json}" + f"Exception raised in 'copy_number_count' processing: {clinvar_json}: {e}" ) return {"errors": str(e)} def copy_number_change(clinvar_json: dict) -> dict: try: - tlr = cnv_translators.get(clinvar_json.get("assembly_version", "38")) + tlr = cnv_translators[clinvar_json.get("assembly_version", "38")] kwargs = {"copy_change": clinvar_json["copy_change_type"]} vrs = tlr.translate_from( var=clinvar_json["source"], fmt=clinvar_json["fmt"], **kwargs @@ -100,18 +166,47 @@ def copy_number_change(clinvar_json: dict) -> dict: return vrs.model_dump(exclude_none=True) except Exception as e: logger.error( - f"Exception raised in 'copy_number_change' processing: {clinvar_json}" + f"Exception raised in 'copy_number_change' processing: {clinvar_json}: {e}" ) return {"errors": str(e)} +def partition_file_lines_gz(local_file_path_gz: str, partitions: int) -> List[str]: + """ + Split `local_file_path_gz` into `partitions` roughly equal parts by line count. + + Return a list of `partitions` file names that are a roughly equal + number of lines from `local_file_path_gz`. + """ + filenames = [f"{local_file_path_gz}.part_{i+1}" for i in range(partitions)] + + # Read the file and write each line to a file, looping through the output files + with gzip.open(local_file_path_gz, "rt") as f: + # Open output files + with contextlib.ExitStack() as stack: + files = [ + stack.enter_context(gzip.open(filename, "wt", encoding="utf-8")) + for filename in filenames + ] + for i, line in enumerate(f): + file_idx = i % partitions + files[file_idx].write(line) + + return filenames + + def main(argv=sys.argv[1:]): """ Process the --filename argument (expected as 'gs://..../filename.json.gz') and returns contents in file 'output-filename.ndjson' """ - filename = parse_args(argv)["filename"] - local_file_name = download_to_local_file(filename) + opts = parse_args(argv) + filename = opts["filename"] + assert filename.startswith("gs://") + if not already_downloaded(filename): + local_file_name = download_to_local_file(filename) + else: + local_file_name = _local_file_path_for(filename) outfile = str(pathlib.Path("output") / local_file_name) # Make parents diff --git a/clinvar_gk_pilot/task.py b/clinvar_gk_pilot/task.py new file mode 100644 index 0000000..c37a6ed --- /dev/null +++ b/clinvar_gk_pilot/task.py @@ -0,0 +1,90 @@ +import multiprocessing +import time + + +def _worker(task_start_times, task_queue, outputs_queue): + """ + Worker process target that continuously listens for tasks. + Defined at root of module so it can be pickled. 
+ """ + while True: + task = task_queue.get() + if task is None: # None is a signal to shut down + break + task_id, func, args, kwargs = task + try: + # Store the start time as a timestamp to avoid serialization issues + task_start_times[task_id] = time.time() + val = func(*args, **kwargs) + outputs_queue.put((task_id, val)) + finally: + del task_start_times[task_id] + + +class MPTaskQueue: + def __init__(self, num_workers, output_queue): + self.num_workers = num_workers + self.output_queue = output_queue + self.tasks = multiprocessing.Queue(100) + self.processes = [] + self.task_start_times = multiprocessing.Manager().dict() + + def add_task(self, func, *args, **kwargs): + """ + Add a task to the queue with a unique task ID. + """ + task_id = f"task-{time.time()}" # Unique ID based on timestamp + self.tasks.put((task_id, func, args, kwargs)) + return task_id + + def start(self): + """ + Start the worker processes. + """ + for _ in range(self.num_workers): + p = multiprocessing.Process( + target=_worker, + args=(self.task_start_times, self.tasks, self.output_queue), + ) + p.start() + self.processes.append(p) + + def stop(self): + """Stop all worker processes.""" + for _ in range(self.num_workers): + self.tasks.put(None) + for p in self.processes: + p.join() + + def monitor_tasks(self): + """Monitor and report the running time of each task.""" + for task_id, start_time in list(self.task_start_times.items()): + elapsed_time = time.time() - start_time + print(f"Task {task_id} has been running for {elapsed_time:.2f} seconds.") + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + return False # Allows exceptions to propagate + + +# Example of how to use this TaskQueue as a process pool +def example_task(duration, message): + print(f"Starting task: {message}") + time.sleep(duration) + print(f"Finished task: {message}") + + +if __name__ == "__main__": + pass + # with MPTaskQueue(num_workers=4) as pool: + # task_ids = [ + # pool.add_task(example_task, 2, "Process data"), + # pool.add_task(example_task, 3, "Load data"), + # pool.add_task(example_task, 1, "Send notification"), + # ] + # time.sleep(1) # Delay to allow tasks to start + # pool.monitor_tasks() # Monitoring task durations From 07a72a9c6616b4f661501e9f5d3060d863ae1a39 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Tue, 30 Apr 2024 12:35:44 -0400 Subject: [PATCH 06/10] Add start_task_with_timeout, but it's too slow to create new processes --- clinvar_gk_pilot/main.py | 78 +++++++++++++++++++++++++++++++--------- 1 file changed, 62 insertions(+), 16 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 5361862..1e3ee6b 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -7,8 +7,7 @@ import os import pathlib import sys - -from typing import List +from typing import Any, Callable, Iterable, List from ga4gh.vrs.dataproxy import create_dataproxy from ga4gh.vrs.extras.translator import AlleleTranslator, CnvTranslator @@ -20,7 +19,6 @@ ) from clinvar_gk_pilot.logger import logger - # TODO - implement as separate strategy class for using vrs_python # vs. another for anyvar vs. 
another for variation_normalizer # Encapsulate translators and data_proxy in strategy class @@ -73,13 +71,38 @@ def process_line(line: str) -> str: return json.dumps(content) -def partition_into_blocks(iterable, block_size=5): - iterator = iter(iterable) - while True: - block = list(itertools.islice(iterator, block_size)) - if not block: - break - yield block +def _task_with_return_queue( + task: Callable, args: Iterable[Any], return_queue: multiprocessing.Queue +): + """ + Executes task and returns the result via a queue. + """ + return_queue.put(task(*args)) + + +def start_task_with_timeout( + task: Callable, + args: Iterable[Any], + return_queue: multiprocessing.Queue, + timeout_s=60, +) -> str: + """ + Runs task in a separate process and waits for it to complete or times out. + """ + # return_queue = multiprocessing.Queue() + process = multiprocessing.Process( + target=_task_with_return_queue, args=(task, args, return_queue) + ) + process.start() + process.join(timeout_s) + if process.is_alive(): + print("Task did not complete in time, terminating it.") + process.terminate() # Forcefully terminate the process + process.join() # Wait for process resources to be released + return json.dumps({"errors": f"Task did not complete in {timeout_s} seconds."}) + + # Get the return value + return return_queue.get() def worker(file_name_gz: str, output_file_name: str) -> None: @@ -91,8 +114,13 @@ def worker(file_name_gz: str, output_file_name: str) -> None: gzip.open(file_name_gz, "rt", encoding="utf-8") as input_file, gzip.open(output_file_name, "wt", encoding="utf-8") as output_file, ): + return_queue = multiprocessing.Queue() for line in input_file: - output_file.write(process_line(line)) + out_line = start_task_with_timeout( + process_line, (line,), return_queue, timeout_s=60 + ) + # output_file.write(process_line(line)) + output_file.write(out_line) output_file.write("\n") @@ -202,11 +230,13 @@ def main(argv=sys.argv[1:]): """ opts = parse_args(argv) filename = opts["filename"] - assert filename.startswith("gs://") - if not already_downloaded(filename): - local_file_name = download_to_local_file(filename) + if filename.startswith("gs://"): + if not already_downloaded(filename): + local_file_name = download_to_local_file(filename) + else: + local_file_name = _local_file_path_for(filename) else: - local_file_name = _local_file_path_for(filename) + local_file_name = filename outfile = str(pathlib.Path("output") / local_file_name) # Make parents @@ -216,6 +246,22 @@ def main(argv=sys.argv[1:]): if __name__ == "__main__": if len(sys.argv) == 1: - main(["--filename", "gs://clinvar-gk-pilot/2024-04-07/dev/vi.json.gz"]) + # main( + # [ + # "--filename", + # "gs://clinvar-gk-pilot/2024-04-07/dev/vi.json.gz", + # "--parallelism", + # "8", + # ] + # ) + + main( + [ + "--filename", + "vi-100000.json.gz", + "--parallelism", + "8", + ] + ) else: main(sys.argv[1:]) From 8bacf77833678cf8430a47437b6ecc02606744ab Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Tue, 30 Apr 2024 13:56:38 -0400 Subject: [PATCH 07/10] Add background process with timeout to worker process. 
Terminate background processes cleanly --- clinvar_gk_pilot/main.py | 136 ++++++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 53 deletions(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 1e3ee6b..0db8d61 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -1,13 +1,14 @@ import argparse import contextlib import gzip -import itertools import json import multiprocessing import os import pathlib +import queue import sys -from typing import Any, Callable, Iterable, List +from functools import partial +from typing import List from ga4gh.vrs.dataproxy import create_dataproxy from ga4gh.vrs.extras.translator import AlleleTranslator, CnvTranslator @@ -48,7 +49,14 @@ def parse_args(args: List[str]) -> dict: parser = argparse.ArgumentParser() parser.add_argument("--filename", required=True, help="Filename to read") parser.add_argument( - "--parallelism", type=int, default=1, help="Number of worker threads" + "--parallelism", + type=int, + default=1, + help=( + "Number of worker threads. " + "Default 1, which still uses a separate process to run tasks. " + "Set to 0 to run in main thread." + ), ) return vars(parser.parse_args(args)) @@ -71,38 +79,17 @@ def process_line(line: str) -> str: return json.dumps(content) -def _task_with_return_queue( - task: Callable, args: Iterable[Any], return_queue: multiprocessing.Queue +def _task_worker( + task_queue: multiprocessing.Queue, return_queue: multiprocessing.Queue ): """ - Executes task and returns the result via a queue. + Worker function that processes tasks from a queue. """ - return_queue.put(task(*args)) - - -def start_task_with_timeout( - task: Callable, - args: Iterable[Any], - return_queue: multiprocessing.Queue, - timeout_s=60, -) -> str: - """ - Runs task in a separate process and waits for it to complete or times out. 
- """ - # return_queue = multiprocessing.Queue() - process = multiprocessing.Process( - target=_task_with_return_queue, args=(task, args, return_queue) - ) - process.start() - process.join(timeout_s) - if process.is_alive(): - print("Task did not complete in time, terminating it.") - process.terminate() # Forcefully terminate the process - process.join() # Wait for process resources to be released - return json.dumps({"errors": f"Task did not complete in {timeout_s} seconds."}) - - # Get the return value - return return_queue.get() + while True: + task = task_queue.get() + if task is None: + break + return_queue.put(task()) def worker(file_name_gz: str, output_file_name: str) -> None: @@ -114,23 +101,61 @@ def worker(file_name_gz: str, output_file_name: str) -> None: gzip.open(file_name_gz, "rt", encoding="utf-8") as input_file, gzip.open(output_file_name, "wt", encoding="utf-8") as output_file, ): + task_queue = multiprocessing.Queue() return_queue = multiprocessing.Queue() - for line in input_file: - out_line = start_task_with_timeout( - process_line, (line,), return_queue, timeout_s=60 + task_timeout = 10 + + def make_background_process(): + p = multiprocessing.Process( + target=_task_worker, + args=(task_queue, return_queue), ) - # output_file.write(process_line(line)) - output_file.write(out_line) + return p + + background_process = make_background_process() + background_process.start() + + for line in input_file: + task_queue.put(partial(process_line, line)) + try: + ret = return_queue.get(timeout=task_timeout) + except queue.Empty: + print("Task did not complete in time, terminating it.") + background_process.terminate() + background_process.join() + ret = json.dumps( + { + "errors": f"Task did not complete in {task_timeout} seconds.", + "line": line, + } + ) + print("Restarting background process") + background_process = make_background_process() + background_process.start() + output_file.write(ret) output_file.write("\n") + task_queue.put(None) + background_process.join() + def process_as_json_single_thread(input_file_name: str, output_file_name: str) -> None: - worker(input_file_name, output_file_name) + with gzip.open(input_file_name, "rt", encoding="utf-8") as f_in: + with gzip.open(output_file_name, "wt", encoding="utf-8") as f_out: + for line in f_in: + f_out.write(process_line(line)) + f_out.write("\n") print(f"Output written to {output_file_name}") -def process_as_json(input_file_name: str, output_file_name: str) -> None: - part_input_file_names = partition_file_lines_gz(input_file_name, 8) +def process_as_json( + input_file_name: str, output_file_name: str, parallelism: int +) -> None: + """ + Process `input_file_name` in parallel and write the results to `output_file_name`. 
+ """ + assert parallelism > 0, "Parallelism must be greater than 0" + part_input_file_names = partition_file_lines_gz(input_file_name, parallelism) part_output_file_names = [f"{ofn}.out" for ofn in part_input_file_names] @@ -152,7 +177,8 @@ def process_as_json(input_file_name: str, output_file_name: str) -> None: with gzip.open(part_ofn, "rt", encoding="utf-8") as f_in: for line in f_in: f_out.write(line) - f_out.write("\n") + if not line.endswith("\n"): + f_out.write("\n") line_counter += 1 print(f"Lines written: {line_counter}") @@ -241,27 +267,31 @@ def main(argv=sys.argv[1:]): outfile = str(pathlib.Path("output") / local_file_name) # Make parents os.makedirs(os.path.dirname(outfile), exist_ok=True) - process_as_json(local_file_name, outfile) + + if opts["parallelism"] == 0: + process_as_json_single_thread(local_file_name, outfile) + else: + process_as_json(local_file_name, outfile, opts["parallelism"]) if __name__ == "__main__": if len(sys.argv) == 1: - # main( - # [ - # "--filename", - # "gs://clinvar-gk-pilot/2024-04-07/dev/vi.json.gz", - # "--parallelism", - # "8", - # ] - # ) - main( [ "--filename", - "vi-100000.json.gz", + "gs://clinvar-gk-pilot/2024-04-07/dev/vi.json.gz", "--parallelism", - "8", + "10", ] ) + + # main( + # [ + # "--filename", + # "vi-100000.json.gz", + # "--parallelism", + # "1", + # ] + # ) else: main(sys.argv[1:]) From 42ce34ce19ff42b0ec1ac842622eee6808ea3de5 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Tue, 30 Apr 2024 13:57:11 -0400 Subject: [PATCH 08/10] Remove task.py --- clinvar_gk_pilot/task.py | 90 ---------------------------------------- 1 file changed, 90 deletions(-) delete mode 100644 clinvar_gk_pilot/task.py diff --git a/clinvar_gk_pilot/task.py b/clinvar_gk_pilot/task.py deleted file mode 100644 index c37a6ed..0000000 --- a/clinvar_gk_pilot/task.py +++ /dev/null @@ -1,90 +0,0 @@ -import multiprocessing -import time - - -def _worker(task_start_times, task_queue, outputs_queue): - """ - Worker process target that continuously listens for tasks. - Defined at root of module so it can be pickled. - """ - while True: - task = task_queue.get() - if task is None: # None is a signal to shut down - break - task_id, func, args, kwargs = task - try: - # Store the start time as a timestamp to avoid serialization issues - task_start_times[task_id] = time.time() - val = func(*args, **kwargs) - outputs_queue.put((task_id, val)) - finally: - del task_start_times[task_id] - - -class MPTaskQueue: - def __init__(self, num_workers, output_queue): - self.num_workers = num_workers - self.output_queue = output_queue - self.tasks = multiprocessing.Queue(100) - self.processes = [] - self.task_start_times = multiprocessing.Manager().dict() - - def add_task(self, func, *args, **kwargs): - """ - Add a task to the queue with a unique task ID. - """ - task_id = f"task-{time.time()}" # Unique ID based on timestamp - self.tasks.put((task_id, func, args, kwargs)) - return task_id - - def start(self): - """ - Start the worker processes. 
- """ - for _ in range(self.num_workers): - p = multiprocessing.Process( - target=_worker, - args=(self.task_start_times, self.tasks, self.output_queue), - ) - p.start() - self.processes.append(p) - - def stop(self): - """Stop all worker processes.""" - for _ in range(self.num_workers): - self.tasks.put(None) - for p in self.processes: - p.join() - - def monitor_tasks(self): - """Monitor and report the running time of each task.""" - for task_id, start_time in list(self.task_start_times.items()): - elapsed_time = time.time() - start_time - print(f"Task {task_id} has been running for {elapsed_time:.2f} seconds.") - - def __enter__(self): - self.start() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.stop() - return False # Allows exceptions to propagate - - -# Example of how to use this TaskQueue as a process pool -def example_task(duration, message): - print(f"Starting task: {message}") - time.sleep(duration) - print(f"Finished task: {message}") - - -if __name__ == "__main__": - pass - # with MPTaskQueue(num_workers=4) as pool: - # task_ids = [ - # pool.add_task(example_task, 2, "Process data"), - # pool.add_task(example_task, 3, "Load data"), - # pool.add_task(example_task, 1, "Send notification"), - # ] - # time.sleep(1) # Delay to allow tasks to start - # pool.monitor_tasks() # Monitoring task durations From cead7829e6bc7aa420dc2ac3596f11cd2bc75611 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Tue, 30 Apr 2024 14:16:35 -0400 Subject: [PATCH 09/10] Fix test by extracting arg parsing to separate cli module that doesn't need to access seqrepo --- clinvar_gk_pilot/cli.py | 21 +++++++++++++++++++++ clinvar_gk_pilot/main.py | 20 +------------------- test/{test_main.py => test_cli.py} | 5 +++-- 3 files changed, 25 insertions(+), 21 deletions(-) create mode 100644 clinvar_gk_pilot/cli.py rename test/{test_main.py => test_cli.py} (55%) diff --git a/clinvar_gk_pilot/cli.py b/clinvar_gk_pilot/cli.py new file mode 100644 index 0000000..ee9e6b1 --- /dev/null +++ b/clinvar_gk_pilot/cli.py @@ -0,0 +1,21 @@ +import argparse +from typing import List + + +def parse_args(args: List[str]) -> dict: + """ + Parse arguments and return as dict. + """ + parser = argparse.ArgumentParser() + parser.add_argument("--filename", required=True, help="Filename to read") + parser.add_argument( + "--parallelism", + type=int, + default=1, + help=( + "Number of worker threads. " + "Default 1, which still uses a separate process to run tasks. " + "Set to 0 to run in main thread." + ), + ) + return vars(parser.parse_args(args)) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 0db8d61..620cd85 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -13,6 +13,7 @@ from ga4gh.vrs.dataproxy import create_dataproxy from ga4gh.vrs.extras.translator import AlleleTranslator, CnvTranslator +from clinvar_gk_pilot.cli import parse_args from clinvar_gk_pilot.gcs import ( _local_file_path_for, already_downloaded, @@ -42,25 +43,6 @@ } -def parse_args(args: List[str]) -> dict: - """ - Parse arguments and return as dict. - """ - parser = argparse.ArgumentParser() - parser.add_argument("--filename", required=True, help="Filename to read") - parser.add_argument( - "--parallelism", - type=int, - default=1, - help=( - "Number of worker threads. " - "Default 1, which still uses a separate process to run tasks. " - "Set to 0 to run in main thread." 
- ), - ) - return vars(parser.parse_args(args)) - - def process_line(line: str) -> str: """ Takes a line of JSON, processes it, and returns the result as a JSON string. diff --git a/test/test_main.py b/test/test_cli.py similarity index 55% rename from test/test_main.py rename to test/test_cli.py index 20d2bb0..8505376 100644 --- a/test/test_main.py +++ b/test/test_cli.py @@ -1,8 +1,9 @@ -from clinvar_gk_pilot.main import parse_args +from clinvar_gk_pilot.cli import parse_args def test_parse_args(): argv = ["--filename", "test.txt"] opts = parse_args(argv) assert opts["filename"] == "test.txt" - assert len(opts) == 1 + assert opts["parallelism"] == 1 + assert len(opts) == 2 From 8fc6880ab1cadecedec966d2233a462093c3ad25 Mon Sep 17 00:00:00 2001 From: Kyle Ferriter <kferrite@broadinstitute.org> Date: Tue, 30 Apr 2024 14:39:57 -0400 Subject: [PATCH 10/10] lint --- clinvar_gk_pilot/main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/clinvar_gk_pilot/main.py b/clinvar_gk_pilot/main.py index 620cd85..5fcfe9f 100644 --- a/clinvar_gk_pilot/main.py +++ b/clinvar_gk_pilot/main.py @@ -1,4 +1,3 @@ -import argparse import contextlib import gzip import json