-
Notifications
You must be signed in to change notification settings - Fork 84
/
Copy pathtest_cds_churn_with_traffic.py
137 lines (111 loc) · 5.28 KB
/
test_cds_churn_with_traffic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
"""@package integration_test.
Test cluster churn with active traffic.
"""
import pytest
import os
from dynamic_config_envoy_proxy import (inject_dynamic_envoy_http_proxy_fixture, proxy_config)
from benchmarks import utilities as benchmarks_utilities
from typing import Generator
from rules_python.python.runfiles import runfiles
from nighthawk.api.configuration import cluster_config_manager_pb2
from test.integration import utility
# Default value of the `duration` parameter of `_run_benchmark`, which forwards it
# to the client's --duration flag. Read from the NIGHTHAWK_BENCHMARK_DURATION
# environment variable, falling back to 30 when it is unset.
_BENCHMARK_DURATION = int(os.environ.get("NIGHTHAWK_BENCHMARK_DURATION", 30))
def _base_cds_config(
        temp_dir: str) -> cluster_config_manager_pb2.DynamicClusterConfigManagerSettings:
    """Build the settings every CDS churn configuration in this file starts from.

    Args:
        temp_dir: Directory under which the 'new_cds.pb' output file path is placed.

    Returns:
        A settings message with the output file path set and the refresh
        interval's seconds field set to 5. No clusters are added here.
    """
    output_path = os.path.join(temp_dir, 'new_cds.pb')
    base_settings = cluster_config_manager_pb2.DynamicClusterConfigManagerSettings(
        output_file=output_path)
    base_settings.refresh_interval.seconds = 5
    return base_settings
# TODO(kbaichoo): migrate to common test utility
def _assignEndpointsToClusters(
        config: cluster_config_manager_pb2.DynamicClusterConfigManagerSettings, clusters: list[str],
        available_endpoints: list[utility.SocketAddress]):
    """Add the given clusters to `config` and spread the endpoints over them round robin.

    Args:
        config: Settings message that receives one new cluster entry per name.
        clusters: Names of the clusters to add, in order.
        available_endpoints: Backend endpoints; each one is assigned to exactly one
            of the clusters added by this call.

    Raises:
        ValueError: If there are endpoints to assign but no clusters were given.
    """
    if available_endpoints and not clusters:
        raise ValueError("cannot assign endpoints: no clusters were given")

    # Keep references to the entries created here rather than indexing
    # config.clusters from 0, so the endpoints land on the given clusters even
    # when config already contained other cluster entries.
    added_clusters = []
    for cluster_name in clusters:
        new_cluster = config.clusters.add()
        new_cluster.name = cluster_name
        added_clusters.append(new_cluster)

    for endpoint_idx, endpoint in enumerate(available_endpoints):
        target_cluster = added_clusters[endpoint_idx % len(added_clusters)]
        new_endpoint = target_cluster.endpoints.add()
        new_endpoint.ip = endpoint.ip
        new_endpoint.port = endpoint.port
def _run_benchmark(fixture,
                   rps=500,
                   duration=_BENCHMARK_DURATION,
                   max_connections=1,
                   max_active_requests=100,
                   request_body_size=0,
                   response_size=1024,
                   concurrency=1,
                   request_source_config=None):
    """Run the Nighthawk client against the fixture's test server and output the results.

    Args:
        fixture: Test fixture; provides the test server root URI, runs the client,
            and is handed to the benchmark result output helper.
        rps: Value passed with the --rps flag.
        duration: Value passed with the --duration flag.
        max_connections: Value passed with the --connections flag.
        max_active_requests: Value passed with the --max-active-requests flag.
        request_body_size: Value passed with --request-body-size; the flag is only
            added when this is greater than 0.
        response_size: Response body size requested through the
            x-nighthawk-test-server-config request header.
        concurrency: Value passed with the --concurrency flag.
        request_source_config: Optional runfile path; when truthy, the contents of
            that file are passed with --request-source-plugin-config.
    """
    cli_args = [fixture.getTestServerRootUri()]
    cli_args += ["--rps", str(rps)]
    cli_args += ["--duration", str(duration)]
    cli_args += ["--connections", str(max_connections)]
    cli_args += ["--max-active-requests", str(max_active_requests)]
    cli_args += ["--concurrency", str(concurrency)]
    cli_args += [
        "--request-header",
        "x-nighthawk-test-server-config:{response_body_size:%s}" % response_size,
    ]
    cli_args += ["--experimental-h1-connection-reuse-strategy", "lru"]
    cli_args.append("--prefetch-connections")
    # 4294967295 == 2**32 - 1. NOTE(review): presumably chosen so that these
    # counters never trip a failure predicate during the run -- confirm.
    cli_args.extend((
        "--failure-predicate benchmark.http_3xx:4294967295",
        "--failure-predicate benchmark.http_4xx:4294967295",
        "--failure-predicate benchmark.http_5xx:4294967295",
        "--failure-predicate benchmark.pool_connection_failure:4294967295",
    ))

    if request_body_size > 0:
        cli_args.extend(["--request-body-size", str(request_body_size)])

    if request_source_config:
        cli_args.extend(["--request-source-plugin-config", _readRunfile(request_source_config)])

    # The client's return code is deliberately not checked here.
    parsed_json, _unused = fixture.runNighthawkClient(cli_args, check_return_code=False)

    # Emit the benchmark results for this run.
    benchmarks_utilities.output_benchmark_results(parsed_json, fixture)
def _readRunfile(path: str) -> str:
    """Return the text content of the runfile identified by `path`.

    Args:
        path: Runfile path, resolved to a file system location through Rlocation().

    Returns:
        The entire content of the resolved file as a string.
    """
    resolved_location = runfiles.Create().Rlocation(path)
    with open(resolved_location) as runfile:
        content = runfile.read()
    return content
def _config_generation_single_cluster(temp_dir: str, endpoints: list[utility.SocketAddress]):
    """Generate the CDS churn configuration for one cluster.

    Args:
        temp_dir: Directory forwarded to the base configuration for its output file.
        endpoints: Backend endpoints, all assigned to the single cluster.

    Returns:
        The base CDS settings with cluster 'service_envoyproxy_io' added and
        populated with the given endpoints.
    """
    single_cluster_config = _base_cds_config(temp_dir)
    _assignEndpointsToClusters(single_cluster_config, ['service_envoyproxy_io'], endpoints)
    return single_cluster_config
@pytest.mark.parametrize(
    'proxy_config', ["nighthawk/benchmarks/configurations/dynamic_resources.yaml"])
@pytest.mark.parametrize(
    'server_config', ["nighthawk/test/integration/configurations/nighthawk_http_origin.yaml"])
@pytest.mark.parametrize(
    'dynamic_config_generator', [_config_generation_single_cluster])
def test_dynamic_http_single_cluster_traffic(
        inject_dynamic_envoy_http_proxy_fixture,
        proxy_config):  # noqa
    """Run the default benchmark with the single-cluster dynamic configuration generator."""
    _run_benchmark(fixture=inject_dynamic_envoy_http_proxy_fixture)
@pytest.fixture()
def request_source_config() -> Generator[str, None, None]:
    """Yield the runfile path of the five-cluster request source configuration."""
    config_path = "nighthawk/benchmarks/configurations/request_source_five_clusters.json"
    yield config_path
def _config_generation_five_cluster(temp_dir: str, endpoints: list[utility.SocketAddress]):
    """Generate the CDS churn configuration for five clusters.

    Args:
        temp_dir: Directory forwarded to the base configuration for its output file.
        endpoints: Backend endpoints to distribute over the five clusters.

    Returns:
        The base CDS settings with clusters 'cluster_one' through 'cluster_five'
        added and the given endpoints assigned to them.
    """
    five_cluster_config = _base_cds_config(temp_dir)
    cluster_names = ['cluster_one', 'cluster_two', 'cluster_three', 'cluster_four', 'cluster_five']
    _assignEndpointsToClusters(five_cluster_config, cluster_names, endpoints)
    return five_cluster_config
@pytest.mark.parametrize(
    'proxy_config', ["nighthawk/benchmarks/configurations/dynamic_resources.yaml"])
@pytest.mark.parametrize(
    'server_config',
    ["nighthawk/test/integration/configurations/nighthawk_15_listeners_http_origin.yaml"])
@pytest.mark.parametrize(
    'dynamic_config_generator', [_config_generation_five_cluster])
def test_dynamic_http_multiple_cluster_traffic(
        inject_dynamic_envoy_http_proxy_fixture,
        request_source_config,
        proxy_config):
    """Run the benchmark with the five-cluster generator and a request source plugin config."""
    _run_benchmark(
        inject_dynamic_envoy_http_proxy_fixture,
        request_source_config=request_source_config,
    )