Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle missing values in blocked MPI records #242

Merged
merged 2 commits into from
Mar 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion src/recordlinker/database/mpi_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,73 @@
from recordlinker import schemas


def _filter_incorrect_blocks(
    record: schemas.PIIRecord,
    patients: typing.Sequence[models.Patient],
    blocking_keys: list[str]
) -> list[models.Patient]:
    """
    Filter a set of candidates returned via blocking from the MPI. The initial
    SQL query returns a collection of candidates comprised of all patients in
    the MPI belonging to a Person cluster for which at *least one* patient
    satisfied blocking criteria. This function narrows that candidate set to
    *only* those patients who, for every blocked field, either share a blocking
    value with the incoming record or have no value for that field. This
    eliminates patients who have mismatched blocking information but belonged
    to a Person cluster where a different record had correct blocking values.

    :param record: The PIIRecord of the incoming patient.
    :param patients: The initial collection of candidates retrieved from the MPI.
    :param blocking_keys: A list of strings naming the fields used for blocking.
      Each must be the name of a `models.BlockingKey` member; the names are
      not re-validated here.
    :returns: A filtered list of Patients from the MPI, in their original order.
      The list is empty when the incoming record has no blocking value for one
      or more of the requested keys.
    """
    # Resolve each key name to its BlockingKey member and collect the incoming
    # record's acceptable values exactly once, instead of repeating the lookup
    # for every candidate. Keyed by name so a repeated name collapses to a
    # single entry.
    incoming_criteria: dict[str, tuple[typing.Any, list]] = {}
    for bk in blocking_keys:
        key = getattr(models.BlockingKey, bk)
        allowed_vals = list(record.blocking_keys(key))
        if allowed_vals:
            incoming_criteria[bk] = (key, allowed_vals)

    # A candidate has to be compatible on every requested key. When the incoming
    # record yields no value for some key (or a key name is repeated), fewer
    # criteria than requested keys exist, so no candidate can qualify.
    # NOTE(review): presumably the caller already returns early when a blocking
    # value is missing on the incoming record -- confirm against get_block_data.
    if len(incoming_criteria) < len(blocking_keys):
        return []

    criteria = list(incoming_criteria.values())
    compatible: list[models.Patient] = []
    for patient in patients:
        # Read the candidate's record once per patient rather than once per key.
        mpi_record = patient.record
        for key, allowed_vals in criteria:
            # Compare against what would be the blocking value of the MPI
            # record, so both sides are compared on e.g. the same number of
            # characters at the beginning/end of a string.
            mpi_vals = mpi_record.blocking_keys(key)

            # A missing value on the MPI side counts as compatible. Otherwise
            # at least one incoming value must be present; `any` stops at the
            # first hit instead of building the whole list.
            if len(mpi_vals) > 0 and not any(v in mpi_vals for v in allowed_vals):
                # One conflicting field rules the candidate out, so the
                # remaining keys need not be checked.
                break
        else:
            # The loop finished without a conflict on any key: keep the patient.
            compatible.append(patient)

    return compatible


def get_block_data(
session: orm.Session, record: schemas.PIIRecord, algorithm_pass: models.AlgorithmPass
) -> typing.Sequence[models.Patient]:
Expand Down Expand Up @@ -60,7 +127,8 @@ def get_block_data(

# Using the subquery of unique Patient IDs, select all the Patients
expr = expression.select(models.Patient).where(models.Patient.person_id.in_(base))
return session.execute(expr).scalars().all()
candidates = session.execute(expr).scalars().all()
return _filter_incorrect_blocks(record, candidates, algorithm_pass.blocking_keys)


def insert_patient(
Expand Down
14 changes: 7 additions & 7 deletions tests/unit/assets/possible_match_default_patient_bundle.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@
"birthDate": "1980-01-01",
"gender": "male",
"address": [
{
{
"line": [
"Bay 16",
"Ward Sector 24"
],
"city": "Brooklyn",
"state": "New York",
"postalCode": "54321",
"city": "Boston",
"state": "Massachusetts",
"postalCode": "99999",
"use": "home"
}
}
],
"telecom": [
{
Expand Down Expand Up @@ -134,7 +134,7 @@
"birthDate": "1980-01-01",
"gender": "male",
"address": [
{
{
"line": [
"1234 Silversun Strip"
],
Expand All @@ -161,4 +161,4 @@
}
}
]
}
}
72 changes: 66 additions & 6 deletions tests/unit/database/test_mpi_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,9 @@ class TestGetBlockData:
@pytest.fixture
def prime_index(self, session: Session):
person_1 = models.Person()
person_2 = models.Person()
session.add(person_1)
session.add(person_2)
session.flush()

data = [
Expand Down Expand Up @@ -498,8 +500,36 @@ def prime_index(self, session: Session):
],
"birthdate": "",
},
models.Person(),
person_2,
),
(
{
"name": [
{
"given": [
"Ferris",
],
"family": "Bueller",
}
],
"birthdate": "1974-11-07",
},
person_2
),
(
{
"name": [
{
"given": [
"Ferris",
],
"family": "Bueller",
}
],
"birthdate": "1983-08-17",
},
person_2
)
]
for datum, person in data:
mpi_service.insert_patient(session, schemas.PIIRecord(**datum), person=person)
Expand Down Expand Up @@ -553,6 +583,30 @@ def test_block_empty_block_key(self, session: Session, prime_index: None):
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 0

def test_block_filter_mpi_candidates(self, session: Session, prime_index: None):
    """
    Candidates pulled from the MPI through their Person cluster are kept only
    when they agree with the incoming record on the blocking fields or are
    missing that information. Cluster members whose blocking fields conflict
    must be dropped from consideration.
    """
    incoming_name = {"given": ["Ferris"], "family": "Bueller"}
    data = {"name": [incoming_name], "birthdate": "1974-11-07"}
    algorithm_pass = models.AlgorithmPass(blocking_keys=["BIRTHDATE", "FIRST_NAME"])

    # The matching person cluster initially holds 3 patients: one agrees on
    # blocking, one has missing values, and one conflicts. Only the
    # conflicting patient should be thrown away.
    incoming_record = schemas.PIIRecord(**data)
    matches = mpi_service.get_block_data(session, incoming_record, algorithm_pass)
    assert len(matches) == 2

def test_block_on_birthdate(self, session: Session, prime_index: None):
data = {
"name": [
Expand Down Expand Up @@ -600,7 +654,8 @@ def test_block_on_first_name(self, session: Session, prime_index: None):
}
algorithm_pass = models.AlgorithmPass(blocking_keys=["FIRST_NAME"])
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 5
# One candidate in MPI person_1 is a Bill, will be ruled out
assert len(matches) == 4

def test_block_on_birthdate_and_first_name(self, session: Session, prime_index: None):
data = {
Expand All @@ -617,7 +672,8 @@ def test_block_on_birthdate_and_first_name(self, session: Session, prime_index:
}
algorithm_pass = models.AlgorithmPass(blocking_keys=["BIRTHDATE", "FIRST_NAME"])
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 4
# One candidate in MPI person_1 is just a Bill, ruled out
assert len(matches) == 3

def test_block_on_birthdate_first_name_and_last_name(self, session: Session, prime_index: None):
data = {
Expand All @@ -636,7 +692,8 @@ def test_block_on_birthdate_first_name_and_last_name(self, session: Session, pri
blocking_keys=["BIRTHDATE", "FIRST_NAME", "LAST_NAME"]
)
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 3
# One person in MPI person_1 is just a Bill, ruled out
assert len(matches) == 2
data = {
"name": [
{
Expand All @@ -649,7 +706,9 @@ def test_block_on_birthdate_first_name_and_last_name(self, session: Session, pri
"birthdate": "Jan 1 1980",
}
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 3
# Blocking uses feature_iter, which yields only the first `given` for a
# single name object, so only the patient with 'Bill' is caught
assert len(matches) == 1
data = {
"name": [
{
Expand Down Expand Up @@ -681,7 +740,8 @@ def test_block_on_multiple_names(self, session: Session, prime_index: None):
kwargs={},
)
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 5
# One of patients in MPI person_1 is a Bill, so is excluded
assert len(matches) == 4

def test_block_missing_keys(self, session: Session, prime_index: None):
data = {"birthdate": "01/01/1980"}
Expand Down
Loading