Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions tasks/eva_2326/src/main/groovy/eva2326/nextflow.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Use the Nextflow DSL2 workflow syntax (named processes, channels, emits).
nextflow.enable.dsl=2

// Input file to split; each line is one remediation entry fed to SPLIT_INPUT's awk.
params.input_file = null
// Number of lines per chunk produced by SPLIT_INPUT.
params.chunk_size = 1_000_000
// Base directory for per-chunk logs and the variants_not_remediated/ output.
params.working_dir = null
// Properties file handed to the groovy script via -envPropertiesFile.
params.env_properties_file = null
// Database name handed to the groovy script via -dbName.
params.db_name = null
// Upper bound on concurrently running REMEDIATE_CHUNK tasks (maxForks).
params.max_parallel_chunks = 5
// Project directory passed as the first argument to run_groovy_script.sh.
params.project_dir = null
// Groovy script passed as the second argument to run_groovy_script.sh.
params.groovy_script = null

// ── PROCESS 1: split into named chunks ───────────────────────────────────────
// Splits params.input_file into fixed-size chunks named by their (1-based,
// zero-padded) line range, e.g. chunk_0000000001-0001000000.txt, so that a
// plain lexical sort of the filenames matches line order.
process SPLIT_INPUT {
    output:
    path "chunk_*.txt", emit: chunks

    script:
    """
    awk -v chunk_size=${params.chunk_size} '
      {
        line_num = NR
        if ((line_num - 1) % chunk_size == 0) {
            # close the finished chunk before opening the next one; awk keeps
            # every redirected file open otherwise, so a very large input would
            # exhaust the process open-file-descriptor limit
            if (filename != "") close(filename)
            start = line_num
            end = start + chunk_size - 1
            # zero-pad to 10 digits so filenames sort correctly
            filename = sprintf("chunk_%010d-%010d.txt", start, end)
        }
        print > filename
      }
    ' ${params.input_file}
    """
}

// ── PROCESS 2: remediate one chunk ───────────────────────────────────────────
// Runs the Groovy remediation script over one chunk file. Escaped `\$` vars
// are resolved by the task's bash shell at runtime; unescaped `${...}` are
// interpolated by Nextflow when the script string is built.
process REMEDIATE_CHUNK {
    label 'long_time'
    label 'med_mem'
    // Cap concurrent tasks so no more than max_parallel_chunks run at once.
    maxForks params.max_parallel_chunks

    // Tag each task with its chunk filename for readable execution reports.
    tag { chunk_file.name }

    input:
    path chunk_file

    output:
    // Sentinel file proving the chunk finished; collected by the workflow.
    path "${chunk_file.baseName}.done", emit: done_flag

    script:
    """
    CHUNK_NAME=\$(basename ${chunk_file} .txt)
    NOT_REMEDIATED_FILE=${params.working_dir}/variants_not_remediated/${params.db_name}_\$CHUNK_NAME.txt
    mkdir -p \$(dirname "\$NOT_REMEDIATED_FILE")

    bash run_groovy_script.sh \
    ${params.project_dir} \
    ${params.groovy_script} \
    -envPropertiesFile=${params.env_properties_file} \
    -dbName=${params.db_name} \
    -annotationRemediationInputFile=\$(realpath ${chunk_file}) \
    -notRemediatedVariantsFilePath="\$NOT_REMEDIATED_FILE" \
    > ${params.working_dir}/\$CHUNK_NAME.log 2>&1

    echo "Done: ${chunk_file.name}" > ${chunk_file.baseName}.done
    """
}

// ── WORKFLOW ──────────────────────────────────────────────────────────────────
// Top-level workflow: validate parameters, split the input, fan out one
// remediation task per chunk, and report when every chunk has finished.
workflow {
    // Fail fast on any missing mandatory parameter, in a fixed order.
    def mandatory = [
        'input_file'         : params.input_file,
        'env_properties_file': params.env_properties_file,
        'db_name'            : params.db_name,
        'working_dir'        : params.working_dir,
        'project_dir'        : params.project_dir,
        'groovy_script'      : params.groovy_script,
    ]
    mandatory.each { paramName, paramValue ->
        if (!paramValue) error "Please provide --${paramName}"
    }

    SPLIT_INPUT()

    // One channel element per chunk file.
    chunk_files = SPLIT_INPUT.out.chunks.flatten()

    REMEDIATE_CHUNK(chunk_files)

    // Gather every completion flag and print a single summary line.
    REMEDIATE_CHUNK.out.done_flag
        .collect()
        .view { flags -> "All chunks complete: ${flags.size()} chunks processed" }
}
275 changes: 275 additions & 0 deletions tasks/eva_2326/src/main/groovy/eva2326/remediate_annotations.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
package eva2326

import groovy.cli.picocli.CliBuilder
import org.bson.Document
import org.bson.BsonSerializationException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.CommandLineRunner
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
import org.springframework.boot.builder.SpringApplicationBuilder
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper
import org.springframework.data.mongodb.core.convert.MappingMongoConverter
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument

import java.util.regex.Pattern
import java.util.stream.Collectors

import static org.springframework.data.mongodb.core.query.Criteria.where

// Command-line definition: all four options are required and take one value.
def cli = new CliBuilder()
cli.envPropertiesFile(args: 1, "Properties file with db details to use for update", required: true)
cli.dbName(args: 1, "Database name that needs to be updated", required: true)
cli.annotationRemediationInputFile(args: 1, "Path to CSV file with old_variant_id,new_variant_id,insdc_contig for annotation remediation", required: true)
cli.notRemediatedVariantsFilePath(args: 1, "Path to file where the variants which are not remediated will be stored", required: true)
def options = cli.parse(args)
if (!options) {
    // parse() returns null when a required option is missing/malformed.
    cli.usage()
    System.exit(1)
}

// Boot the Spring application, injecting the Mongo connection details from the
// properties file and the target database name; the two file paths are passed
// through to CommandLineRunner.run() as plain arguments.
new SpringApplicationBuilder(RemediateAnnotationsApplication.class).properties([
        'spring.config.location'      : options.envPropertiesFile,
        'spring.data.mongodb.database': options.dbName])
        .run(options.annotationRemediationInputFile, options.notRemediatedVariantsFilePath)


@SpringBootApplication(exclude = [DataSourceAutoConfiguration.class])
class RemediateAnnotationsApplication implements CommandLineRunner {
    static Logger logger = LoggerFactory.getLogger(RemediateAnnotationsApplication.class)
    // Number of input CSV lines accumulated before each DB round trip.
    private static final int BATCH_SIZE = 1000
    public static final String VARIANTS_COLLECTION = "variants_2_0"
    public static final String ANNOTATIONS_COLLECTION = "annotations_2_0"
    @Autowired
    MongoTemplate mongoTemplate
    MappingMongoConverter converter
    String notRemediatedVariantsFilePath
    BufferedWriter notRemediatedWriter

    /**
     * Entry point invoked by Spring Boot.
     *
     * @param args args[0] = path to the CSV of old_variant_id,new_variant_id,insdc_contig;
     *             args[1] = path of the file where non-remediated entries are appended.
     */
    @Override
    void run(String... args) throws Exception {
        String annotationRemediationInputFile = args[0]
        this.notRemediatedVariantsFilePath = args[1]

        // workaround to not save _class field in documents
        converter = mongoTemplate.getConverter()
        converter.setTypeMapper(new DefaultMongoTypeMapper(null))

        File inputFile = new File(annotationRemediationInputFile)
        if (!inputFile.exists()) {
            logger.error("Annotation remediation input file does not exist: {}", annotationRemediationInputFile)
            System.exit(1)
        }

        // Append mode: reruns of the same chunk accumulate into one file.
        notRemediatedWriter = new BufferedWriter(new FileWriter(notRemediatedVariantsFilePath, true))
        // Stream through the file line by line, accumulating into a batch.
        // Once the batch reaches BATCH_SIZE, process and clear it before reading further.
        try {
            List<String[]> batch = new ArrayList<>()
            int lineNumber = 0
            int skippedLines = 0
            int totalProcessed = 0

            inputFile.withReader { reader ->
                String line
                while ((line = reader.readLine()) != null) {
                    lineNumber++
                    String trimmed = line.trim()
                    if (!trimmed) {
                        continue
                    }

                    // -1 keeps trailing empty fields so the emptiness checks below see them.
                    String[] parts = trimmed.split(",", -1)
                    if (parts.length < 3 || !parts[0].trim() || !parts[1].trim() || !parts[2].trim()) {
                        logger.warn("Skipping malformed line {} in input file: '{}'", lineNumber, trimmed)
                        skippedLines++
                        continue
                    }

                    batch.add([parts[0].trim(), parts[1].trim(), parts[2].trim()] as String[])

                    if (batch.size() >= BATCH_SIZE) {
                        processBatch(batch)
                        totalProcessed += batch.size()
                        logger.info("Total entries processed so far: {}", totalProcessed)
                        batch.clear()
                    }
                }
            }

            // Flush the final partial batch.
            if (!batch.isEmpty()) {
                processBatch(batch)
                totalProcessed += batch.size()
                batch.clear()
            }

            logger.info("Annotation remediation complete. Total entries processed: {}, malformed lines skipped: {}",
                    totalProcessed, skippedLines)
        } finally {
            notRemediatedWriter.close()
        }

        System.exit(0)
    }

    /**
     * Verifies one batch of (oldId, newId, insdcContig) entries against the variants
     * collection, then remediates the annotations of the entries that passed.
     * Entries whose new variant id is absent from the DB are logged and written
     * to the not-remediated file instead.
     */
    void processBatch(List<String[]> batch) {
        // keyed by old variant id
        Map<String, String> orgIdNewIdMap = new LinkedHashMap<>()   // oldId -> newId
        Map<String, String> orgIdInsdcChrMap = new LinkedHashMap<>() // oldId -> insdcContig

        for (String[] entry : batch) {
            orgIdNewIdMap.put(entry[0], entry[1])
            orgIdInsdcChrMap.put(entry[0], entry[2])
        }

        // Fetch all new variants from DB in one query for the whole batch
        Map<String, VariantDocument> variantsInDBMap = getVariantsByIds(new ArrayList<>(orgIdNewIdMap.values()))

        // Verify each entry: the new variant must exist in the DB.
        // Failures are logged and written to the not-remediated file; only verified entries proceed.
        Map<String, String> verifiedOrgIdNewIdMap = new LinkedHashMap<>()

        for (Map.Entry<String, String> entry : orgIdNewIdMap.entrySet()) {
            String oldVariantId = entry.getKey()
            String newVariantId = entry.getValue()

            VariantDocument variantInDB = variantsInDBMap.get(newVariantId)

            if (variantInDB == null) {
                logger.error("Variant check failed: new variant id {} not found in DB (old id: {}). " +
                        "Skipping annotation remediation for this variant.", newVariantId, oldVariantId)
                storeNotRemediatedVariant(oldVariantId, newVariantId, "New variant id not found in DB")
                continue
            }

            verifiedOrgIdNewIdMap.put(oldVariantId, newVariantId)
        }

        logger.info("Batch: {} entries passed variant check, {} skipped",
                verifiedOrgIdNewIdMap.size(), orgIdNewIdMap.size() - verifiedOrgIdNewIdMap.size())

        if (!verifiedOrgIdNewIdMap.isEmpty()) {
            logger.info("Remediate Annotations for Ids: " + verifiedOrgIdNewIdMap.keySet())
            remediateAnnotations(verifiedOrgIdNewIdMap, orgIdInsdcChrMap)
        }
    }

    /**
     * Loads the given variant documents by _id in a single query.
     *
     * @return map of variant _id to its document (missing ids are simply absent).
     */
    Map<String, VariantDocument> getVariantsByIds(List<String> ids) {
        Query idQuery = new Query(where("_id").in(ids))
        List<VariantDocument> variants = mongoTemplate.find(idQuery, VariantDocument.class, VARIANTS_COLLECTION)
        return variants.stream().collect(Collectors.toMap(v -> v.getId(), v -> v))
    }

    /** Appends one "oldId,newId,reason" line to the not-remediated file. */
    void storeNotRemediatedVariant(String oldVariantId, String newVariantId, String reason) {
        try {
            notRemediatedWriter.write("${oldVariantId},${newVariantId},${reason}\n")
            notRemediatedWriter.flush()
        } catch (IOException e) {
            logger.error("Error writing to not-remediated variants file for old id {}: {}", oldVariantId, e.getMessage())
        }
    }

    /**
     * Renames all annotations of each old variant id to the corresponding new
     * variant id: inserts a copy under the new _id (with "chr" set to the INSDC
     * contig) unless one already exists, then deletes the old annotation.
     * idProcessed guards against inserting the same new _id twice when several
     * old ids map to the same new id within one batch.
     */
    void remediateAnnotations(Map<String, String> orgIdNewIdMap, Map<String, String> orgIdInsdcChrMap) {
        Set<String> idProcessed = new HashSet<>()

        // fetch annotations for all old variant ids
        // (annotation _id = variant _id + "_<cacheVersion>_<vepVersion>", hence the "_\d" anchor)
        List<Criteria> oldIdCriteria = new ArrayList<>()
        for (String oldId : orgIdNewIdMap.keySet()) {
            oldIdCriteria.add(Criteria.where("_id").regex("^" + Pattern.quote(oldId) + "_\\d"))
        }

        // fetch annotations for all new variant ids
        List<Criteria> newIdCriteria = new ArrayList<>()
        for (String newId : orgIdNewIdMap.values()) {
            newIdCriteria.add(Criteria.where("_id").regex("^" + Pattern.quote(newId) + "_\\d"))
        }

        try {
            List<Document> oldAnnotationsList = mongoTemplate.getCollection(ANNOTATIONS_COLLECTION)
                    .find(new Query(new Criteria().orOperator(oldIdCriteria.toArray(new Criteria[0]))).getQueryObject())
                    .into(new ArrayList<>())
            Map<String, Set<Document>> oldVariantIdToDocuments = new HashMap<>()
            for (Document doc : oldAnnotationsList) {
                String variantId = extractVariantIdFromAnnotationId(doc.getString("_id"))
                if (variantId != null) {
                    oldVariantIdToDocuments.computeIfAbsent(variantId, k -> new HashSet<>()).add(doc)
                } else {
                    logger.error("Could not get variantId from the annotation: " + doc)
                }
            }

            List<Document> newAnnotationsList = mongoTemplate.getCollection(ANNOTATIONS_COLLECTION)
                    .find(new Query(new Criteria().orOperator(newIdCriteria.toArray(new Criteria[0]))).getQueryObject())
                    .into(new ArrayList<>())
            Map<String, Set<String>> newVariantIdToAnnotationIds = new HashMap<>()
            for (Document doc : newAnnotationsList) {
                String variantId = extractVariantIdFromAnnotationId(doc.getString("_id"))
                if (variantId != null) {
                    newVariantIdToAnnotationIds.computeIfAbsent(variantId, k -> new HashSet<>()).add(doc.getString("_id"))
                } else {
                    logger.error("Could not get variantId from the annotation: " + doc)
                }
            }

            // Process each old->new id pair
            for (Map.Entry<String, String> entry : orgIdNewIdMap.entrySet()) {
                String orgVariantId = entry.getKey()
                String newVariantId = entry.getValue()

                Set<Document> orgAnnotationsSet = oldVariantIdToDocuments.getOrDefault(orgVariantId, Collections.emptySet())
                Set<String> existingNewAnnotationIds = newVariantIdToAnnotationIds.getOrDefault(newVariantId, Collections.emptySet())

                if (orgAnnotationsSet.isEmpty()) {
                    logger.info("No annotations found for old variant id {}, nothing to remediate", orgVariantId)
                    continue
                }

                List<Document> toInsert = new ArrayList<>()
                List<String> toDelete = new ArrayList<>()

                for (Document annotation : orgAnnotationsSet) {
                    try {
                        String orgAnnotationId = annotation.getString("_id")
                        // Replace only the leading variant-id prefix (guaranteed by the
                        // "^oldId_\d" query above). String.replace would substitute EVERY
                        // occurrence of orgVariantId inside the annotation id.
                        String updatedAnnotationId = newVariantId + orgAnnotationId.substring(orgVariantId.length())

                        if (!existingNewAnnotationIds.contains(updatedAnnotationId)) {
                            if (!idProcessed.contains(updatedAnnotationId)) {
                                Document updated = new Document(annotation)
                                updated.put("_id", updatedAnnotationId)
                                updated.put("chr", orgIdInsdcChrMap.get(orgVariantId))
                                toInsert.add(updated)
                                idProcessed.add(updatedAnnotationId)
                            }
                        }

                        toDelete.add(orgAnnotationId)
                    } catch (Exception e) {
                        logger.error("Error processing annotation for original variant id {}: {}",
                                orgVariantId, e.getMessage(), e)
                    }
                }

                if (!toDelete.isEmpty()) {
                    mongoTemplate.remove(Query.query(Criteria.where("_id").in(toDelete)), ANNOTATIONS_COLLECTION)
                }
                if (!toInsert.isEmpty()) {
                    mongoTemplate.getCollection(ANNOTATIONS_COLLECTION).insertMany(toInsert)
                }

                logger.info("Annotation remediation for old variant id {}: {} inserted, {} deleted",
                        orgVariantId, toInsert.size(), toDelete.size())
            }
        } catch (BsonSerializationException ex) {
            logger.error("Exception occurred while trying to update annotations: {}", ex.getMessage(), ex)
        }
    }

    /**
     * Strips the trailing "_<cacheVersion>_<vepVersion>" suffix from an annotation
     * _id, leaving the variant _id.
     *
     * Uses [^_] instead of \w for the two suffix fields: \w includes '_', so the
     * previous pattern /_\w+_\w+$/ matched from the FIRST underscore and collapsed
     * ids whose variant part itself contains underscores (e.g. "chr_100_A_T_82_82"
     * became "chr" instead of "chr_100_A_T"), which made the old-annotation lookup
     * never match and silently skipped remediation.
     */
    static String extractVariantIdFromAnnotationId(String annotationId) {
        return annotationId.replaceFirst(/_[^_]+_[^_]+$/, "")
    }
}
4 changes: 2 additions & 2 deletions tasks/eva_2326/src/test/resources/application-test.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.username=eva_pipeline_mongo_user
spring.data.mongodb.password=eva_pipeline_mongo_pass
spring.data.mongodb.username=mongo_user
spring.data.mongodb.password=mongo_pass
spring.data.mongodb.authentication-database=admin
mongodb.read-preference=secondaryPreferred
spring.data.mongodb.authentication-mechanism=SCRAM-SHA-1
Expand Down