Skip to content

Commit 2865fb2

Browse files
Implemented possibility for configuring traffic splitting, and fallback using aggregate cluster #292
1 parent cdf8d9c commit 2865fb2

File tree

5 files changed

+40
-87
lines changed

5 files changed

+40
-87
lines changed

envoy-control-core/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/snapshot/resource/clusters/EnvoyClustersFactory.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ class EnvoyClustersFactory(
188188
return emptyList()
189189
}
190190

191-
// todo AD refactor
191+
// todo refactor
192192
private fun getEdsClustersForGroup(group: Group, globalSnapshot: GlobalSnapshot): List<Cluster> {
193193
val clusters: Map<String, Cluster> = if (enableTlsForGroup(group)) {
194194
globalSnapshot.securedClusters
@@ -217,7 +217,7 @@ class EnvoyClustersFactory(
217217
} else group.proxySettings.outgoing.defaultServiceSettings
218218
createClusters(
219219
group.serviceName,
220-
dependencies.keys,
220+
globalSnapshot.allServicesNames,
221221
dependencySettings,
222222
clusters[it],
223223
globalSnapshot.endpoints[it]

envoy-control-core/src/test/kotlin/pl/allegro/tech/servicemesh/envoycontrol/EnvoySnapshotFactoryTest.kt

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import io.envoyproxy.envoy.config.listener.v3.Listener
1515
import io.micrometer.core.instrument.MeterRegistry
1616
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
1717
import org.assertj.core.api.Assertions.assertThat
18-
import org.junit.Ignore
1918
import org.junit.jupiter.api.Test
2019
import pl.allegro.tech.servicemesh.envoycontrol.groups.AccessLogFilterSettings
2120
import pl.allegro.tech.servicemesh.envoycontrol.groups.AllServicesGroup
@@ -59,7 +58,7 @@ class EnvoySnapshotFactoryTest {
5958
const val CLUSTER_NAME = "cluster-name"
6059
const val MAIN_CLUSTER_NAME = "service-name-2"
6160
const val SECONDARY_CLUSTER_NAME = "service-name-2-secondary"
62-
const val AGGREGATE_CLUSTER_NAME = "service-name-2-aggr"
61+
const val AGGREGATE_CLUSTER_NAME = "service-name-2-aggregate"
6362
const val CLUSTER_NAME_2 = "cluster-name-2"
6463
const val DEFAULT_SERVICE_NAME = "service-name"
6564
const val SERVICE_NAME_2 = "service-name-2"
@@ -252,7 +251,7 @@ class EnvoySnapshotFactoryTest {
252251
val cluster2 =
253252
createCluster(snapshotPropertiesWithWeights, serviceName = SERVICE_NAME_2, clusterName = SERVICE_NAME_2)
254253
val group: Group = createServicesGroup(
255-
dependencies = arrayOf(cluster2.name to outgoingTimeoutPolicy(connectionIdleTimeout = 10)),
254+
dependencies = arrayOf(cluster2.name to null),
256255
snapshotProperties = snapshotPropertiesWithWeights
257256
)
258257
val globalSnapshot = createGlobalSnapshot(cluster1, cluster2)
@@ -309,7 +308,6 @@ class EnvoySnapshotFactoryTest {
309308
}
310309

311310
@Test
312-
@Ignore // todo AD
313311
fun `should create weighted snapshot clusters for wildcard dependencies`() {
314312
// given
315313
val envoySnapshotFactory = createSnapshotFactory(snapshotPropertiesWithWeights)
@@ -318,9 +316,10 @@ class EnvoySnapshotFactoryTest {
318316
createCluster(snapshotPropertiesWithWeights, serviceName = SERVICE_NAME_2, clusterName = SERVICE_NAME_2)
319317
val wildcardTimeoutPolicy = outgoingTimeoutPolicy(connectionIdleTimeout = 12)
320318

321-
val group: Group = createServicesGroup(
319+
val group: Group = createAllServicesGroup(
322320
dependencies = arrayOf("*" to wildcardTimeoutPolicy),
323-
snapshotProperties = snapshotPropertiesWithWeights
321+
snapshotProperties = snapshotPropertiesWithWeights,
322+
defaultServiceSettings = DependencySettings(),
324323
)
325324
val globalSnapshot = createGlobalSnapshot(cluster1, cluster2)
326325

@@ -334,22 +333,6 @@ class EnvoySnapshotFactoryTest {
334333
.containsKey(AGGREGATE_CLUSTER_NAME)
335334
}
336335

337-
@Test
338-
fun `should create weighted secondary snapshot cluster with filtered endpoints`() {
339-
}
340-
341-
@Test
342-
fun `should create weighted main snapshot cluster with default endpoints`() {
343-
}
344-
345-
@Test
346-
fun `should create weighted main snapshot cluster with custom connection idle timeout`() {
347-
}
348-
349-
@Test
350-
fun `should create weighted secondary snapshot cluster with custom connection idle timeout`() {
351-
}
352-
353336
@Test
354337
fun `should fetch ratelimit service endpoint if there are global rate limits`() {
355338
// given

envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/config/envoy/EnvoyAdmin.kt

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ class EnvoyAdmin(
6969
}
7070
}
7171

72-
fun configDump(): String {
73-
val response = get("config_dump", mapOf("include_eds" to "on"))
72+
private fun configDump(): String {
73+
val response = get("config_dump")
7474
return response.body.use { it!!.string() }
7575
}
7676

@@ -108,15 +108,11 @@ class EnvoyAdmin(
108108
private val client = OkHttpClient.Builder()
109109
.build()
110110

111-
private fun get(path: String, queryParams: Map<String, String> = mapOf()): Response {
112-
val params = queryParams.entries
113-
.joinToString(prefix = "?", separator = "&") {
114-
"${it.key}=${it.value}"
115-
}
111+
private fun get(path: String): Response {
116112
return client.newCall(
117113
Request.Builder()
118114
.get()
119-
.url("$address/$path$params")
115+
.url("$address/$path")
120116
.build()
121117
)
122118
.execute().addToCloseableResponses()

envoy-control-tests/src/main/kotlin/pl/allegro/tech/servicemesh/envoycontrol/trafficsplitting/AggregateClusterFallbackTest.kt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ package pl.allegro.tech.servicemesh.envoycontrol.trafficsplitting
33
import TrafficSplittingConstants.serviceName
44
import TrafficSplittingConstants.upstreamServiceName
55
import callUpstreamServiceRepeatedly
6+
import org.assertj.core.api.Assertions
67
import org.junit.jupiter.api.Test
78
import org.junit.jupiter.api.extension.RegisterExtension
8-
import pl.allegro.tech.servicemesh.envoycontrol.config.Xds
9+
import pl.allegro.tech.servicemesh.envoycontrol.assertions.untilAsserted
910
import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulClusterSetup
1011
import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulMultiClusterExtension
1112
import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension
1213
import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlClusteredExtension
14+
import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoContainer
1315
import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension
1416
import verifyCallsCountCloseTo
1517
import verifyIsReachable
@@ -18,7 +20,6 @@ import java.time.Duration
1820

1921
class AggregateClusterFallbackTest {
2022
companion object {
21-
private const val numberOfCalls = 100
2223
private const val forceTrafficZone = "dc3"
2324
private val properties = mapOf(
2425
"envoy-control.envoy.snapshot.stateSampleDuration" to Duration.ofSeconds(0),
@@ -45,17 +46,6 @@ class AggregateClusterFallbackTest {
4546
)
4647
)
4748

48-
private val echo2Config = """
49-
node:
50-
metadata:
51-
proxy_settings:
52-
outgoing:
53-
dependencies:
54-
- service: "service-1"
55-
""".trimIndent()
56-
57-
private val config = Xds.copy(configOverride = echo2Config, serviceName = "echo2")
58-
5949
@JvmField
6050
@RegisterExtension
6151
val consul = ConsulMultiClusterExtension()
@@ -75,7 +65,6 @@ class AggregateClusterFallbackTest {
7565
val envoyControl3 =
7666
EnvoyControlClusteredExtension(consul.serverThird, { properties }, listOf(consul))
7767

78-
7968
@JvmField
8069
@RegisterExtension
8170
val upstreamServiceDC1 = EchoServiceExtension()
@@ -132,5 +121,16 @@ class AggregateClusterFallbackTest {
132121
}
133122
}
134123

124+
private fun EnvoyExtension.assertInstancesUp(vararg containers: EchoContainer) {
125+
untilAsserted {
126+
val addresses = this.container.admin()
127+
.endpointsAddress(clusterName = upstreamServiceName)
128+
.map { "${it.address}:${it.portValue}" }
129+
Assertions.assertThat(addresses)
130+
.hasSize(containers.size)
131+
.containsExactlyInAnyOrderElementsOf(containers.map { it.address() })
132+
}
133+
}
134+
135135
}
136136

Lines changed: 15 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package pl.allegro.tech.servicemesh.envoycontrol.trafficsplitting
22

3+
import TrafficSplittingConstants.serviceName
4+
import TrafficSplittingConstants.upstreamServiceName
5+
import callUpstreamServiceRepeatedly
36
import org.junit.jupiter.api.Test
47
import org.junit.jupiter.api.extension.RegisterExtension
58
import pl.allegro.tech.servicemesh.envoycontrol.config.Xds
69
import pl.allegro.tech.servicemesh.envoycontrol.config.consul.ConsulMultiClusterExtension
7-
import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.CallStats
810
import pl.allegro.tech.servicemesh.envoycontrol.config.envoy.EnvoyExtension
911
import pl.allegro.tech.servicemesh.envoycontrol.config.envoycontrol.EnvoyControlClusteredExtension
1012
import pl.allegro.tech.servicemesh.envoycontrol.config.service.EchoServiceExtension
@@ -15,12 +17,14 @@ import java.time.Duration
1517

1618
class WeightedClustersRoutingTest {
1719
companion object {
18-
private const val serviceName = "echo2"
19-
private const val upstreamServiceName = "service-1"
20-
private const val numberOfCalls = 100
2120
private const val forceTrafficZone = "dc2"
2221

23-
private val priorityProps = mapOf(
22+
private val properties = mapOf(
23+
"envoy-control.envoy.snapshot.stateSampleDuration" to Duration.ofSeconds(0),
24+
"envoy-control.sync.enabled" to true,
25+
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.zoneName" to forceTrafficZone,
26+
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.serviceByWeightsProperties"
27+
to mutableMapOf(serviceName to mutableMapOf("main" to 90, "secondary" to 10)),
2428
"envoy-control.envoy.snapshot.loadBalancing.priorities.zonePriorities" to mapOf(
2529
"dc1" to mapOf(
2630
"dc1" to 0,
@@ -33,14 +37,6 @@ class WeightedClustersRoutingTest {
3337
)
3438
)
3539

36-
private val properties = mapOf(
37-
"envoy-control.envoy.snapshot.stateSampleDuration" to Duration.ofSeconds(0),
38-
"envoy-control.sync.enabled" to true,
39-
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.zoneName" to forceTrafficZone,
40-
"envoy-control.envoy.snapshot.loadBalancing.trafficSplitting.serviceByWeightsProperties"
41-
to mutableMapOf(serviceName to mutableMapOf("main" to 90, "secondary" to 10))
42-
)
43-
4440
private val echo2Config = """
4541
node:
4642
metadata:
@@ -54,25 +50,17 @@ class WeightedClustersRoutingTest {
5450

5551
@JvmField
5652
@RegisterExtension
57-
val consulClusters = ConsulMultiClusterExtension()
53+
val consul = ConsulMultiClusterExtension()
5854

5955
@JvmField
6056
@RegisterExtension
6157
val envoyControl =
62-
EnvoyControlClusteredExtension(
63-
consulClusters.serverFirst,
64-
{ properties + priorityProps },
65-
listOf(consulClusters)
66-
)
58+
EnvoyControlClusteredExtension(consul.serverFirst, { properties }, listOf(consul))
6759

6860
@JvmField
6961
@RegisterExtension
7062
val envoyControl2 =
71-
EnvoyControlClusteredExtension(
72-
consulClusters.serverSecond,
73-
{ properties + priorityProps },
74-
listOf(consulClusters)
75-
)
63+
EnvoyControlClusteredExtension(consul.serverSecond, { properties }, listOf(consul))
7664

7765
@JvmField
7866
@RegisterExtension
@@ -97,32 +85,18 @@ class WeightedClustersRoutingTest {
9785

9886
@Test
9987
fun `should route traffic according to weights`() {
100-
consulClusters.serverFirst.operations.registerServiceWithEnvoyOnEgress(echoEnvoyDC1, name = serviceName)
88+
consul.serverFirst.operations.registerServiceWithEnvoyOnEgress(echoEnvoyDC1, name = serviceName)
10189

102-
consulClusters.serverFirst.operations.registerService(upstreamServiceDC1, name = upstreamServiceName)
90+
consul.serverFirst.operations.registerService(upstreamServiceDC1, name = upstreamServiceName)
10391
echoEnvoyDC1.verifyIsReachable(upstreamServiceDC1, upstreamServiceName)
10492

105-
consulClusters.serverSecond.operations.registerService(upstreamServiceDC2, name = upstreamServiceName)
93+
consul.serverSecond.operations.registerService(upstreamServiceDC2, name = upstreamServiceName)
10694
envoyDC2.verifyIsReachable(upstreamServiceDC2, upstreamServiceName)
10795

10896
echoEnvoyDC1.callUpstreamServiceRepeatedly(upstreamServiceDC1, upstreamServiceDC2)
10997
.verifyCallsCountCloseTo(upstreamServiceDC1, 90)
11098
.verifyCallsCountGreaterThan(upstreamServiceDC2, 1)
11199
}
112100

113-
fun EnvoyExtension.callUpstreamServiceRepeatedly(
114-
vararg services: EchoServiceExtension
115-
): CallStats {
116-
val stats = CallStats(services.asList())
117-
this.egressOperations.callServiceRepeatedly(
118-
service = upstreamServiceName,
119-
stats = stats,
120-
minRepeat = numberOfCalls,
121-
maxRepeat = numberOfCalls,
122-
repeatUntil = { true },
123-
headers = mapOf()
124-
)
125-
return stats
126-
}
127101
}
128102

0 commit comments

Comments
 (0)