[DC-3865] Create CR to filter NLP table using domain table #1877

Open · wants to merge 2 commits into develop
8 changes: 8 additions & 0 deletions data_steward/cdr_cleaner/clean_cdr.py
@@ -30,6 +30,7 @@
 from cdr_cleaner.cleaning_rules.drop_multiple_measurements import DropMultipleMeasurements
 from cdr_cleaner.cleaning_rules.drop_participants_without_any_basics import DropParticipantsWithoutAnyBasics
 from cdr_cleaner.cleaning_rules.clean_survey_conduct_recurring_surveys import CleanSurveyConductRecurringSurveys
+from cdr_cleaner.cleaning_rules.filter_nlp_from_domains import FilterNLPfromDomains
 from cdr_cleaner.cleaning_rules.update_survey_source_concept_id import UpdateSurveySourceConceptId
 from cdr_cleaner.cleaning_rules.drop_unverified_survey_data import DropUnverifiedSurveyData
 from cdr_cleaner.cleaning_rules.drug_refills_days_supply import DrugRefillsDaysSupply
@@ -268,6 +269,7 @@
     ),  # Should run after GenerateExtTables and before CleanMappingExtTables
     (PopulateSurveyConductExt,),
     (CalculatePrimaryDeathRecord,),
+    (FilterNLPfromDomains,),
     (NoDataAfterDeath,),  # should run after CalculatePrimaryDeathRecord
     (CleanMappingExtTables,),  # should be one of the last cleaning rules run
 ]
@@ -317,6 +319,7 @@
     (DropOrphanedPIDS,),
     (CalculatePrimaryDeathRecord,),
     (GenerateWearStudyTable,),
+    (FilterNLPfromDomains,),
     (DropViaSurveyConduct,),  # should run after wear study table creation
     (CleanMappingExtTables,),  # should be one of the last cleaning rules run
 ]
@@ -329,6 +332,7 @@
     (CreatePersonExtTable,),
     (CalculatePrimaryDeathRecord,),
     (RTRetroactivePrivacyConceptSuppression,),
+    (FilterNLPfromDomains,),
     (CleanMappingExtTables,),  # should be one of the last cleaning rules run
 ]

@@ -345,6 +349,7 @@
     (CalculatePrimaryDeathRecord,),
     (NoDataAfterDeath,),  # should run after CalculatePrimaryDeathRecord
     (RTRetroactivePrivacyConceptSuppression,),
+    (FilterNLPfromDomains,),
     (CleanMappingExtTables,),  # should be one of the last cleaning rules run
 ]

@@ -392,6 +397,7 @@
     (DropViaSurveyConduct,),  # should run after wear study table creation
     (RemoveExtraTables,),  # Should be last cleaning rule to be run
     (CalculatePrimaryDeathRecord,),
+    (FilterNLPfromDomains,),
     (CleanMappingExtTables,),  # should be one of the last cleaning rules run
 ]

@@ -402,6 +408,7 @@
     (CreatePersonExtTable,),
     (CalculatePrimaryDeathRecord,),
     (CTRetroactivePrivacyConceptSuppression,),
+    (FilterNLPfromDomains,),
     (CleanMappingExtTables,),  # should be one of the last cleaning rules run
 ]

@@ -415,6 +422,7 @@
     (CalculatePrimaryDeathRecord,),
     (NoDataAfterDeath,),  # should run after CalculatePrimaryDeathRecord
     (CTRetroactivePrivacyConceptSuppression,),
+    (FilterNLPfromDomains,),
     (CleanMappingExtTables,),  # should be one of the last cleaning rules run
 ]

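For context, each entry in these stage lists is a one-element tuple wrapping a cleaning-rule class; the engine walks the list in order, instantiates each class, and executes the query specs it returns. A minimal sketch of that convention (the loop below is illustrative, not the actual clean_cdr_engine code; run_query is a hypothetical stand-in):

    # Hypothetical sketch of how a stage list is consumed; names mirror the
    # repo's conventions but this is not the engine's real implementation.
    for rule_tuple in rules:
        rule_class, *extra_args = rule_tuple
        rule = rule_class(project_id, dataset_id, sandbox_dataset_id, *extra_args)
        for spec in rule.get_query_specs():
            run_query(spec)  # executes spec[cdr_consts.QUERY] against BigQuery

Because the list is ordered, placing (FilterNLPfromDomains,) before (CleanMappingExtTables,) keeps the NLP filtering ahead of the final mapping/ext table cleanup.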
138 changes: 138 additions & 0 deletions data_steward/cdr_cleaner/cleaning_rules/filter_nlp_from_domains.py
@@ -0,0 +1,138 @@
# coding=utf-8
"""
Filter NLP records based on domain table rows that were moved from nlp and remained after cleaning.
Currently only applies to the condition_occurrence table.
Applied at every CDR pipeline stage from combined, wherever nlp record are moved to domains. This CR reverses it.

Jira issues = DC-3865
"""
# Python imports
import logging

# Project imports
from gcloud.bq import BigQueryClient
import constants.cdr_cleaner.clean_cdr as cdr_consts
from cdr_cleaner.cleaning_rules.base_cleaning_rule import BaseCleaningRule, query_spec_list
from common import JINJA_ENV, CONDITION_OCCURRENCE, NOTE_NLP

LOGGER = logging.getLogger(__name__)

JIRA_ISSUE_NUMBERS = ['DC3865']

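# Remove note_nlp records that cannot be joined, through the
# condition_occurrence_ext mapping, to a condition_occurrence row that survived
# earlier cleaning. Only NLP records whose domain rows remain are kept.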
DELETE_NLP = JINJA_ENV.from_string("""
DELETE
FROM `{{project_id}}.{{dataset_id}}.note_nlp` l
WHERE note_nlp_id NOT IN (
    SELECT note_nlp_id
    FROM `{{project_id}}.{{dataset_id}}.note_nlp` l
    JOIN `{{project_id}}.{{dataset_id}}.condition_occurrence_ext` e
        ON l.note_nlp_id = e.note_nlp_id
    JOIN `{{project_id}}.{{dataset_id}}.condition_occurrence` c
        ON e.condition_occurrence_id = c.condition_occurrence_id)
""")

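# Remove the condition_occurrence rows that are still mapped, through
# condition_occurrence_ext, to a remaining note_nlp record. This reverses the
# earlier move of NLP data into the domain table; the data lives on in note_nlp.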
DELETE_NLP_COND = JINJA_ENV.from_string("""
DELETE
FROM `{{project_id}}.{{dataset_id}}.condition_occurrence` c
WHERE condition_occurrence_id IN (
    SELECT condition_occurrence_id
    FROM `{{project_id}}.{{dataset_id}}.condition_occurrence` c
    JOIN `{{project_id}}.{{dataset_id}}.condition_occurrence_ext` e
        ON e.condition_occurrence_id = c.condition_occurrence_id
    JOIN `{{project_id}}.{{dataset_id}}.note_nlp` l
        ON l.note_nlp_id = e.note_nlp_id)
""")


class FilterNLPfromDomains(BaseCleaningRule):

    def __init__(self,
                 project_id,
                 dataset_id,
                 sandbox_dataset_id,
                 table_namer=None):
        """
        Initialize the class with proper information.

        Set the issue numbers, description and affected datasets. As other
        tickets may affect this SQL, append them to the list of Jira Issues.
        DO NOT REMOVE ORIGINAL JIRA ISSUE NUMBERS!
        """
        desc = ('Filters the note_nlp table using the domain tables and '
                'removes NLP-derived rows from condition_occurrence.')

        super().__init__(issue_numbers=JIRA_ISSUE_NUMBERS,
                         description=desc,
                         affected_datasets=[cdr_consts.COMBINED],
                         affected_tables=[NOTE_NLP, CONDITION_OCCURRENCE],
                         project_id=project_id,
                         dataset_id=dataset_id,
                         sandbox_dataset_id=sandbox_dataset_id,
                         table_namer=table_namer)

    def get_sandbox_tablenames(self) -> list:
        return [self.sandbox_table_for(table) for table in self.affected_tables]

    def setup_rule(self, client: BigQueryClient, *args, **keyword_args) -> None:
        """
        Run any steps required before the queries execute.

        No setup is required for this rule.
        """
        pass

    def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
        """
        Return a list of dictionary query specifications.

        :return: A list of dictionaries. Each dictionary contains a single query
            and a specification for how to execute that query. The specifications
            are optional but the query is required.
        """
        queries_list = []

        # Remove note_nlp records whose domain rows did not survive earlier
        # cleaning. This must run before the condition_occurrence delete below,
        # which keys off the pruned note_nlp table.
        delete_nlp_query = {
            cdr_consts.QUERY:
                DELETE_NLP.render(project_id=self.project_id,
                                  dataset_id=self.dataset_id)
        }
        queries_list.append(delete_nlp_query)

        # Remove the NLP-derived rows from condition_occurrence, reversing the
        # earlier move of NLP records into the domain table. Note that these
        # rows are deleted in place and are not copied to the sandbox first.
        delete_condition_query = {
            cdr_consts.QUERY:
                DELETE_NLP_COND.render(project_id=self.project_id,
                                       dataset_id=self.dataset_id)
        }
        queries_list.append(delete_condition_query)

        return queries_list

    def setup_validation(self, client: BigQueryClient) -> None:
        """
        Run required steps for validation setup
        """
        pass

    def validate_rule(self, client: BigQueryClient) -> None:
        """
        Validates the cleaning rule which deletes or updates the data from the tables
        """
        pass


if __name__ == '__main__':
    import cdr_cleaner.args_parser as parser
    import cdr_cleaner.clean_cdr_engine as clean_engine

    ARGS = parser.parse_args()

    if ARGS.list_queries:
        clean_engine.add_console_logging()
        query_list = clean_engine.get_query_list(ARGS.project_id,
                                                 ARGS.dataset_id,
                                                 ARGS.sandbox_dataset_id,
                                                 [(FilterNLPfromDomains,)])
        for query in query_list:
            LOGGER.info(query)
    else:
        clean_engine.add_console_logging(ARGS.console_log)
        clean_engine.clean_dataset(ARGS.project_id, ARGS.dataset_id,
                                   ARGS.sandbox_dataset_id,
                                   [(FilterNLPfromDomains,)])
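
For reviewers who want to eyeball the rendered SQL without the engine plumbing, the rule can also be instantiated directly; the project and dataset names below are placeholders, not real environments:

    import constants.cdr_cleaner.clean_cdr as cdr_consts
    from cdr_cleaner.cleaning_rules.filter_nlp_from_domains import FilterNLPfromDomains

    rule = FilterNLPfromDomains(project_id='my-project',
                                dataset_id='combined_dataset',
                                sandbox_dataset_id='sandbox_dataset')
    for spec in rule.get_query_specs():
        print(spec[cdr_consts.QUERY])

This is equivalent to the list_queries path in the __main__ block above, minus the logging setup.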