diff --git a/tasks/eva_2326/src/main/groovy/eva2326/nextflow.nf b/tasks/eva_2326/src/main/groovy/eva2326/nextflow.nf
new file mode 100644
index 0000000..5e44cf6
--- /dev/null
+++ b/tasks/eva_2326/src/main/groovy/eva2326/nextflow.nf
@@ -0,0 +1,92 @@
+nextflow.enable.dsl=2
+
+// Pipeline parameters; the six that default to null are mandatory (checked in the workflow block).
+params.input_file = null
+params.chunk_size = 1_000_000
+params.working_dir = null
+params.env_properties_file = null
+params.db_name = null
+params.max_parallel_chunks = 5
+params.project_dir = null
+params.groovy_script = null
+
+// ── PROCESS 1: split into named chunks ───────────────────────────────────────
+// Splits params.input_file into files of at most params.chunk_size lines, named
+// chunk_<first line>-<last line>.txt (the last chunk may hold fewer lines than its name implies).
+process SPLIT_INPUT {
+    output:
+    path "chunk_*.txt", emit: chunks
+
+    script:
+    """
+    awk -v chunk_size=${params.chunk_size} '
+    {
+        line_num = NR
+        if ((line_num - 1) % chunk_size == 0) {
+            # close the finished chunk so only one output file is open at a time
+            if (filename) close(filename)
+            start = line_num
+            end = start + chunk_size - 1
+            # zero-pad to 10 digits so filenames sort correctly
+            filename = sprintf("chunk_%010d-%010d.txt", start, end)
+        }
+        print > filename
+    }
+    ' ${params.input_file}
+    """
+}
+
+// ── PROCESS 2: remediate one chunk ───────────────────────────────────────────
+// Runs the remediation groovy script on one chunk; script output goes to
+// <working_dir>/<chunk>.log and a <chunk>.done flag file is emitted afterwards.
+process REMEDIATE_CHUNK {
+    label 'long_time'
+    label 'med_mem'
+    maxForks params.max_parallel_chunks
+
+    tag { chunk_file.name }
+
+    input:
+    path chunk_file
+
+    output:
+    path "${chunk_file.baseName}.done", emit: done_flag
+
+    script:
+    """
+    CHUNK_NAME=\$(basename ${chunk_file} .txt)
+    NOT_REMEDIATED_FILE=${params.working_dir}/variants_not_remediated/${params.db_name}_\$CHUNK_NAME.txt
+    mkdir -p \$(dirname "\$NOT_REMEDIATED_FILE")
+
+    bash run_groovy_script.sh \
+        ${params.project_dir} \
+        ${params.groovy_script} \
+        -envPropertiesFile=${params.env_properties_file} \
+        -dbName=${params.db_name} \
+        -annotationRemediationInputFile=\$(realpath ${chunk_file}) \
+        -notRemediatedVariantsFilePath="\$NOT_REMEDIATED_FILE" \
+        > ${params.working_dir}/\$CHUNK_NAME.log 2>&1
+
+    echo "Done: ${chunk_file.name}" > ${chunk_file.baseName}.done
+    """
+}
+
+// ── WORKFLOW ──────────────────────────────────────────────────────────────────
+workflow {
+    if (!params.input_file) error "Please provide --input_file"
+    if (!params.env_properties_file) error "Please provide --env_properties_file"
+    if (!params.db_name) error "Please provide --db_name"
+    if (!params.working_dir) error "Please provide --working_dir"
+    if (!params.project_dir) error "Please provide --project_dir"
+    if (!params.groovy_script) error "Please provide --groovy_script"
+
+    SPLIT_INPUT()
+
+    chunks_ch = SPLIT_INPUT.out.chunks.flatten()
+
+    REMEDIATE_CHUNK(chunks_ch)
+
+    REMEDIATE_CHUNK.out.done_flag
+        .collect()
+        .view { flags -> "All chunks complete: ${flags.size()} chunks processed" }
+}
\ No newline at end of file
diff --git a/tasks/eva_2326/src/main/groovy/eva2326/remediate_annotations.groovy b/tasks/eva_2326/src/main/groovy/eva2326/remediate_annotations.groovy
new file mode 100644
index 0000000..1f9eca6
--- /dev/null
+++ b/tasks/eva_2326/src/main/groovy/eva2326/remediate_annotations.groovy
@@ -0,0 +1,328 @@
+package eva2326
+
+import groovy.cli.picocli.CliBuilder
+import org.bson.Document
+import org.bson.BsonSerializationException
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.CommandLineRunner
+import org.springframework.boot.autoconfigure.SpringBootApplication
+import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
+import org.springframework.boot.builder.SpringApplicationBuilder
+import org.springframework.data.mongodb.core.MongoTemplate
+import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper
+import org.springframework.data.mongodb.core.convert.MappingMongoConverter
+import org.springframework.data.mongodb.core.query.Criteria
+import org.springframework.data.mongodb.core.query.Query
+import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument
+
+import java.util.regex.Pattern
+import java.util.stream.Collectors
+
+import static org.springframework.data.mongodb.core.query.Criteria.where
+
+def cli = new CliBuilder()
+cli.envPropertiesFile(args: 1, "Properties file with db details to use for update", required: true)
+cli.dbName(args: 1, "Database name that needs to be updated", required: true)
+cli.annotationRemediationInputFile(args: 1, "Path to CSV file with old_variant_id,new_variant_id,insdc_contig for annotation remediation", required: true)
+cli.notRemediatedVariantsFilePath(args: 1, "Path to file where the variants which are not remediated will be stored", required: true)
+def options = cli.parse(args)
+if (!options) {
+    cli.usage()
+    System.exit(1)
+}
+
+new SpringApplicationBuilder(RemediateAnnotationsApplication.class).properties([
+        'spring.config.location'      : options.envPropertiesFile,
+        'spring.data.mongodb.database': options.dbName])
+        .run(options.annotationRemediationInputFile, options.notRemediatedVariantsFilePath)
+
+
+/**
+ * Moves annotation documents from old variant ids to new variant ids.
+ *
+ * The input CSV holds old_variant_id,new_variant_id,insdc_contig per line and is read in batches of
+ * BATCH_SIZE. For every entry whose new variant exists in the variants collection, each annotation
+ * of the old variant is copied under the new variant id with "chr" set to the INSDC contig (unless
+ * an annotation with that id already exists) and the old annotation is deleted. Entries whose new
+ * variant is missing are appended to the not-remediated file.
+ */
+@SpringBootApplication(exclude = [DataSourceAutoConfiguration.class])
+class RemediateAnnotationsApplication implements CommandLineRunner {
+    static Logger logger = LoggerFactory.getLogger(RemediateAnnotationsApplication.class)
+    private static final int BATCH_SIZE = 1000
+    public static final String VARIANTS_COLLECTION = "variants_2_0"
+    public static final String ANNOTATIONS_COLLECTION = "annotations_2_0"
+    @Autowired
+    MongoTemplate mongoTemplate
+    MappingMongoConverter converter
+    String notRemediatedVariantsFilePath
+    BufferedWriter notRemediatedWriter
+
+    /**
+     * Streams the input CSV, remediating BATCH_SIZE entries at a time, then exits the JVM.
+     *
+     * @param args args[0] is the input CSV path, args[1] the not-remediated output file path
+     */
+    @Override
+    void run(String... args) throws Exception {
+        String annotationRemediationInputFile = args[0]
+        this.notRemediatedVariantsFilePath = args[1]
+
+        // workaround to not save _class field in documents
+        converter = mongoTemplate.getConverter()
+        converter.setTypeMapper(new DefaultMongoTypeMapper(null))
+
+        File inputFile = new File(annotationRemediationInputFile)
+        if (!inputFile.exists()) {
+            logger.error("Annotation remediation input file does not exist: {}", annotationRemediationInputFile)
+            System.exit(1)
+        }
+
+        notRemediatedWriter = new BufferedWriter(new FileWriter(notRemediatedVariantsFilePath, true))
+        // Stream through the file line by line, accumulating into a batch.
+        // Once the batch reaches BATCH_SIZE, process and clear it before reading further.
+        try {
+            List<String[]> batch = new ArrayList<>()
+            int lineNumber = 0
+            int skippedLines = 0
+            int totalProcessed = 0
+
+            inputFile.withReader { reader ->
+                String line
+                while ((line = reader.readLine()) != null) {
+                    lineNumber++
+                    String trimmed = line.trim()
+                    if (!trimmed) {
+                        continue
+                    }
+
+                    String[] parts = trimmed.split(",", -1)
+                    if (parts.length < 3 || !parts[0].trim() || !parts[1].trim() || !parts[2].trim()) {
+                        logger.warn("Skipping malformed line {} in input file: '{}'", lineNumber, trimmed)
+                        skippedLines++
+                        continue
+                    }
+
+                    batch.add([parts[0].trim(), parts[1].trim(), parts[2].trim()] as String[])
+
+                    if (batch.size() >= RemediateAnnotationsApplication.BATCH_SIZE) {
+                        processBatch(batch)
+                        totalProcessed += batch.size()
+                        logger.info("Total entries processed so far: {}", totalProcessed)
+                        batch.clear()
+                    }
+                }
+            }
+
+            if (!batch.isEmpty()) {
+                processBatch(batch)
+                totalProcessed += batch.size()
+                batch.clear()
+            }
+
+            logger.info("Annotation remediation complete. Total entries processed: {}, malformed lines skipped: {}",
+                    totalProcessed, skippedLines)
+        } finally {
+            notRemediatedWriter.close()
+        }
+
+        System.exit(0)
+    }
+
+    /**
+     * Checks that the new variant of every entry exists in the variants collection and remediates
+     * the annotations of the entries that pass; the others are written to the not-remediated file.
+     *
+     * @param batch entries of [oldVariantId, newVariantId, insdcContig]
+     */
+    void processBatch(List<String[]> batch) {
+        // keyed by old variant id
+        Map<String, String> orgIdNewIdMap = new LinkedHashMap<>()    // oldId -> newId
+        Map<String, String> orgIdInsdcChrMap = new LinkedHashMap<>() // oldId -> insdcContig
+
+        for (String[] entry : batch) {
+            orgIdNewIdMap.put(entry[0], entry[1])
+            orgIdInsdcChrMap.put(entry[0], entry[2])
+        }
+
+        // Fetch all new variants from DB in one query for the whole batch
+        Map<String, VariantDocument> variantsInDBMap = getVariantsByIds(new ArrayList<>(orgIdNewIdMap.values()))
+
+        // Verify each entry: new variant must exist in DB (its contig is not compared here).
+        // Failures are logged and written to the not-remediated file; only verified entries proceed.
+        Map<String, String> verifiedOrgIdNewIdMap = new LinkedHashMap<>()
+
+        for (Map.Entry<String, String> entry : orgIdNewIdMap.entrySet()) {
+            String oldVariantId = entry.getKey()
+            String newVariantId = entry.getValue()
+
+            VariantDocument variantInDB = variantsInDBMap.get(newVariantId)
+
+            if (variantInDB == null) {
+                logger.error("Variant check failed: new variant id {} not found in DB (old id: {}). " +
+                        "Skipping annotation remediation for this variant.", newVariantId, oldVariantId)
+                storeNotRemediatedVariant(oldVariantId, newVariantId, "New variant id not found in DB")
+                continue
+            }
+
+            verifiedOrgIdNewIdMap.put(oldVariantId, newVariantId)
+        }
+
+        logger.info("Batch: {} entries passed variant check, {} skipped",
+                verifiedOrgIdNewIdMap.size(), orgIdNewIdMap.size() - verifiedOrgIdNewIdMap.size())
+
+        if (!verifiedOrgIdNewIdMap.isEmpty()) {
+            logger.info("Remediate Annotations for Ids: " + verifiedOrgIdNewIdMap.keySet())
+            remediateAnnotations(verifiedOrgIdNewIdMap, orgIdInsdcChrMap)
+        }
+    }
+
+    /**
+     * Loads the variants with the given ids from the variants collection.
+     *
+     * @param ids variant ids to look up
+     * @return the variants that were found, keyed by id
+     */
+    Map<String, VariantDocument> getVariantsByIds(List<String> ids) {
+        Query idQuery = new Query(where("_id").in(ids))
+        List<VariantDocument> variants = mongoTemplate.find(idQuery, VariantDocument.class, VARIANTS_COLLECTION)
+        return variants.stream().collect(Collectors.toMap(v -> v.getId(), v -> v))
+    }
+
+    /**
+     * Appends one "oldId,newId,reason" line to the not-remediated file and flushes it.
+     * A write failure is logged and not rethrown.
+     */
+    void storeNotRemediatedVariant(String oldVariantId, String newVariantId, String reason) {
+        try {
+            notRemediatedWriter.write("${oldVariantId},${newVariantId},${reason}\n")
+            notRemediatedWriter.flush()
+        } catch (IOException e) {
+            logger.error("Error writing to not-remediated variants file for old id {}: {}", oldVariantId, e.getMessage())
+        }
+    }
+
+    /**
+     * Copies every annotation of each old variant id to the matching new variant id and deletes
+     * the old annotation. A copy is skipped when an annotation with the new id already exists.
+     *
+     * @param orgIdNewIdMap    old variant id -> new variant id
+     * @param orgIdInsdcChrMap old variant id -> INSDC contig stored in "chr" of the copied annotation
+     */
+    void remediateAnnotations(Map<String, String> orgIdNewIdMap, Map<String, String> orgIdInsdcChrMap) {
+        Set<String> idProcessed = new HashSet<>()
+
+        // fetch annotations for all old variant ids
+        List<Criteria> oldIdCriteria = new ArrayList<>()
+        for (String oldId : orgIdNewIdMap.keySet()) {
+            oldIdCriteria.add(Criteria.where("_id").regex("^" + Pattern.quote(oldId) + "_\\d"))
+        }
+
+        // fetch annotations for all new variant ids
+        List<Criteria> newIdCriteria = new ArrayList<>()
+        for (String newId : orgIdNewIdMap.values()) {
+            newIdCriteria.add(Criteria.where("_id").regex("^" + Pattern.quote(newId) + "_\\d"))
+        }
+
+        try {
+            List<Document> oldAnnotationsList = mongoTemplate.getCollection(ANNOTATIONS_COLLECTION)
+                    .find(new Query(new Criteria().orOperator(oldIdCriteria.toArray(new Criteria[0]))).getQueryObject())
+                    .into(new ArrayList<>())
+            Map<String, Set<Document>> oldVariantIdToDocuments = new HashMap<>()
+            for (Document doc : oldAnnotationsList) {
+                String variantId = extractVariantIdFromAnnotationId(doc.getString("_id"))
+                if (variantId != null) {
+                    oldVariantIdToDocuments.computeIfAbsent(variantId, k -> new HashSet<>()).add(doc)
+                } else {
+                    logger.error("Could not get variantId from the annotation: " + doc)
+                }
+            }
+
+            List<Document> newAnnotationsList = mongoTemplate.getCollection(ANNOTATIONS_COLLECTION)
+                    .find(new Query(new Criteria().orOperator(newIdCriteria.toArray(new Criteria[0]))).getQueryObject())
+                    .into(new ArrayList<>())
+            Map<String, Set<String>> newVariantIdToAnnotationIds = new HashMap<>()
+            for (Document doc : newAnnotationsList) {
+                String variantId = extractVariantIdFromAnnotationId(doc.getString("_id"))
+                if (variantId != null) {
+                    newVariantIdToAnnotationIds.computeIfAbsent(variantId, k -> new HashSet<>()).add(doc.getString("_id"))
+                } else {
+                    logger.error("Could not get variantId from the annotation: " + doc)
+                }
+            }
+
+            // Process each old->new id pair
+            for (Map.Entry<String, String> entry : orgIdNewIdMap.entrySet()) {
+                String orgVariantId = entry.getKey()
+                String newVariantId = entry.getValue()
+
+                Set<Document> orgAnnotationsSet = oldVariantIdToDocuments.getOrDefault(orgVariantId, Collections.emptySet())
+                Set<String> existingNewAnnotationIds = newVariantIdToAnnotationIds.getOrDefault(newVariantId, Collections.emptySet())
+
+                if (orgAnnotationsSet.isEmpty()) {
+                    logger.info("No annotations found for old variant id {}, nothing to remediate", orgVariantId)
+                    continue
+                }
+
+                List<Document> toInsert = new ArrayList<>()
+                List<String> toDelete = new ArrayList<>()
+
+                for (Document annotation : orgAnnotationsSet) {
+                    try {
+                        String orgAnnotationId = annotation.getString("_id")
+                        // Swap only the leading variant id so the trailing segments stay untouched.
+                        String updatedAnnotationId = newVariantId + orgAnnotationId.substring(orgVariantId.length())
+
+                        if (!existingNewAnnotationIds.contains(updatedAnnotationId)) {
+                            if (!idProcessed.contains(updatedAnnotationId)) {
+                                Document updated = new Document(annotation)
+                                updated.put("_id", updatedAnnotationId)
+                                updated.put("chr", orgIdInsdcChrMap.get(orgVariantId))
+                                toInsert.add(updated)
+                                idProcessed.add(updatedAnnotationId)
+                            }
+                        }
+
+                        toDelete.add(orgAnnotationId)
+                    } catch (Exception e) {
+                        logger.error("Error processing annotation for original variant id {}: {}",
+                                orgVariantId, e.getMessage(), e)
+                    }
+                }
+
+                // Insert the copies before deleting the originals so that a failed insert
+                // cannot leave the variant without its annotations.
+                if (!toInsert.isEmpty()) {
+                    mongoTemplate.getCollection(ANNOTATIONS_COLLECTION).insertMany(toInsert)
+                }
+                if (!toDelete.isEmpty()) {
+                    mongoTemplate.remove(Query.query(Criteria.where("_id").in(toDelete)), ANNOTATIONS_COLLECTION)
+                }
+
+                logger.info("Annotation remediation for old variant id {}: {} inserted, {} deleted",
+                        orgVariantId, toInsert.size(), toDelete.size())
+            }
+        } catch (BsonSerializationException ex) {
+            logger.error("Exception occurred while trying to update annotations: {}", ex.getMessage(), ex)
+        }
+    }
+
+    /**
+     * Strips the last two '_'-separated segments from an annotation id to recover the variant id.
+     * Each stripped segment must be free of '_'; matching with \w+ would also consume the
+     * underscores inside the variant id and cut the id at the first one it could.
+     *
+     * @param annotationId annotation "_id" value, may be null
+     * @return the variant id, or null when the id is null or lacks two such trailing segments
+     */
+    static String extractVariantIdFromAnnotationId(String annotationId) {
+        if (annotationId == null) {
+            return null
+        }
+        String variantId = annotationId.replaceFirst(/_[^_]+_[^_]+$/, "")
+        boolean suffixFound = !variantId.isEmpty() && variantId.length() < annotationId.length()
+        return suffixFound ? variantId : null
+    }
+}
\ No newline at end of file
diff --git a/tasks/eva_2326/src/test/resources/application-test.properties b/tasks/eva_2326/src/test/resources/application-test.properties
index f5eba04..c15915c 100644
--- a/tasks/eva_2326/src/test/resources/application-test.properties
+++ b/tasks/eva_2326/src/test/resources/application-test.properties
@@ -1,7 +1,7 @@
 spring.data.mongodb.host=localhost
 spring.data.mongodb.port=27017
-spring.data.mongodb.username=eva_pipeline_mongo_user
-spring.data.mongodb.password=eva_pipeline_mongo_pass
+spring.data.mongodb.username=mongo_user
+spring.data.mongodb.password=mongo_pass
 spring.data.mongodb.authentication-database=admin
 mongodb.read-preference=secondaryPreferred
 spring.data.mongodb.authentication-mechanism=SCRAM-SHA-1