Skip to content

Commit

Permalink
perf: Avoid full table scan of Measurements in Requisitions query (#2041)
Browse files Browse the repository at this point in the history

From testing on halo-cmm-dev, we see that ListRequisitions filtering by Measurement State and Requisition State results in a full table scan of the Measurements table. This is the exact call used by EDP simulators once per second.

In addition to making the query itself slow, this can result in locking all Measurement rows. This in turn can result in long lock waits for writes to the Measurements table.
  • Loading branch information
SanjayVas authored Jan 30, 2025
1 parent 0072cce commit ec6f333
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,9 @@ class StreamRequisitions(requestFilter: StreamRequisitionsRequest.Filter, limit:

override val reader =
RequisitionReader().apply {
val orderByClause =
"ORDER BY UpdateTime ASC, ExternalDataProviderId ASC, ExternalRequisitionId ASC"
this.orderByClause = orderByClause

fillStatementBuilder {
appendWhereClause(requestFilter)
appendClause(orderByClause)
appendClause(ORDER_BY_CLAUSE)
if (limit > 0) {
appendClause("LIMIT @$LIMIT")
bind(LIMIT to limit.toLong())
Expand Down Expand Up @@ -97,6 +93,9 @@ class StreamRequisitions(requestFilter: StreamRequisitionsRequest.Filter, limit:
}

companion object {
private const val ORDER_BY_CLAUSE =
"ORDER BY UpdateTime ASC, ExternalDataProviderId ASC, ExternalRequisitionId ASC"

const val LIMIT = "limit"
const val EXTERNAL_MEASUREMENT_CONSUMER_ID = "externalMeasurementConsumerId"
const val EXTERNAL_MEASUREMENT_ID = "externalMeasurementId"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.wfanet.measurement.kingdom.deploy.gcloud.spanner.readers

import com.google.cloud.spanner.Statement
import com.google.cloud.spanner.Struct
import kotlinx.coroutines.flow.singleOrNull
import org.wfanet.measurement.common.identity.InternalId
Expand Down Expand Up @@ -43,15 +42,6 @@ class RequisitionReader : SpannerReader<RequisitionReader.Result>() {
// Getter-only property (no backing field): every access yields BASE_SQL.
override val baseSql: String
get() = BASE_SQL

/** Becomes `true` once [fillStatementBuilder] has run; guards against late configuration. */
private var filled = false

/**
 * ORDER BY clause placed at the very end of the overall query, or `null` for none.
 *
 * Assigning a value after the statement builder has been filled throws
 * [IllegalStateException].
 */
var orderByClause: String? = null
  set(clause) {
    check(!filled) { "Statement builder already filled" }
    field = clause
  }

override suspend fun translate(struct: Struct): Result {
return Result(
InternalId(struct.getLong("MeasurementConsumerId")),
Expand All @@ -62,28 +52,6 @@ class RequisitionReader : SpannerReader<RequisitionReader.Result>() {
)
}

/**
 * Fills the statement builder for the query. May be called at most once per reader.
 *
 * After [block] has run, the Requisitions table subquery is closed with `)`, `SUBQUERY_SQL` is
 * appended, and finally [orderByClause] is appended when it is non-null.
 *
 * @param block fills the statement builder for the Requisitions table subquery.
 * @throws IllegalStateException if the statement builder was already filled.
 */
override fun fillStatementBuilder(block: Statement.Builder.() -> Unit): SpannerReader<Result> {
  check(!filled) { "Statement builder already filled" }
  filled = true

  return super.fillStatementBuilder {
    // Caller-supplied part of the subquery comes first, then the subquery is closed.
    block()
    append(")\n")

    appendClause(SUBQUERY_SQL)

    // The ordering applies to the overall query, so it has to come last.
    orderByClause?.let { appendClause(it) }
  }
}

suspend fun readByExternalDataProviderId(
readContext: AsyncDatabaseClient.ReadContext,
externalDataProviderId: Long,
Expand Down Expand Up @@ -135,86 +103,62 @@ class RequisitionReader : SpannerReader<RequisitionReader.Result>() {

// Opening portion of the query. It starts the FilteredRequisitions CTE but does not close it:
// fillStatementBuilder appends the closing ")" and then SUBQUERY_SQL after the caller's clauses.
private val BASE_SQL =
"""
@{spanner_emulator.disable_query_null_filtered_index_check=true}
WITH FilteredRequisitions AS (
SELECT
Requisitions.MeasurementConsumerId,
Requisitions.MeasurementId,
Requisitions.RequisitionId,
Requisitions.UpdateTime,
Requisitions.ExternalRequisitionId,
Requisitions.State,
Requisitions.FulfillingDuchyId,
Requisitions.RequisitionDetails,
Requisitions.DataProviderCertificateId,
ExternalMeasurementConsumerId,
ExternalMeasurementId,
ExternalDataProviderId,
Measurements.State AS MeasurementState,
CreateTime,
ExternalComputationId,
CertificateId,
MeasurementDetails
FROM
MeasurementConsumers JOIN Measurements USING (MeasurementConsumerId)
JOIN Requisitions USING (MeasurementConsumerId, MeasurementId)
JOIN DataProviders USING (DataProviderId)
"""
.trimIndent()

private val SUBQUERY_SQL =
"""
SELECT
MeasurementConsumerId,
MeasurementId,
RequisitionId,
FilteredRequisitions.UpdateTime,
ExternalRequisitionId,
FilteredRequisitions.State AS RequisitionState,
FulfillingDuchyId,
RequisitionDetails,
ExternalMeasurementId,
ExternalMeasurementConsumerId,
ExternalMeasurementConsumerCertificateId,
ExternalComputationId,
ExternalDataProviderId,
ExternalDataProviderCertificateId,
SubjectKeyIdentifier,
NotValidBefore,
NotValidAfter,
RevocationState,
CertificateDetails,
MeasurementState,
MeasurementDetails,
FilteredRequisitions.CreateTime,
(
@{spanner_emulator.disable_query_null_filtered_index_check=TRUE}
SELECT
count(ExternalDataProviderId),
MeasurementConsumerId,
MeasurementId,
RequisitionId,
Requisitions.UpdateTime,
ExternalRequisitionId,
Requisitions.State AS RequisitionState,
FulfillingDuchyId,
RequisitionDetails,
ExternalMeasurementId,
ExternalMeasurementConsumerId,
ExternalMeasurementConsumerCertificateId,
ExternalComputationId,
ExternalDataProviderId,
ExternalDataProviderCertificateId,
SubjectKeyIdentifier,
NotValidBefore,
NotValidAfter,
RevocationState,
CertificateDetails,
Measurements.State AS MeasurementState,
MeasurementDetails,
Measurements.CreateTime,
(
SELECT
COUNT(DataProviderId),
FROM
Requisitions
WHERE
Requisitions.MeasurementConsumerId = Measurements.MeasurementConsumerId
AND Requisitions.MeasurementId = Measurements.MeasurementId
) AS MeasurementRequisitionCount,
ARRAY(
SELECT AS STRUCT
ComputationParticipants.DuchyId,
ComputationParticipants.ParticipantDetails,
ExternalDuchyCertificateId
FROM
ComputationParticipants
LEFT JOIN DuchyCertificates USING (DuchyId, CertificateId)
WHERE
ComputationParticipants.MeasurementConsumerId = Requisitions.MeasurementConsumerId
AND ComputationParticipants.MeasurementId = Requisitions.MeasurementId
) AS ComputationParticipants
FROM
Requisitions
WHERE
Requisitions.MeasurementConsumerId = FilteredRequisitions.MeasurementConsumerId
AND Requisitions.MeasurementId = FilteredRequisitions.MeasurementId
) AS MeasurementRequisitionCount,
ARRAY(
SELECT AS STRUCT
ComputationParticipants.DuchyId,
ComputationParticipants.ParticipantDetails,
ExternalDuchyCertificateId
FROM
ComputationParticipants
LEFT JOIN DuchyCertificates USING (DuchyId, CertificateId)
WHERE
ComputationParticipants.MeasurementConsumerId = FilteredRequisitions.MeasurementConsumerId
AND ComputationParticipants.MeasurementId = FilteredRequisitions.MeasurementId
) AS ComputationParticipants
FROM
FilteredRequisitions
JOIN MeasurementConsumerCertificates USING (MeasurementConsumerId, CertificateId)
JOIN DataProviderCertificates
ON (DataProviderCertificates.CertificateId = FilteredRequisitions.DataProviderCertificateId)
JOIN Certificates ON (Certificates.CertificateId = DataProviderCertificates.CertificateId)
"""
JOIN DataProviders USING (DataProviderId)
JOIN Measurements USING (MeasurementConsumerId, MeasurementId)
JOIN MeasurementConsumers USING (MeasurementConsumerId)
JOIN MeasurementConsumerCertificates USING (MeasurementConsumerId, CertificateId)
JOIN DataProviderCertificates ON (
DataProviderCertificates.CertificateId = Requisitions.DataProviderCertificateId
)
JOIN Certificates ON (Certificates.CertificateId = DataProviderCertificates.CertificateId)
"""
.trimIndent()

/** Builds a [Requisition] from [struct]. */
Expand Down

0 comments on commit ec6f333

Please sign in to comment.