Skip to content

Commit

Permalink
feat(website, backend, prepro)!: Update error schema with input and output metadata field names (#3348)

Browse files Browse the repository at this point in the history

* Update specification.md

* Update nextclade prepro pipeline to include input and output metadata fields

* add DB migration

* update dummy pipeline to include input and output metadata fields

---------

Co-authored-by: Theo Sanderson <[email protected]>
  • Loading branch information
anna-parker and theosanderson authored Dec 11, 2024
1 parent 9323a18 commit 3c6510b
Show file tree
Hide file tree
Showing 19 changed files with 590 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ class InsertionDeserializer : JsonDeserializer<Insertion>() {
}

/**
 * An error or warning produced by the preprocessing pipeline for a sequence entry.
 *
 * Replaces the former single `source` list with two lists: the input (submitted)
 * fields and the pipeline output (processed) fields involved in the issue, per
 * the updated error schema. The stale pre-change `source` line left over from the
 * diff rendering is removed here — keeping it would duplicate the new fields.
 */
data class PreprocessingAnnotation(
val unprocessedFields: List<PreprocessingAnnotationSource>,
val processedFields: List<PreprocessingAnnotationSource>,
@Schema(description = "A descriptive message that helps the submitter to fix the issue") val message: String,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
-- Migration: rewrite legacy preprocessing warnings from the old shape
--   {"source": [...], "message": "..."}
-- to the new shape
--   {"unprocessedFields": [...], "processedFields": [...], "message": "..."}.
-- The legacy format did not distinguish input from output fields, so BOTH new
-- arrays are populated from the same old `source` entries.
update sequence_entries_preprocessed_data
set warnings = (
-- Rebuild the whole warnings array, one object per legacy warning element.
select jsonb_agg(
jsonb_build_object(
'unprocessedFields', (
-- Copy each {name, type} pair out of the legacy `source` array.
select jsonb_agg(
jsonb_build_object(
'name', source->>'name',
'type', source->>'type'
)
)
from jsonb_array_elements(warning->'source') as source
),
'processedFields', (
-- Same `source` entries again: legacy data cannot tell input from output.
select jsonb_agg(
jsonb_build_object(
'name', source->>'name',
'type', source->>'type'
)
)
from jsonb_array_elements(warning->'source') as source
),
-- The message is carried over unchanged.
'message', warning->>'message'
)
)
from jsonb_array_elements(warnings) as warning
)
-- Only touch rows that still contain at least one legacy-format warning.
-- NOTE(review): every element of a qualifying row is rewritten, including any
-- element already lacking `source` (it would get null field arrays); rows are
-- assumed to be uniformly legacy-format at migration time — confirm.
where warnings is not null
and exists (
select 1
from jsonb_array_elements(warnings) as warning
where warning->'source' is not null
);

-- Migration: rewrite legacy preprocessing errors from the old shape
--   {"source": [...], "message": "..."}
-- to the new shape
--   {"unprocessedFields": [...], "processedFields": [...], "message": "..."}.
-- Mirrors the warnings migration above in structure; both new arrays are
-- populated from the same legacy `source` entries.
update sequence_entries_preprocessed_data
set errors = (
-- Rebuild the whole errors array, one object per legacy error element.
select jsonb_agg(
jsonb_build_object(
'unprocessedFields', (
-- Copy each {name, type} pair out of the legacy `source` array.
select jsonb_agg(
jsonb_build_object(
'name', source->>'name',
'type', source->>'type'
)
)
from jsonb_array_elements(error->'source') as source
),
'processedFields', (
-- Same `source` entries again: legacy data cannot tell input from output.
select jsonb_agg(
jsonb_build_object(
'name', source->>'name',
'type', source->>'type'
)
)
from jsonb_array_elements(error->'source') as source
),
-- The message is carried over unchanged.
'message', error->>'message'
)
)
from jsonb_array_elements(errors) as error
)
-- Only touch rows that still contain at least one legacy-format error.
-- NOTE(review): elements already lacking `source` in a qualifying row would be
-- rewritten with null field arrays; assumed not to occur at migration time.
where errors is not null
and exists (
select 1
from jsonb_array_elements(errors) as error
where error->'source' is not null
);
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,13 @@ object PreparedProcessedData {
accession = accession,
errors = listOf(
PreprocessingAnnotation(
source = listOf(
unprocessedFields = listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.Metadata,
"host",
),
),
processedFields = listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.Metadata,
"host",
Expand All @@ -405,7 +411,13 @@ object PreparedProcessedData {
"Not this kind of host",
),
PreprocessingAnnotation(
source = listOf(
unprocessedFields = listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.NucleotideSequence,
MAIN_SEGMENT,
),
),
processedFields = listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.NucleotideSequence,
MAIN_SEGMENT,
Expand All @@ -420,7 +432,13 @@ object PreparedProcessedData {
accession = accession,
warnings = listOf(
PreprocessingAnnotation(
source = listOf(
unprocessedFields = listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.Metadata,
"host",
),
),
processedFields = listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.Metadata,
"host",
Expand All @@ -429,7 +447,13 @@ object PreparedProcessedData {
"Not this kind of host",
),
PreprocessingAnnotation(
source = listOf(
unprocessedFields = listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.NucleotideSequence,
MAIN_SEGMENT,
),
),
processedFields = listOf(
PreprocessingAnnotationSource(
PreprocessingAnnotationSourceType.NucleotideSequence,
MAIN_SEGMENT,
Expand Down
50 changes: 32 additions & 18 deletions preprocessing/dummy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import random
import time
from dataclasses import dataclass, field
from typing import List, Optional

import requests

Expand Down Expand Up @@ -72,7 +71,8 @@ class AnnotationSource:

@dataclass
class ProcessingAnnotation:
    """An error or warning the dummy pipeline attaches to one sequence entry.

    Field names are camelCase (hence the noqa) to match the backend JSON
    schema, which distinguishes input (unprocessed) from output (processed)
    metadata fields.
    """

    # Input (submitted) fields that triggered the annotation.
    unprocessedFields: list[AnnotationSource]  # noqa: N815
    # Output (pipeline-produced) fields affected by the annotation.
    processedFields: list[AnnotationSource]  # noqa: N815
    # Human-readable description that helps the submitter fix the issue.
    message: str


Expand All @@ -81,13 +81,11 @@ class Sequence:
accession: int
version: int
data: dict
errors: Optional[List[ProcessingAnnotation]] = field(default_factory=list[ProcessingAnnotation])
warnings: Optional[List[ProcessingAnnotation]] = field(
default_factory=list[ProcessingAnnotation]
)
errors: list[ProcessingAnnotation] = field(default_factory=list)
warnings: list[ProcessingAnnotation] = field(default_factory=list)


def fetch_unprocessed_sequences(etag: str | None, n: int) -> tuple[str | None, List[Sequence]]:
def fetch_unprocessed_sequences(etag: str | None, n: int) -> tuple[str | None, list[Sequence]]:
url = backendHost + "/extract-unprocessed-data"
params = {"numberOfSequenceEntries": n, "pipelineVersion": pipeline_version}
headers = {
Expand All @@ -111,7 +109,7 @@ def fetch_unprocessed_sequences(etag: str | None, n: int) -> tuple[str | None, L
)


def parse_ndjson(ndjson_data: str) -> List[Sequence]:
def parse_ndjson(ndjson_data: str) -> list[Sequence]:
json_strings = ndjson_data.split("\n")
entries = []
for json_str in json_strings:
Expand All @@ -123,7 +121,7 @@ def parse_ndjson(ndjson_data: str) -> List[Sequence]:
return entries


def process(unprocessed: List[Sequence]) -> List[Sequence]:
def process(unprocessed: list[Sequence]) -> list[Sequence]:
with open("mock-sequences.json", "r") as f:
mock_sequences = json.load(f)
possible_lineages = ["A.1", "A.1.1", "A.2"]
Expand All @@ -143,35 +141,49 @@ def process(unprocessed: List[Sequence]) -> List[Sequence]:
if addErrors and not disable_randomly:
updated_sequence.errors = [
ProcessingAnnotation(
[AnnotationSource(list(metadata.keys())[0], "Metadata")],
"This is a metadata error",
unprocessedFields=[AnnotationSource(list(metadata.keys())[0], "Metadata")],
processedFields=[AnnotationSource(list(metadata.keys())[0], "Metadata")],
message="This is a metadata error",
),
ProcessingAnnotation(
[
unprocessedFields=[
AnnotationSource(
list(mock_sequences["alignedNucleotideSequences"].keys())[0],
"NucleotideSequence",
)
],
"This is a sequence error",
processedFields=[
AnnotationSource(
list(mock_sequences["alignedNucleotideSequences"].keys())[0],
"NucleotideSequence",
)
],
message="This is a sequence error",
),
]

disable_randomly = randomWarnError and random.choice([True, False])
if addWarnings and not disable_randomly:
updated_sequence.warnings = [
ProcessingAnnotation(
[AnnotationSource(list(metadata.keys())[0], "Metadata")],
"This is a metadata warning",
unprocessedFields=[AnnotationSource(list(metadata.keys())[0], "Metadata")],
processedFields=[AnnotationSource(list(metadata.keys())[0], "Metadata")],
message="This is a metadata warning",
),
ProcessingAnnotation(
[
unprocessedFields=[
AnnotationSource(
list(mock_sequences["alignedNucleotideSequences"].keys())[0],
"NucleotideSequence",
)
],
processedFields=[
AnnotationSource(
list(mock_sequences["alignedNucleotideSequences"].keys())[0],
"NucleotideSequence",
)
],
"This is a sequence warning",
message="This is a sequence warning",
),
]

Expand All @@ -180,9 +192,11 @@ def process(unprocessed: List[Sequence]) -> List[Sequence]:
return processed


def submit_processed_sequences(processed: List[Sequence]):
def submit_processed_sequences(processed: list[Sequence]):
logging.info(sequence for sequence in processed)
json_strings = [json.dumps(dataclasses.asdict(sequence)) for sequence in processed]
ndjson_string = "\n".join(json_strings)
logging.info(ndjson_string)
url = backendHost + "/submit-processed-data?pipelineVersion=" + str(pipeline_version)
headers = {"Content-Type": "application/x-ndjson", "Authorization": "Bearer " + get_jwt()}
response = requests.post(url, data=ndjson_string, headers=headers)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@ def __hash__(self):

@dataclass(frozen=True)
class ProcessingAnnotation:
    """Immutable error/warning annotation for one sequence entry.

    Frozen and explicitly hashable so annotations can be deduplicated in
    sets/dicts. Field names are camelCase (hence the noqa) to match the
    backend JSON schema. The stale pre-change `source` lines left over from
    the diff rendering are removed here: they duplicated the new fields and
    left an unreachable second ``return`` in ``__hash__``.
    """

    # Input (submitted) fields that triggered the annotation.
    unprocessedFields: tuple[AnnotationSource, ...]  # noqa: N815
    # Output (pipeline-produced) fields affected by the annotation.
    processedFields: tuple[AnnotationSource, ...]  # noqa: N815
    # Human-readable description that helps the submitter fix the issue.
    message: str

    def __post_init__(self):
        # Coerce any iterable the caller passed (typically a list) to a tuple
        # so the frozen instance is truly immutable and hashable; must go
        # through object.__setattr__ because the dataclass is frozen.
        object.__setattr__(self, "unprocessedFields", tuple(self.unprocessedFields))
        object.__setattr__(self, "processedFields", tuple(self.processedFields))

    def __hash__(self):
        # Hash over all fields, consistent with the generated __eq__.
        return hash((self.unprocessedFields, self.processedFields, self.message))


@dataclass
Expand Down
Loading

0 comments on commit 3c6510b

Please sign in to comment.