Skip to content

Commit b1d542b

Browse files
authored
EVA-3933 Add script to read variant ids from a file and do the annotation remediatioin (#206)
1 parent a656d8e commit b1d542b

File tree

3 files changed

+362
-2
lines changed

3 files changed

+362
-2
lines changed
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
nextflow.enable.dsl=2
2+
3+
params.input_file = null
4+
params.chunk_size = 1_000_000
5+
params.working_dir = null
6+
params.env_properties_file = null
7+
params.db_name = null
8+
params.max_parallel_chunks = 5
9+
params.project_dir = null
10+
params.groovy_script = null
11+
12+
// ── PROCESS 1: split into named chunks ───────────────────────────────────────
13+
process SPLIT_INPUT {
14+
output:
15+
path "chunk_*.txt", emit: chunks
16+
17+
script:
18+
"""
19+
awk -v chunk_size=${params.chunk_size} '
20+
{
21+
line_num = NR
22+
if ((line_num - 1) % chunk_size == 0) {
23+
start = line_num
24+
end = start + chunk_size - 1
25+
# zero-pad to 10 digits so filenames sort correctly
26+
filename = sprintf("chunk_%010d-%010d.txt", start, end)
27+
}
28+
print > filename
29+
}
30+
' ${params.input_file}
31+
"""
32+
}
33+
34+
// ── PROCESS 2: remediate one chunk ───────────────────────────────────────────
35+
process REMEDIATE_CHUNK {
36+
label 'long_time'
37+
label 'med_mem'
38+
maxForks params.max_parallel_chunks
39+
40+
tag { chunk_file.name }
41+
42+
input:
43+
path chunk_file
44+
45+
output:
46+
path "${chunk_file.baseName}.done", emit: done_flag
47+
48+
script:
49+
"""
50+
CHUNK_NAME=\$(basename ${chunk_file} .txt)
51+
NOT_REMEDIATED_FILE=${params.working_dir}/variants_not_remediated/${params.db_name}_\$CHUNK_NAME.txt
52+
mkdir -p \$(dirname "\$NOT_REMEDIATED_FILE")
53+
54+
bash run_groovy_script.sh \
55+
${params.project_dir} \
56+
${params.groovy_script} \
57+
-envPropertiesFile=${params.env_properties_file} \
58+
-dbName=${params.db_name} \
59+
-annotationRemediationInputFile=\$(realpath ${chunk_file}) \
60+
-notRemediatedVariantsFilePath="\$NOT_REMEDIATED_FILE" \
61+
> ${params.working_dir}/\$CHUNK_NAME.log 2>&1
62+
63+
echo "Done: ${chunk_file.name}" > ${chunk_file.baseName}.done
64+
"""
65+
}
66+
67+
// ── WORKFLOW ──────────────────────────────────────────────────────────────────
68+
workflow {
69+
if (!params.input_file) error "Please provide --input_file"
70+
if (!params.env_properties_file) error "Please provide --env_properties_file"
71+
if (!params.db_name) error "Please provide --db_name"
72+
if (!params.working_dir) error "Please provide --working_dir"
73+
if (!params.project_dir) error "Please provide --project_dir"
74+
if (!params.groovy_script) error "Please provide --groovy_script"
75+
76+
SPLIT_INPUT()
77+
78+
chunks_ch = SPLIT_INPUT.out.chunks.flatten()
79+
80+
REMEDIATE_CHUNK(chunks_ch)
81+
82+
REMEDIATE_CHUNK.out.done_flag
83+
.collect()
84+
.view { flags -> "All chunks complete: ${flags.size()} chunks processed" }
85+
}
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
package eva2326
2+
3+
import groovy.cli.picocli.CliBuilder
4+
import org.bson.Document
5+
import org.bson.BsonSerializationException
6+
import org.slf4j.Logger
7+
import org.slf4j.LoggerFactory
8+
import org.springframework.beans.factory.annotation.Autowired
9+
import org.springframework.boot.CommandLineRunner
10+
import org.springframework.boot.autoconfigure.SpringBootApplication
11+
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
12+
import org.springframework.boot.builder.SpringApplicationBuilder
13+
import org.springframework.data.mongodb.core.MongoTemplate
14+
import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper
15+
import org.springframework.data.mongodb.core.convert.MappingMongoConverter
16+
import org.springframework.data.mongodb.core.query.Criteria
17+
import org.springframework.data.mongodb.core.query.Query
18+
import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument
19+
20+
import java.util.regex.Pattern
21+
import java.util.stream.Collectors
22+
23+
import static org.springframework.data.mongodb.core.query.Criteria.where
24+
25+
def cli = new CliBuilder()
26+
cli.envPropertiesFile(args: 1, "Properties file with db details to use for update", required: true)
27+
cli.dbName(args: 1, "Database name that needs to be updated", required: true)
28+
cli.annotationRemediationInputFile(args: 1, "Path to CSV file with old_variant_id,new_variant_id,insdc_contig for annotation remediation", required: true)
29+
cli.notRemediatedVariantsFilePath(args: 1, "Path to file where the variants which are not remediated will be stored", required: true)
30+
def options = cli.parse(args)
31+
if (!options) {
32+
cli.usage()
33+
System.exit(1)
34+
}
35+
36+
new SpringApplicationBuilder(RemediateAnnotationsApplication.class).properties([
37+
'spring.config.location' : options.envPropertiesFile,
38+
'spring.data.mongodb.database': options.dbName])
39+
.run(options.annotationRemediationInputFile, options.notRemediatedVariantsFilePath)
40+
41+
42+
@SpringBootApplication(exclude = [DataSourceAutoConfiguration.class])
43+
class RemediateAnnotationsApplication implements CommandLineRunner {
44+
static Logger logger = LoggerFactory.getLogger(RemediateAnnotationsApplication.class)
45+
private static int BATCH_SIZE = 1000
46+
public static final String VARIANTS_COLLECTION = "variants_2_0"
47+
public static final String ANNOTATIONS_COLLECTION = "annotations_2_0"
48+
@Autowired
49+
MongoTemplate mongoTemplate
50+
MappingMongoConverter converter
51+
String notRemediatedVariantsFilePath
52+
BufferedWriter notRemediatedWriter
53+
54+
@Override
55+
void run(String... args) throws Exception {
56+
String annotationRemediationInputFile = args[0]
57+
this.notRemediatedVariantsFilePath = args[1]
58+
59+
// workaround to not save _class field in documents
60+
converter = mongoTemplate.getConverter()
61+
converter.setTypeMapper(new DefaultMongoTypeMapper(null))
62+
63+
File inputFile = new File(annotationRemediationInputFile)
64+
if (!inputFile.exists()) {
65+
logger.error("Annotation remediation input file does not exist: {}", annotationRemediationInputFile)
66+
System.exit(1)
67+
}
68+
69+
notRemediatedWriter = new BufferedWriter(new FileWriter(notRemediatedVariantsFilePath, true))
70+
// Stream through the file line by line, accumulating into a batch.
71+
// Once the batch reaches BATCH_SIZE, process and clear it before reading further.
72+
try {
73+
List<String[]> batch = new ArrayList<>()
74+
int lineNumber = 0
75+
int skippedLines = 0
76+
int totalProcessed = 0
77+
78+
inputFile.withReader { reader ->
79+
String line
80+
while ((line = reader.readLine()) != null) {
81+
lineNumber++
82+
String trimmed = line.trim()
83+
if (!trimmed) {
84+
continue
85+
}
86+
87+
String[] parts = trimmed.split(",", -1)
88+
if (parts.length < 3 || !parts[0].trim() || !parts[1].trim() || !parts[2].trim()) {
89+
logger.warn("Skipping malformed line {} in input file: '{}'", lineNumber, trimmed)
90+
skippedLines++
91+
continue
92+
}
93+
94+
batch.add([parts[0].trim(), parts[1].trim(), parts[2].trim()] as String[])
95+
96+
if (batch.size() >= RemediateAnnotationsApplication.BATCH_SIZE) {
97+
processBatch(batch)
98+
totalProcessed += batch.size()
99+
logger.info("Total entries processed so far: {}", totalProcessed)
100+
batch.clear()
101+
}
102+
}
103+
}
104+
105+
if (!batch.isEmpty()) {
106+
processBatch(batch)
107+
totalProcessed += batch.size()
108+
batch.clear()
109+
}
110+
111+
logger.info("Annotation remediation complete. Total entries processed: {}, malformed lines skipped: {}",
112+
totalProcessed, skippedLines)
113+
} finally {
114+
notRemediatedWriter.close()
115+
}
116+
117+
System.exit(0)
118+
}
119+
120+
void processBatch(List<String[]> batch) {
121+
// keyed by old variant id
122+
Map<String, String> orgIdNewIdMap = new LinkedHashMap<>() // oldId -> newId
123+
Map<String, String> orgIdInsdcChrMap = new LinkedHashMap<>() // oldId -> insdcContig
124+
125+
for (String[] entry : batch) {
126+
orgIdNewIdMap.put(entry[0], entry[1])
127+
orgIdInsdcChrMap.put(entry[0], entry[2])
128+
}
129+
130+
// Fetch all new variants from DB in one query for the whole batch
131+
Map<String, VariantDocument> variantsInDBMap = getVariantsByIds(new ArrayList<>(orgIdNewIdMap.values()))
132+
133+
// Verify each entry: new variant must exist in DB with the expected INSDC contig.
134+
// Failures are logged and written to the not-remediated file; only verified entries proceed.
135+
Map<String, String> verifiedOrgIdNewIdMap = new LinkedHashMap<>()
136+
137+
for (Map.Entry<String, String> entry : orgIdNewIdMap.entrySet()) {
138+
String oldVariantId = entry.getKey()
139+
String newVariantId = entry.getValue()
140+
141+
VariantDocument variantInDB = variantsInDBMap.get(newVariantId)
142+
143+
if (variantInDB == null) {
144+
logger.error("Variant check failed: new variant id {} not found in DB (old id: {}). " +
145+
"Skipping annotation remediation for this variant.", newVariantId, oldVariantId)
146+
storeNotRemediatedVariant(oldVariantId, newVariantId, "New variant id not found in DB")
147+
continue
148+
}
149+
150+
verifiedOrgIdNewIdMap.put(oldVariantId, newVariantId)
151+
}
152+
153+
logger.info("Batch: {} entries passed variant check, {} skipped",
154+
verifiedOrgIdNewIdMap.size(), orgIdNewIdMap.size() - verifiedOrgIdNewIdMap.size())
155+
156+
if (!verifiedOrgIdNewIdMap.isEmpty()) {
157+
logger.info("Remediate Annotations for Ids: " + verifiedOrgIdNewIdMap.keySet())
158+
remediateAnnotations(verifiedOrgIdNewIdMap, orgIdInsdcChrMap)
159+
}
160+
}
161+
162+
Map<String, VariantDocument> getVariantsByIds(List<String> ids) {
163+
Query idQuery = new Query(where("_id").in(ids))
164+
List<VariantDocument> variants = mongoTemplate.find(idQuery, VariantDocument.class, VARIANTS_COLLECTION)
165+
return variants.stream().collect(Collectors.toMap(v -> v.getId(), v -> v))
166+
}
167+
168+
void storeNotRemediatedVariant(String oldVariantId, String newVariantId, String reason) {
169+
try {
170+
notRemediatedWriter.write("${oldVariantId},${newVariantId},${reason}\n")
171+
notRemediatedWriter.flush()
172+
} catch (IOException e) {
173+
logger.error("Error writing to not-remediated variants file for old id {}: {}", oldVariantId, e.getMessage())
174+
}
175+
}
176+
177+
void remediateAnnotations(Map<String, String> orgIdNewIdMap, Map<String, String> orgIdInsdcChrMap) {
178+
Set<String> idProcessed = new HashSet<>()
179+
180+
// fetch annotations for all old variant ids
181+
List<Criteria> oldIdCriteria = new ArrayList<>()
182+
for (String oldId : orgIdNewIdMap.keySet()) {
183+
oldIdCriteria.add(Criteria.where("_id").regex("^" + Pattern.quote(oldId) + "_\\d"))
184+
}
185+
186+
// fetch annotations for all new variant ids
187+
List<Criteria> newIdCriteria = new ArrayList<>()
188+
for (String newId : orgIdNewIdMap.values()) {
189+
newIdCriteria.add(Criteria.where("_id").regex("^" + Pattern.quote(newId) + "_\\d"))
190+
}
191+
192+
try {
193+
List<Document> oldAnnotationsList = mongoTemplate.getCollection(ANNOTATIONS_COLLECTION)
194+
.find(new Query(new Criteria().orOperator(oldIdCriteria.toArray(new Criteria[0]))).getQueryObject())
195+
.into(new ArrayList<>())
196+
Map<String, Set<Document>> oldVariantIdToDocuments = new HashMap<>()
197+
for (Document doc : oldAnnotationsList) {
198+
String variantId = extractVariantIdFromAnnotationId(doc.getString("_id"))
199+
if (variantId != null) {
200+
oldVariantIdToDocuments.computeIfAbsent(variantId, k -> new HashSet<>()).add(doc)
201+
}else{
202+
logger.error("Could not get variantId from the annotation: " + doc)
203+
}
204+
}
205+
206+
List<Document> newAnnotationsList = mongoTemplate.getCollection(ANNOTATIONS_COLLECTION)
207+
.find(new Query(new Criteria().orOperator(newIdCriteria.toArray(new Criteria[0]))).getQueryObject())
208+
.into(new ArrayList<>())
209+
Map<String, Set<String>> newVariantIdToAnnotationIds = new HashMap<>()
210+
for (Document doc : newAnnotationsList) {
211+
String variantId = extractVariantIdFromAnnotationId(doc.getString("_id"))
212+
if (variantId != null) {
213+
newVariantIdToAnnotationIds.computeIfAbsent(variantId, k -> new HashSet<>()).add(doc.getString("_id"))
214+
}else{
215+
logger.error("Could not get variantId from the annotation: " + doc)
216+
}
217+
}
218+
219+
// Process each old->new id pair
220+
for (Map.Entry<String, String> entry : orgIdNewIdMap.entrySet()) {
221+
String orgVariantId = entry.getKey()
222+
String newVariantId = entry.getValue()
223+
224+
Set<Document> orgAnnotationsSet = oldVariantIdToDocuments.getOrDefault(orgVariantId, Collections.emptySet())
225+
Set<String> existingNewAnnotationIds = newVariantIdToAnnotationIds.getOrDefault(newVariantId, Collections.emptySet())
226+
227+
if (orgAnnotationsSet.isEmpty()) {
228+
logger.info("No annotations found for old variant id {}, nothing to remediate", orgVariantId)
229+
continue
230+
}
231+
232+
List<Document> toInsert = new ArrayList<>()
233+
List<String> toDelete = new ArrayList<>()
234+
235+
for (Document annotation : orgAnnotationsSet) {
236+
try {
237+
String orgAnnotationId = annotation.getString("_id")
238+
String updatedAnnotationId = orgAnnotationId.replace(orgVariantId, newVariantId)
239+
240+
if (!existingNewAnnotationIds.contains(updatedAnnotationId)) {
241+
if (!idProcessed.contains(updatedAnnotationId)) {
242+
Document updated = new Document(annotation)
243+
updated.put("_id", updatedAnnotationId)
244+
updated.put("chr", orgIdInsdcChrMap.get(orgVariantId))
245+
toInsert.add(updated)
246+
idProcessed.add(updatedAnnotationId)
247+
}
248+
}
249+
250+
toDelete.add(orgAnnotationId)
251+
} catch (Exception e) {
252+
logger.error("Error processing annotation for original variant id {}: {}",
253+
orgVariantId, e.getMessage(), e)
254+
}
255+
}
256+
257+
if (!toDelete.isEmpty()) {
258+
mongoTemplate.remove(Query.query(Criteria.where("_id").in(toDelete)), ANNOTATIONS_COLLECTION)
259+
}
260+
if (!toInsert.isEmpty()) {
261+
mongoTemplate.getCollection(ANNOTATIONS_COLLECTION).insertMany(toInsert)
262+
}
263+
264+
logger.info("Annotation remediation for old variant id {}: {} inserted, {} deleted",
265+
orgVariantId, toInsert.size(), toDelete.size())
266+
}
267+
} catch (BsonSerializationException ex) {
268+
logger.error("Exception occurred while trying to update annotations: {}", ex.getMessage(), ex)
269+
}
270+
}
271+
272+
static String extractVariantIdFromAnnotationId(String annotationId) {
273+
return annotationId.replaceFirst(/_\w+_\w+$/, "")
274+
}
275+
}

tasks/eva_2326/src/test/resources/application-test.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
spring.data.mongodb.host=localhost
22
spring.data.mongodb.port=27017
3-
spring.data.mongodb.username=eva_pipeline_mongo_user
4-
spring.data.mongodb.password=eva_pipeline_mongo_pass
3+
spring.data.mongodb.username=mongo_user
4+
spring.data.mongodb.password=mongo_pass
55
spring.data.mongodb.authentication-database=admin
66
mongodb.read-preference=secondaryPreferred
77
spring.data.mongodb.authentication-mechanism=SCRAM-SHA-1

0 commit comments

Comments
 (0)