Skip to content

Commit

Permalink
Add ForwardedStorageCorrectnessTest. (#936)
Browse files Browse the repository at this point in the history
This allows running the K8s correctness test against a running CMMS instance, with the following conditions:
* The instance uses the testing certificates/keys from the codebase.
* The EDP simulator sketches are accessible via a server which implements the ForwardedStorage service.
  • Loading branch information
SanjayVas authored Apr 19, 2023
1 parent 2b7ce89 commit ae25637
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 1 deletion.
30 changes: 30 additions & 0 deletions src/main/k8s/local/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ src/main/k8s/local/resource_setup.sh \
--output-dir=/tmp/resource-setup
```

Note: You can stop the port forwarding at this point. Future steps involve
restarting some Deployments, rendering the forwarding invalid.

Tip: The job will output a `resource-setup.bazelrc` file with `--define` options
that you can include in your `.bazelrc` file. You can then specify
`--config=halo-local` to Bazel commands instead of those individual options.
Expand Down Expand Up @@ -257,6 +260,33 @@ browser at http://localhost:31111/:
kubectl port-forward prometheus-pod 31111:9090
```

## Running the Correctness Test

Once you have a running CMMS with EDP simulators, you can run the correctness
test against it.

You'll need access to the public API and forwarded storage servers. You can do
this via port forwarding:

```shell
kubectl port-forward --address=localhost services/v2alpha-public-api-server 8443:8443
```

```shell
kubectl port-forward --address=localhost services/fake-storage-server 7443:8443 &
```

Then you can run the test, substituting your own values:

```shell
bazel test //src/test/kotlin/org/wfanet/measurement/integration/k8s:ForwardedStorageCorrectnessTest
--test_output=streamed \
--define=kingdom_public_api_target=localhost:8443 \
--define=forwarded_storage_api_target=localhost:7443 \
--define=mc_name=measurementConsumers/Rcn7fKd25C8 \
--define=mc_api_key=W9q4zad246g
```

## Old Guide

This has instructions that may be outdated.
Expand Down
47 changes: 47 additions & 0 deletions src/main/proto/wfa/measurement/integration/k8s/testing/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
load("@rules_proto//proto:defs.bzl", "proto_library")
load("@rules_java//java:defs.bzl", "java_proto_library")
load("@wfa_common_jvm//build/kt_jvm_proto:defs.bzl", "kt_jvm_proto_library")

package(
default_testonly = True,
default_visibility = ["//src/test/kotlin/org/wfanet/measurement/integration/k8s:__pkg__"],
)

IMPORT_PREFIX = "/src/main/proto"

proto_library(
name = "correctness_test_config_proto",
srcs = ["correctness_test_config.proto"],
strip_import_prefix = IMPORT_PREFIX,
deps = [
"@com_google_googleapis//google/api:resource_proto",
],
)

java_proto_library(
name = "correctness_test_config_java_proto",
deps = [":correctness_test_config_proto"],
)

kt_jvm_proto_library(
name = "correctness_test_config_kt_jvm_proto",
srcs = [":correctness_test_config_proto"],
deps = [":correctness_test_config_java_proto"],
)

proto_library(
name = "forwarded_storage_config_proto",
srcs = ["forwarded_storage_config.proto"],
strip_import_prefix = IMPORT_PREFIX,
)

java_proto_library(
name = "forwarded_storage_config_java_proto",
deps = [":forwarded_storage_config_proto"],
)

kt_jvm_proto_library(
name = "forwarded_storage_config_kt_jvm_proto",
srcs = [":forwarded_storage_config_proto"],
deps = [":forwarded_storage_config_java_proto"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 The Cross-Media Measurement Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package wfa.measurement.integration.k8s.testing;

import "google/api/resource.proto";

option java_package = "org.measurement.integration.k8s.testing";
option java_multiple_files = true;

message CorrectnessTestConfig {
// gRPC target of Kingdom public API server.
string kingdom_public_api_target = 1;

// Expected hostname (DNS-ID) in the Kingdom public API server's TLS
// certificate.
//
// If not specified, standard TLS DNS-ID derivation will be used.
string kingdom_public_api_cert_host = 2;

// MeasurementConsumer resource name.
string measurement_consumer = 3 [(google.api.resource_reference).type =
"halo.wfanet.org/MeasurementConsumer"];

// Authentication key for the CMMS public API.
string api_authentication_key = 4;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 The Cross-Media Measurement Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package wfa.measurement.integration.k8s.testing;

option java_package = "org.measurement.integration.k8s.testing";
option java_multiple_files = true;

message ForwardedStorageConfig {
// gRPC target of the forwarded storage API server.
string forwarded_storage_api_target = 1;

// Expected hostname (DNS-ID) in the forwarded storage API server's TLS
// certificate.
//
// If not specified, standard TLS DNS-ID derivation will be used.
string forwarded_storage_api_cert_host = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ abstract class AbstractCorrectnessTest(private val measurementSystem: Measuremen
testHarness.executeDuration("$runId-duration")
}

@Test(timeout = 8 * 60 * 1000)
@Test(timeout = 10 * 60 * 1000)
fun `reach and frequency measurement completes with expected result`() = runBlocking {
testHarness.executeReachAndFrequency("$runId-reach-and-freq")
}
Expand Down
54 changes: 54 additions & 0 deletions src/test/kotlin/org/wfanet/measurement/integration/k8s/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
load("@io_bazel_rules_kotlin//kotlin:jvm.bzl", "kt_jvm_library")
load("@rules_java//java:defs.bzl", "java_test")
load("@wfa_common_jvm//build:defs.bzl", "expand_template")
load("//build:variables.bzl", "TEST_K8S_SETTINGS")

package(default_testonly = True)

Expand Down Expand Up @@ -45,6 +47,46 @@ kt_jvm_library(
],
)

kt_jvm_library(
name = "forwarded_storage_correctness_test",
srcs = ["ForwardedStorageCorrectnessTest.kt"],
deps = [
":abstract_correctness_test",
"//src/main/kotlin/org/wfanet/measurement/loadtest/config:event_filters",
"//src/main/proto/wfa/measurement/integration/k8s/testing:correctness_test_config_kt_jvm_proto",
"//src/main/proto/wfa/measurement/integration/k8s/testing:forwarded_storage_config_kt_jvm_proto",
"@wfa_common_jvm//imports/java/org/junit",
"@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/grpc",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/testing",
"@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/storage/forwarded",
],
)

expand_template(
name = "gen_correctness_test_config",
out = "correctness_test_config.textproto",
substitutions = {
"{kingdom_public_api_target}": "$(kingdom_public_api_target)",
"{kingdom_public_api_cert_host}": "localhost",
"{mc_name}": TEST_K8S_SETTINGS.mc_name,
"{mc_api_key}": TEST_K8S_SETTINGS.mc_api_key,
},
tags = ["manual"],
template = "correctness_test_config.tmpl.textproto",
)

expand_template(
name = "gen_forwarded_storage_config",
out = "forwarded_storage_config.textproto",
substitutions = {
"{forwarded_storage_api_target}": "$(forwarded_storage_api_target)",
"{forwarded_storage_api_cert_host}": "localhost",
},
tags = ["manual"],
template = "forwarded_storage_config.tmpl.textproto",
)

java_test(
name = "EmptyClusterCorrectnessTest",
size = "enormous",
Expand All @@ -65,3 +107,15 @@ java_test(
test_class = "org.wfanet.measurement.integration.k8s.EmptyClusterCorrectnessTest",
runtime_deps = [":empty_cluster_correctness_test"],
)

java_test(
name = "ForwardedStorageCorrectnessTest",
timeout = "long",
data = [
":correctness_test_config.textproto",
":forwarded_storage_config.textproto",
],
tags = ["manual"],
test_class = "org.wfanet.measurement.integration.k8s.ForwardedStorageCorrectnessTest",
runtime_deps = [":forwarded_storage_correctness_test"],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2023 The Cross-Media Measurement Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.wfanet.measurement.integration.k8s

import io.grpc.ManagedChannel
import java.nio.file.Paths
import java.time.Duration
import java.util.UUID
import org.junit.ClassRule
import org.junit.rules.TemporaryFolder
import org.junit.rules.TestRule
import org.junit.runner.Description
import org.junit.runners.model.Statement
import org.measurement.integration.k8s.testing.CorrectnessTestConfig
import org.measurement.integration.k8s.testing.ForwardedStorageConfig
import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt
import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt
import org.wfanet.measurement.api.v2alpha.EventGroupsGrpcKt
import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt
import org.wfanet.measurement.api.v2alpha.MeasurementsGrpcKt
import org.wfanet.measurement.api.v2alpha.RequisitionsGrpcKt
import org.wfanet.measurement.common.grpc.buildMutualTlsChannel
import org.wfanet.measurement.common.grpc.withDefaultDeadline
import org.wfanet.measurement.common.parseTextProto
import org.wfanet.measurement.common.testing.chainRulesSequentially
import org.wfanet.measurement.internal.testing.ForwardedStorageGrpcKt
import org.wfanet.measurement.loadtest.config.EventFilters
import org.wfanet.measurement.loadtest.frontend.FrontendSimulator
import org.wfanet.measurement.loadtest.frontend.MeasurementConsumerData
import org.wfanet.measurement.loadtest.storage.SketchStore
import org.wfanet.measurement.storage.forwarded.ForwardedStorageClient

/**
* Test for correctness of an existing CMMS on Kubernetes where the EDP simulator sketches are
* accessible via a [ForwardedStorageClient].
*
* This currently assumes that the CMMS instance is using the certificates and keys from this Bazel
* workspace.
*/
class ForwardedStorageCorrectnessTest : AbstractCorrectnessTest(measurementSystem) {
private class RunningMeasurementSystem : MeasurementSystem, TestRule {
override val runId: String by lazy { UUID.randomUUID().toString() }

private lateinit var _testHarness: FrontendSimulator
override val testHarness: FrontendSimulator
get() = _testHarness

private val channels = mutableListOf<ManagedChannel>()

override fun apply(base: Statement, description: Description): Statement {
return object : Statement() {
override fun evaluate() {
try {
_testHarness = createTestHarness()
base.evaluate()
} finally {
shutDownChannels()
}
}
}
}

private fun createTestHarness(): FrontendSimulator {
val measurementConsumerData =
MeasurementConsumerData(
TEST_CONFIG.measurementConsumer,
MC_SIGNING_KEY,
MC_ENCRYPTION_PRIVATE_KEY,
TEST_CONFIG.apiAuthenticationKey
)

val forwardedStorageChannel =
buildMutualTlsChannel(
STORAGE_CONFIG.forwardedStorageApiTarget,
KINGDOM_SIGNING_CERTS,
STORAGE_CONFIG.forwardedStorageApiCertHost.ifEmpty { null }
)
.also { channels.add(it) }
.withDefaultDeadline(RPC_DEADLINE_DURATION)

val publicApiChannel =
buildMutualTlsChannel(
TEST_CONFIG.kingdomPublicApiTarget,
MEASUREMENT_CONSUMER_SIGNING_CERTS,
TEST_CONFIG.kingdomPublicApiCertHost.ifEmpty { null }
)
.also { channels.add(it) }
.withDefaultDeadline(RPC_DEADLINE_DURATION)

val storageClient =
ForwardedStorageClient(
ForwardedStorageGrpcKt.ForwardedStorageCoroutineStub(forwardedStorageChannel)
)
return FrontendSimulator(
measurementConsumerData,
OUTPUT_DP_PARAMS,
DataProvidersGrpcKt.DataProvidersCoroutineStub(publicApiChannel),
EventGroupsGrpcKt.EventGroupsCoroutineStub(publicApiChannel),
MeasurementsGrpcKt.MeasurementsCoroutineStub(publicApiChannel),
RequisitionsGrpcKt.RequisitionsCoroutineStub(publicApiChannel),
MeasurementConsumersGrpcKt.MeasurementConsumersCoroutineStub(publicApiChannel),
CertificatesGrpcKt.CertificatesCoroutineStub(publicApiChannel),
SketchStore(storageClient),
RESULT_POLLING_DELAY,
MEASUREMENT_CONSUMER_SIGNING_CERTS.trustedCertificates,
EventFilters.EVENT_TEMPLATES_TO_FILTERS_MAP
)
}

private fun shutDownChannels() {
for (channel in channels) {
channel.shutdown()
}
}
}

companion object {
private val RESULT_POLLING_DELAY = Duration.ofSeconds(10)
private val RPC_DEADLINE_DURATION = Duration.ofSeconds(30)
private val CONFIG_PATH =
Paths.get("src", "test", "kotlin", "org", "wfanet", "measurement", "integration", "k8s")
private const val TEST_CONFIG_NAME = "correctness_test_config.textproto"
private const val STORAGE_CONFIG_NAME = "forwarded_storage_config.textproto"

private val TEST_CONFIG: CorrectnessTestConfig by lazy {
val configFile = getRuntimePath(CONFIG_PATH.resolve(TEST_CONFIG_NAME)).toFile()
parseTextProto(configFile, CorrectnessTestConfig.getDefaultInstance())
}

private val STORAGE_CONFIG: ForwardedStorageConfig by lazy {
val configFile = getRuntimePath(CONFIG_PATH.resolve(STORAGE_CONFIG_NAME)).toFile()
parseTextProto(configFile, ForwardedStorageConfig.getDefaultInstance())
}

private val tempDir = TemporaryFolder()
private val measurementSystem = RunningMeasurementSystem()

@ClassRule @JvmField val chainedRule = chainRulesSequentially(tempDir, measurementSystem)
}
}
Loading

0 comments on commit ae25637

Please sign in to comment.