missing blocking keys #254

Open · wants to merge 6 commits into base: main
9 changes: 6 additions & 3 deletions src/recordlinker/assets/initial_algorithms.json
@@ -47,7 +47,9 @@
"SEX": 0.7510419059643679,
"STATE": 0.022376768992488694,
"ZIP": 4.975031471124867
}
},
"compare_minimum_percentage": 0.7
Collaborator comment:
Since Brandon has worked on ticket #235, we have a different name for this field. Unless @bamader thinks we should have different minimums depending on whether we're in the blocking or comparison stage.
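For illustration only, a sketch of how stage-specific minimums could look in the pass kwargs (both key names are hypothetical, pending the naming settled in #235; the log odds values are the ones used in the new tests):

# Hypothetical kwargs shape, not the final naming
pass_kwargs = {
    "log_odds": {"BIRTHDATE": 10.1, "FIRST_NAME": 6.8, "LAST_NAME": 6.3},
    "blocking_minimum_percentage": 0.7,  # share of log odds required to run a blocking pass
    "compare_minimum_percentage": 0.7,   # share required later, in the comparison stage
}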


}
},
{
@@ -89,9 +91,10 @@
"SEX": 0.7510419059643679,
"STATE": 0.022376768992488694,
"ZIP": 4.975031471124867
}
},
"compare_minimum_percentage": 0.7
}
}
]
}
]
]
250 changes: 143 additions & 107 deletions src/recordlinker/database/mpi_service.py
@@ -5,6 +5,7 @@
This module provides the data access functions to the MPI tables
"""

import logging
import typing
import uuid

@@ -17,118 +18,153 @@
from recordlinker import models
from recordlinker import schemas


def _filter_incorrect_blocks(
record: schemas.PIIRecord,
patients: typing.Sequence[models.Patient],
blocking_keys: list[str]
) -> list[models.Patient]:
"""
Filter a set of candidates returned via blocking from the MPI. The initial
SQL query returns a collection of candidates comprised of all patients in
the MPI belonging to a Person cluster for which at *least one* patient
satisfied blocking criteria. This function filters that candidate set to
include *only* those patients who either satisfied blocking criteria or
were missing a value for one or more blocked fields. This eliminates
patients from consideration who have mismatched blocking information but
belonged to a Person cluster where a different record had correct blocking
values.

:param record: The PIIRecord of the incoming patient.
:param patients: The initial collection of candidates retrieved from the MPI.
:param blocking_keys: A list of strings of the fields used for blocking.
:returns: A filtered list of Patients from the MPI.
"""
# Extract the acceptable blocking values from the incoming record
# Keys have already been getattr validated by caller, no need
# to check that they exist
blocking_vals_in_incoming = {}
for bk in blocking_keys:
key = getattr(models.BlockingKey, bk)
vals_blocked_from_key = [v for v in record.blocking_keys(key)]
if len(vals_blocked_from_key) > 0:
blocking_vals_in_incoming[bk] = vals_blocked_from_key

# Can't modify sequence in place, so we'll build up a list of list idxs
# to exclude for mpi patients who don't match blocking criteria exactly
pats_to_exclude = set()
for p in patients:
# Note: This implementation searches for compatible values in the
# fields of candidates. It is possible to write this inner loop
# checking for incompatible values instead. This changes which loop
# gets short-circuited. Performance testing found compatible search
# faster than incompatible search due to generator termination and
# time-complexity growth with number of blocking keys. The more
# normalization and preprocessing done in `feature_iter`, the slower
# this search method becomes. If heavy processing is performed,
# consider switching to incompatible search.
num_agreeing_blocking_fields = 0
mpi_record = p.record
for bk, allowed_vals in blocking_vals_in_incoming.items():
# Compare incoming blocking value to what would be the blocking
# value of the mpi record to make sure we compare on e.g. same
# number of characters at beginning/end of string
mpi_vals = mpi_record.blocking_keys(getattr(models.BlockingKey, bk))

LOGGER = logging.getLogger(__name__)


class GetBlockData:
def _reset(self, kwargs: dict[str, typing.Any]) -> None:
"""
Reset the state of the class

:param kwargs: dict
:return: None
"""
self.kwargs: dict[str, typing.Any] = kwargs
# Track the total and found log odds plus the blocking values for each key;
# this will let us know when we have too many missing values and need to
# skip the blocking query
self.total_odds: float = 0
self.found_odds: float = 0
self.blocking_values: dict[models.BlockingKey, list[str]] = {}

def _check_skip_conditions(self) -> bool:
"""
Analyze the log odds of the blocking keys found in the query. If the number of
points collected is below the minimum threshold, return True to indicate this
blocking query should be skipped.

:return: bool
"""
minimum_percentage = self.kwargs.get("compare_minimum_percentage", 1.0)
details: dict[str, float] = {
"found_blocking_odds": self.found_odds,
"total_blocking_odds": self.total_odds,
"minimum_percentage": minimum_percentage,
}
if self.total_odds == 0 and any(not v for v in self.blocking_values.values()):
# No log odds were specified and we had at least 1 missing blocking key
LOGGER.info("skipping blocking query: no log odds", extra=details)
return True
if self.total_odds and (self.found_odds / self.total_odds) < minimum_percentage:
# The log odds for the found blocking keys were below the minimum threshold
LOGGER.info("skipping blocking query: log odds too low", extra=details)
return True
return False

def _filter_incorrect_match(self, patient: models.Patient) -> bool:
"""
Filter out patient records that have conflicting blocking values with the incoming
record. If either record is missing values for a blocking key, we can ignore,
however when both records have values, verify that there is overlap between
the values. Return False if the records are not in agreement.

:param patient: models.Patient
:return: bool
"""
agree_count = 0
for key, incoming_vals in self.blocking_values.items():
if not incoming_vals:
# The incoming record has no value for this blocking key, thus there
# is no reason to compare. We can increment the counter to indicate
# the two records are still in agreement and continue
agree_count += 1
continue
# Calculate the blocking values for the patient
patient_vals = patient.record.blocking_keys(key)
if not patient_vals:
# The patient record has no value for this blocking key, thus there
# is no reason to compare. We can increment the counter to indicate
# the two records are still in agreement and continue
agree_count += 1
continue
# Generator gets us best performance, fastest way to check membership
# because we return True as soon as we get 1 rather than build the
# whole list. Also count compatibility if mpi_val is missing.
found_compatible_val = (len(mpi_vals) == 0) or any(x in mpi_vals for x in allowed_vals)
if found_compatible_val:
num_agreeing_blocking_fields += 1
# whole list.
if any(v in patient_vals for v in incoming_vals):
agree_count += 1

# If we get through all the blocking criteria with no missing entries
# and no true-value agreement, we exclude
if num_agreeing_blocking_fields < len(blocking_keys):
pats_to_exclude.add(p.id)

return [pat for pat in patients if pat.id not in pats_to_exclude]


def get_block_data(
session: orm.Session, record: schemas.PIIRecord, algorithm_pass: models.AlgorithmPass
) -> typing.Sequence[models.Patient]:
"""
Get all of the matching Patients for the given data using the provided
blocking keys defined in the algorithm_pass. Also, get all the
remaining Patient records in the Person clusters identified in
blocking to calculate Belongingness Ratio.
"""
# Create the base query
base = expression.select(models.Patient.person_id).distinct()

# Build the join criteria, we are joining the Blocking Value table
# multiple times, once for each Blocking Key. If a Patient record
# has a matching Blocking Value for all the Blocking Keys, then it
# is considered a match.
for idx, key_id in enumerate(algorithm_pass.blocking_keys):
# get the BlockingKey obj from the id
if not hasattr(models.BlockingKey, key_id):
raise ValueError(f"No BlockingKey with id {id} found.")
key = getattr(models.BlockingKey, key_id)

# Get all the possible values from the data for this key
vals = [v for v in record.blocking_keys(key)]
# Create a dynamic alias for the Blocking Value table using the index
# this is necessary since we are potentially joining the same table
# multiple times with different conditions
alias = orm.aliased(models.BlockingValue, name=f"bv{idx}")
# Add a join clause to the mpi_blocking_value table for each Blocking Key.
# This results in multiple joins to the same table, one for each Key, but
# with different joining conditions.
base = base.join(
alias,
expression.and_(
models.Patient.id == alias.patient_id,
alias.blockingkey == key.id,
alias.value.in_(vals),
),
)

# Using the subquery of unique Patient IDs, select all the Patients
expr = expression.select(models.Patient).where(models.Patient.person_id.in_(base))
candidates = session.execute(expr).scalars().all()
return _filter_incorrect_blocks(record, candidates, algorithm_pass.blocking_keys)
return agree_count == len(self.blocking_values)

def __call__(
self, session: orm.Session, record: schemas.PIIRecord, algorithm_pass: models.AlgorithmPass
) -> typing.Sequence[models.Patient]:
"""
Get all of the matching Patients for the given data using the provided
blocking keys defined in the algorithm_pass. Also, get all the
remaining Patient records in the Person clusters identified in
blocking to calculate Belongingness Ratio.

:param session: The database session
:param record: The PIIRecord to match
:param algorithm_pass: The AlgorithmPass to use
:return: The matching Patients
"""
# Create the base query
base: expression.Select = expression.select(models.Patient.person_id).distinct()
# Get the pass kwargs or create an empty dict
kwargs: dict[str, typing.Any] = algorithm_pass.kwargs or {}

# Reset state before running
self._reset(kwargs)
# Build the join criteria, we are joining the Blocking Value table
# multiple times, once for each Blocking Key. If a Patient record
# has a matching Blocking Value for all the Blocking Keys, then it
# is considered a match.
for idx, key_id in enumerate(algorithm_pass.blocking_keys):
# get the BlockingKey obj from the id
if not hasattr(models.BlockingKey, key_id):
raise ValueError(f"No BlockingKey with id {id} found.")
key = getattr(models.BlockingKey, key_id)
# Get the log odds value for the key
log_odds: float = kwargs.get("log_odds", {}).get(key_id, 0.0)
# Keep track of total log odds, for checking skip conditions
self.total_odds += log_odds
# Get all the possible values from the data for this key
self.blocking_values[key] = [v for v in record.blocking_keys(key)]
if not self.blocking_values[key]:
# This blocking key doesn't have any possible values, so skip
# the joining query
continue
# This blocking key does have values, track the found log odds for checking skip conditions
self.found_odds += log_odds
# Create a dynamic alias for the Blocking Value table using the index
# this is necessary since we are potentially joining the same table
# multiple times with different conditions
alias = orm.aliased(models.BlockingValue, name=f"bv{idx}")
# Add a join clause to the mpi_blocking_value table for each Blocking Key.
# This results in multiple joins to the same table, one for each Key, but
# with different joining conditions.
base = base.join(
alias,
expression.and_(
models.Patient.id == alias.patient_id,
alias.blockingkey == key.id,
alias.value.in_(self.blocking_values[key]),
),
)

if self._check_skip_conditions():
# Too many missing blocking values
return []
Collaborator comment on lines +158 to +160:
This might be over-optimizing at this stage, so feel free to say this proposal is beyond the scope of this ticket.

Before this work, we never had to consider the order of our blocking keys because we didn’t have the option to skip blocking. Now that we do, it might make sense to re-evaluate the order since some blocking fields alone can determine whether we skip a pass—regardless of the other fields.

For example, in our first pass, we block on "BIRTHDATE", "IDENTIFIER", and "SEX". If an incoming record is missing "BIRTHDATE", and compare_minimum_percentage = 0.7, there's no point in checking "IDENTIFIER" or "SEX". Without "BIRTHDATE", the record can't exceed the threshold, so we already know it won’t pass.

Here are two possible ways to optimize this:

  1. Check _check_skip_conditions more frequently, possibly after each blocking key. Instead of calculating total_odds iteratively, we could get it upfront and track what’s already been evaluated. This way, we can exit the loop early if we know the remaining keys won’t push the record past compare_minimum_percentage.

  2. Reorder blocking keys to process the most important ones first. If we sort them in descending order by log odds, we can quickly determine if a record has too many missing values to proceed. For example, instead of our current order, we could run:

Pass 1: "BIRTHDATE", "SEX", and "IDENTIFIER"

  • An incoming record would exit the pass early if it doesn't have BIRTHDATE

Pass 2: "FIRST_NAME", "LAST_NAME", "ZIP", and "SEX"

  • An incoming record would exit the pass early if it doesn't have FIRST_NAME and LAST_NAME

Would love to hear your thoughts!
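Purely as an illustration of combining the two ideas above (not part of this PR), a rough sketch that walks the keys in descending log-odds order and exits as soon as the minimum is unreachable; the log odds and threshold reuse the values from the new tests:

# Illustrative sketch only: early exit once the remaining keys cannot
# lift the found share of log odds past the minimum.
log_odds = {"BIRTHDATE": 10.1, "FIRST_NAME": 6.8, "LAST_NAME": 6.3}
minimum_percentage = 0.7
incoming_fields = {"FIRST_NAME", "LAST_NAME"}  # pretend the record is missing BIRTHDATE

total_odds = sum(log_odds.values())
found_odds = 0.0
remaining_odds = total_odds
skip_pass = False
for key, odds in sorted(log_odds.items(), key=lambda kv: kv[1], reverse=True):
    remaining_odds -= odds
    if key in incoming_fields:
        found_odds += odds
    if (found_odds + remaining_odds) / total_odds < minimum_percentage:
        # Even if every remaining key were present, the minimum is unreachable,
        # so skip this blocking pass without building the remaining joins.
        skip_pass = True
        break

print(skip_pass)  # True: without BIRTHDATE at most 13.1 / 23.2 ≈ 0.56 of the odds is reachable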


# Using the subquery of unique Patient IDs, select all the Patients
expr = expression.select(models.Patient).where(models.Patient.person_id.in_(base))
# Execute the query and collect all the Patients in matching Person clusters
patients: typing.Sequence[models.Patient] = session.execute(expr).scalars().all()
# Remove any Patient records that have incorrect blocking value matches
return [p for p in patients if self._filter_incorrect_match(p)]


def insert_patient(
3 changes: 2 additions & 1 deletion src/recordlinker/linking/link.py
@@ -114,7 +114,8 @@ def link_record_against_mpi(
with TRACER.start_as_current_span("link.block"):
# get all candidate Patient records identified in blocking
# and the remaining Patient records in their Person clusters
pats = mpi_service.get_block_data(session, record, algorithm_pass)
blocker = mpi_service.GetBlockData()
pats = blocker(session, record, algorithm_pass)
for pat in pats:
clusters[pat.person].append(pat)

80 changes: 63 additions & 17 deletions tests/unit/database/test_mpi_service.py
@@ -514,7 +514,7 @@ def prime_index(self, session: Session):
],
"birthdate": "1974-11-07",
},
person_2
person_2,
),
(
{
@@ -528,8 +528,8 @@ def prime_index(self, session: Session):
],
"birthdate": "1983-08-17",
},
person_2
)
person_2,
),
]
for datum, person in data:
mpi_service.insert_patient(session, schemas.PIIRecord(**datum), person=person)
@@ -549,7 +549,7 @@ def test_block_invalid_key(self, session: Session):
# passing in an invalid blocking key which should raise a ValueError
algorithm_pass = models.AlgorithmPass(blocking_keys=["INVALID"])
with pytest.raises(ValueError):
mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)

def test_block_missing_data(self, session: Session, prime_index: None):
data = {
@@ -564,7 +564,7 @@ def test_block_missing_data(self, session: Session, prime_index: None):
]
}
algorithm_pass = models.AlgorithmPass(blocking_keys=["BIRTHDATE"])
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 0

def test_block_empty_block_key(self, session: Session, prime_index: None):
@@ -580,7 +580,7 @@ def test_block_empty_block_key(self, session: Session, prime_index: None):
"birthdate": "",
}
algorithm_pass = models.AlgorithmPass(blocking_keys=["BIRTHDATE", "FIRST_NAME"])
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 0

def test_block_filter_mpi_candidates(self, session: Session, prime_index: None):
@@ -604,7 +604,7 @@ def test_block_filter_mpi_candidates(self, session: Session, prime_index: None):
# Will initially be 3 patients in this person cluster
# One agrees on blocking, one has missing values, and one
# is wrong, so we should throw away that one
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 2

def test_block_on_birthdate(self, session: Session, prime_index: None):
@@ -622,7 +622,7 @@ def test_block_on_birthdate(self, session: Session, prime_index: None):
}
algorithm_pass = models.AlgorithmPass(blocking_keys=["BIRTHDATE"])

matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 4
data = {
"name": [
@@ -636,7 +636,7 @@ def test_block_on_birthdate(self, session: Session, prime_index: None):
],
"birthdate": "11/12/1985",
}
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 1

def test_block_on_first_name(self, session: Session, prime_index: None):
@@ -653,7 +653,7 @@ def test_block_on_first_name(self, session: Session, prime_index: None):
"birthdate": "01/01/1980",
}
algorithm_pass = models.AlgorithmPass(blocking_keys=["FIRST_NAME"])
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
# One candidate in MPI person_1 is a Bill, will be ruled out
assert len(matches) == 4

@@ -671,7 +671,7 @@ def test_block_on_birthdate_and_first_name(self, session: Session, prime_index:
"birthdate": "01/01/1980",
}
algorithm_pass = models.AlgorithmPass(blocking_keys=["BIRTHDATE", "FIRST_NAME"])
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
# One candidate in MPI person_1 is just a Bill, ruled out
assert len(matches) == 3

@@ -691,7 +691,7 @@ def test_block_on_birthdate_first_name_and_last_name(self, session: Session, pri
algorithm_pass = models.AlgorithmPass(
blocking_keys=["BIRTHDATE", "FIRST_NAME", "LAST_NAME"]
)
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
# One person in MPI person_1 is just a Bill, ruled out
assert len(matches) == 2
data = {
@@ -705,7 +705,7 @@ def test_block_on_birthdate_first_name_and_last_name(self, session: Session, pri
],
"birthdate": "Jan 1 1980",
}
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
# Blocking uses feature_iter, which yields only the first `given` for a
# single name object, so only the patient with 'Bill' is caught
assert len(matches) == 1
@@ -721,7 +721,53 @@ def test_block_on_birthdate_first_name_and_last_name(self, session: Session, pri
],
"birthdate": "Jan 1 1980",
}
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 0

def test_block_missing_some_values(self, session: Session, prime_index: None):
data = {
"name": [
{
"given": [
"Johnathon",
"Bill",
],
"family": "",
}
],
"birthdate": "01/01/1980",
}
algorithm_pass = models.AlgorithmPass(
blocking_keys=["BIRTHDATE", "FIRST_NAME", "LAST_NAME"],
kwargs={
"log_odds": {"FIRST_NAME": 6.8, "LAST_NAME": 6.3, "BIRTHDATE": 10.1},
"compare_minimum_percentage": 0.7,
},
)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 3

def test_block_missing_too_many_values(self, session: Session, prime_index: None):
data = {
"name": [
{
"given": [
"Johnathon",
"Bill",
],
"family": "",
}
],
"birthdate": "01/01/1980",
}
algorithm_pass = models.AlgorithmPass(
blocking_keys=["BIRTHDATE", "FIRST_NAME", "LAST_NAME"],
kwargs={
"log_odds": {"FIRST_NAME": 6.8, "LAST_NAME": 6.3, "BIRTHDATE": 10.1},
"compare_minimum_percentage": 0.8,
},
)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 0

def test_block_on_multiple_names(self, session: Session, prime_index: None):
@@ -739,14 +785,14 @@ def test_block_on_multiple_names(self, session: Session, prime_index: None):
rule="",
kwargs={},
)
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
# One of patients in MPI person_1 is a Bill, so is excluded
assert len(matches) == 4

def test_block_missing_keys(self, session: Session, prime_index: None):
data = {"birthdate": "01/01/1980"}
algorithm_pass = models.AlgorithmPass(blocking_keys=["BIRTHDATE", "LAST_NAME"])
matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 0

def test_block_on_duplicates(self, session: Session):
@@ -792,7 +838,7 @@ def test_block_on_duplicates(self, session: Session):
blocking_keys=["FIRST_NAME", "LAST_NAME", "ZIP", "SEX"]
)

matches = mpi_service.get_block_data(session, schemas.PIIRecord(**data), algorithm_pass)
matches = mpi_service.GetBlockData()(session, schemas.PIIRecord(**data), algorithm_pass)
assert len(matches) == 3