From 5b329394021ac0ba431ff101fc931c3ea9799c94 Mon Sep 17 00:00:00 2001 From: Sanjay Vasandani Date: Mon, 3 Feb 2025 11:58:20 -0800 Subject: [PATCH] perf: Avoid full table scan of Measurements in ActiveComputations query (#2044) --- .../spanner/queries/StreamMeasurements.kt | 29 +++++++++++++------ .../spanner/readers/MeasurementReader.kt | 20 ++++++------- .../resources/kingdom/spanner/changelog.yaml | 3 ++ ...ter-measurements-by-continuation-token.sql | 27 +++++++++++++++++ 4 files changed, 59 insertions(+), 20 deletions(-) create mode 100644 src/main/resources/kingdom/spanner/null-filter-measurements-by-continuation-token.sql diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/queries/StreamMeasurements.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/queries/StreamMeasurements.kt index 5054b89a911..fba1d8e761d 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/queries/StreamMeasurements.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/queries/StreamMeasurements.kt @@ -59,7 +59,15 @@ class StreamMeasurements( limit: Int, ): MeasurementReader { val orderByClause = getOrderByClause(view) - return MeasurementReader(view).apply { + val index: MeasurementReader.Index = + if (requiresExternalComputationId(view, requestFilter)) { + // This is a NULL_FILTERED sharded index, so it can only be used when + // ExternalComputationId is not null. + MeasurementReader.Index.CONTINUATION_TOKEN + } else { + MeasurementReader.Index.NONE + } + return MeasurementReader(view, index).apply { this.orderByClause = orderByClause fillStatementBuilder { appendWhereClause(view, requestFilter) @@ -72,6 +80,15 @@ class StreamMeasurements( } } + private fun requiresExternalComputationId( + view: Measurement.View, + requestFilter: StreamMeasurementsRequest.Filter, + ): Boolean { + return requestFilter.hasExternalComputationId || + view == Measurement.View.COMPUTATION || + view == Measurement.View.COMPUTATION_STATS + } + private fun getOrderByClause(view: Measurement.View): String { return when (view) { Measurement.View.COMPUTATION, @@ -90,12 +107,9 @@ class StreamMeasurements( ) { val conjuncts = mutableListOf() - if ( - filter.hasExternalComputationId || - view == Measurement.View.COMPUTATION || - view == Measurement.View.COMPUTATION_STATS - ) { + if (requiresExternalComputationId(view, filter)) { conjuncts.add("ExternalComputationId IS NOT NULL") + conjuncts.add("MeasurementIndexShardId >= 0") } if (filter.externalMeasurementConsumerId != 0L) { @@ -217,9 +231,6 @@ class StreamMeasurements( } } - // Include shard ID to use sharded index on UpdateTime appropriately. - conjuncts.add("MeasurementIndexShardId != -1") - if (conjuncts.isEmpty()) { return } diff --git a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/readers/MeasurementReader.kt b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/readers/MeasurementReader.kt index c094bff7dfe..9d9d292eb9d 100644 --- a/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/readers/MeasurementReader.kt +++ b/src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/readers/MeasurementReader.kt @@ -57,20 +57,18 @@ class MeasurementReader(private val view: Measurement.View, measurementsIndex: I enum class Index(internal val sql: String) { NONE(""), CREATE_REQUEST_ID("@{FORCE_INDEX=MeasurementsByCreateRequestId}"), + CONTINUATION_TOKEN("@{FORCE_INDEX=MeasurementsByContinuationToken}"), } - override val baseSql: String - get() = BASE_SQL - - private val BASE_SQL = + override val baseSql: String = + """ + @{spanner_emulator.disable_query_null_filtered_index_check=true} + WITH FilteredMeasurements AS ( + SELECT * + FROM + Measurements${measurementsIndex.sql} + JOIN MeasurementConsumers USING (MeasurementConsumerId) """ - @{spanner_emulator.disable_query_null_filtered_index_check=true} - WITH FilteredMeasurements AS ( - SELECT * - FROM - Measurements${measurementsIndex.sql} - JOIN MeasurementConsumers USING (MeasurementConsumerId) - """ .trimIndent() private var filled = false diff --git a/src/main/resources/kingdom/spanner/changelog.yaml b/src/main/resources/kingdom/spanner/changelog.yaml index 9df52627af1..02e59aa69f6 100644 --- a/src/main/resources/kingdom/spanner/changelog.yaml +++ b/src/main/resources/kingdom/spanner/changelog.yaml @@ -74,4 +74,7 @@ databaseChangeLog: relativeToChangeLogFile: true - include: file: shard-measurements-by-continuation-token.sql + relativeToChangeLogFile: true +- include: + file: null-filter-measurements-by-continuation-token.sql relativeToChangeLogFile: true \ No newline at end of file diff --git a/src/main/resources/kingdom/spanner/null-filter-measurements-by-continuation-token.sql b/src/main/resources/kingdom/spanner/null-filter-measurements-by-continuation-token.sql new file mode 100644 index 00000000000..915c730f220 --- /dev/null +++ b/src/main/resources/kingdom/spanner/null-filter-measurements-by-continuation-token.sql @@ -0,0 +1,27 @@ +-- liquibase formatted sql + +-- Copyright 2025 The Cross-Media Measurement Authors +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +-- changeset sanjayvas:22 dbms:cloudspanner +-- comment: Filter null computation IDs from MeasurementsByContinuationToken index. + +DROP INDEX MeasurementsByContinuationToken; + +CREATE NULL_FILTERED INDEX MeasurementsByContinuationToken ON Measurements( + MeasurementIndexShardId, + UpdateTime, + ExternalComputationId, + State +); \ No newline at end of file