Skip to content

Commit

Permalink
Implemented possibility for configuring traffic splitting, and fallba…
Browse files Browse the repository at this point in the history
…ck using aggregate cluster| Added tests and fixed review issues #292
  • Loading branch information
nastassia-dailidava committed Aug 25, 2023
1 parent fd999eb commit ec4bb94
Show file tree
Hide file tree
Showing 14 changed files with 416 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ class EnvoySnapshotFactory(
}

private fun getDomainRouteSpecifications(
group: Group,
globalSnapshot: GlobalSnapshot
group: Group
): Map<DomainRoutesGrouper, Collection<RouteSpecification>> {
return group.proxySettings.outgoing.getDomainDependencies().groupBy(
{ DomainRoutesGrouper(it.getPort(), it.useSsl()) },
Expand All @@ -169,22 +168,18 @@ class EnvoySnapshotFactory(
clusterName = it.getClusterName(),
routeDomains = listOf(it.getRouteDomain()),
settings = it.settings,
clusterWeights = getTrafficSplittingWeights(group.serviceName, globalSnapshot, it.getClusterName())
clusterWeights = ClusterWeights()
)
}
)
}

private fun getDomainPatternRouteSpecifications(group: Group, globalSnapshot: GlobalSnapshot): RouteSpecification {
private fun getDomainPatternRouteSpecifications(group: Group): RouteSpecification {
return RouteSpecification(
clusterName = properties.dynamicForwardProxy.clusterName,
routeDomains = group.proxySettings.outgoing.getDomainPatternDependencies().map { it.domainPattern },
settings = group.proxySettings.outgoing.defaultServiceSettings,
clusterWeights = getTrafficSplittingWeights(
group.serviceName,
globalSnapshot,
properties.dynamicForwardProxy.clusterName
)
clusterWeights = ClusterWeights()
)
}

Expand Down Expand Up @@ -223,13 +218,13 @@ class EnvoySnapshotFactory(
serviceName: String,
globalSnapshot: GlobalSnapshot,
dependencyServiceName: String
): Map<String, Int> {
): ClusterWeights {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val weights = trafficSplitting.serviceByWeightsProperties[serviceName] ?: mapOf()
val weights = trafficSplitting.serviceByWeightsProperties[serviceName]
val enabledForDependency = globalSnapshot.endpoints[dependencyServiceName]?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone }
?: false
return if (enabledForDependency) weights else mapOf()
return if (enabledForDependency && weights != null) weights else ClusterWeights()
}

private fun getServiceWithCustomDomain(it: String): List<String> {
Expand Down Expand Up @@ -271,11 +266,11 @@ class EnvoySnapshotFactory(
globalSnapshot: GlobalSnapshot
): Snapshot {
// TODO(dj): This is where serious refactoring needs to be done
val egressDomainRouteSpecifications = getDomainRouteSpecifications(group, globalSnapshot)
val egressDomainRouteSpecifications = getDomainRouteSpecifications(group)
val egressServiceRouteSpecification = getServiceRouteSpecifications(group, globalSnapshot)
val egressRouteSpecification = egressServiceRouteSpecification +
egressDomainRouteSpecifications.values.flatten().toSet() +
getDomainPatternRouteSpecifications(group, globalSnapshot)
getDomainPatternRouteSpecifications(group)

val clusters: List<Cluster> =
clustersFactory.getClustersForGroup(group, globalSnapshot)
Expand Down Expand Up @@ -405,5 +400,5 @@ class RouteSpecification(
val clusterName: String,
val routeDomains: List<String>,
val settings: DependencySettings,
val clusterWeights: Map<String, Int> = mapOf()
val clusterWeights: ClusterWeights = ClusterWeights()
)
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ class CanaryProperties {

class TrafficSplittingProperties {
var zoneName = ""
var serviceByWeightsProperties: Map<String, Map<String, Int>> = mapOf()
var serviceByWeightsProperties: Map<String, ClusterWeights> = mapOf()
}

class ClusterWeights {
var mainClusterWeight: Int = 0
var secondaryClusterWeight: Int = 0
}

class LoadBalancingWeightsProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.CommunicationMode.XDS
import pl.allegro.tech.servicemesh.envoycontrol.groups.DependencySettings
import pl.allegro.tech.servicemesh.envoycontrol.groups.DomainDependency
import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.groups.ServiceDependency
import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.groups.containsGlobalRateLimits
import pl.allegro.tech.servicemesh.envoycontrol.logger
Expand All @@ -52,8 +53,6 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.Threshold
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.SanUriMatcherFactory
import pl.allegro.tech.servicemesh.envoycontrol.utils.getAggregateClusterName
import pl.allegro.tech.servicemesh.envoycontrol.utils.getSecondaryClusterName

typealias EnvoyClusterConfig = io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig

Expand All @@ -79,6 +78,16 @@ class EnvoyClustersFactory(

companion object {
private val logger by logger()
const val SECONDARY_CLUSTER_POSTFIX = "secondary"
const val AGGREGATE_CLUSTER_POSTFIX = "aggregate"

fun getSecondaryClusterName(serviceName: String): String {
return "$serviceName-$SECONDARY_CLUSTER_POSTFIX"
}

fun getAggregateClusterName(serviceName: String): String {
return "$serviceName-$AGGREGATE_CLUSTER_POSTFIX"
}
}

fun getClustersForServices(
Expand Down Expand Up @@ -188,7 +197,6 @@ class EnvoyClustersFactory(
return emptyList()
}

// todo refactor
private fun getEdsClustersForGroup(group: Group, globalSnapshot: GlobalSnapshot): List<Cluster> {
val clusters: Map<String, Cluster> = if (enableTlsForGroup(group)) {
globalSnapshot.securedClusters
Expand All @@ -211,14 +219,10 @@ class EnvoyClustersFactory(
is AllServicesGroup -> {
globalSnapshot.allServicesNames.flatMap {
val dependency = dependencies[it]
val dependencySettings =
if (dependency != null && dependency.settings.timeoutPolicy.connectionIdleTimeout != null) {
dependency.settings
} else group.proxySettings.outgoing.defaultServiceSettings
createClusters(
group.serviceName,
globalSnapshot.allServicesNames,
dependencySettings,
getDependencySettings(dependency, group),
clusters[it],
globalSnapshot.endpoints[it]
)
Expand All @@ -232,6 +236,12 @@ class EnvoyClustersFactory(
return clustersForGroup
}

private fun getDependencySettings(dependency: ServiceDependency?, group: Group): DependencySettings {
return if (dependency != null && dependency.settings.timeoutPolicy.connectionIdleTimeout != null) {
dependency.settings
} else group.proxySettings.outgoing.defaultServiceSettings
}

private fun createClusterForGroup(dependencySettings: DependencySettings, cluster: Cluster): Cluster {
val idleTimeoutPolicy =
dependencySettings.timeoutPolicy.connectionIdleTimeout ?: cluster.commonHttpProtocolOptions.idleTimeout
Expand All @@ -251,8 +261,8 @@ class EnvoyClustersFactory(
.setName(getSecondaryClusterName(cluster.name))
.build()
val aggregateCluster =
createAggregateCluster(mainCluster.name, linkedSetOf(secondaryCluster.name, mainCluster.name))
return linkedSetOf(mainCluster, secondaryCluster, aggregateCluster)
createAggregateCluster(mainCluster.name, listOf(secondaryCluster.name, mainCluster.name))
return listOf(mainCluster, secondaryCluster, aggregateCluster)
.also { logger.debug("Created traffic splitting clusters: {}", it) }
}

Expand Down Expand Up @@ -282,7 +292,7 @@ class EnvoyClustersFactory(
clusterLoadAssignment: ClusterLoadAssignment?
): Boolean {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val trafficSplitEnabled = trafficSplitting.serviceByWeightsProperties.keys.contains(serviceName) &&
val trafficSplitEnabled = trafficSplitting.serviceByWeightsProperties.containsKey(serviceName) &&
dependencies.contains(clusterName)
val hasEndpointsInZone = clusterLoadAssignment?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone } ?: false
Expand Down Expand Up @@ -330,7 +340,6 @@ class EnvoyClustersFactory(
.build()
}

// TODO ??? todo AD
private fun getStrictDnsClustersForGroup(group: Group): List<Cluster> {
val useTransparentProxy = group.listenersConfig?.useTransparentProxy ?: false
return group.proxySettings.outgoing.getDomainDependencies().map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.routes.ServiceTagMetadataGenerator
import pl.allegro.tech.servicemesh.envoycontrol.utils.getSecondaryClusterName

typealias EnvoyProxyLocality = io.envoyproxy.envoy.config.core.v3.Locality

Expand Down Expand Up @@ -81,13 +81,13 @@ class EnvoyEndpointsFactory(
egressRouteSpecifications: Collection<RouteSpecification>
): List<ClusterLoadAssignment> {
return egressRouteSpecifications
.filter { it.clusterWeights.isNotEmpty() }
.filter { it.clusterWeights.mainClusterWeight > 0 && it.clusterWeights.secondaryClusterWeight > 0 }
.onEach { logger.debug("Traffic splitting is enabled for cluster: ${it.clusterName}") }
.mapNotNull { routeSpec ->
clusterLoadAssignments[routeSpec.clusterName]?.let {
ClusterLoadAssignment.newBuilder(it)
clusterLoadAssignments[routeSpec.clusterName]?.let { assignment ->
ClusterLoadAssignment.newBuilder(assignment)
.clearEndpoints()
.addAllEndpoints(it.endpointsList?.filter { e ->
.addAllEndpoints(assignment.endpointsList?.filter { e ->
e.locality.zone == properties.loadBalancing.trafficSplitting.zoneName
})
.setClusterName(getSecondaryClusterName(routeSpec.clusterName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryHostPredicate
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.RouteSpecification
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.clusters.EnvoyClustersFactory.Companion.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.ServiceTagFilterFactory
import pl.allegro.tech.servicemesh.envoycontrol.utils.getSecondaryClusterName
import pl.allegro.tech.servicemesh.envoycontrol.utils.withClusterWeight
import pl.allegro.tech.servicemesh.envoycontrol.groups.RetryPolicy as EnvoyControlRetryPolicy

Expand Down Expand Up @@ -346,21 +346,23 @@ class EnvoyEgressRoutesFactory(
}

private fun RouteAction.Builder.setCluster(routeSpec: RouteSpecification): RouteAction.Builder {
val hasWeightsConfig = routeSpec.clusterWeights.keys.containsAll(listOf("main", "secondary"))
val clusterWeights = routeSpec.clusterWeights
val hasWeightsConfig = clusterWeights.mainClusterWeight > 0 &&
clusterWeights.secondaryClusterWeight > 0
return if (!hasWeightsConfig) {
this.setCluster(routeSpec.clusterName)
} else {
logger.debug(
"Creating weighted cluster configuration for route spec {}, {}",
routeSpec.clusterName,
routeSpec.clusterWeights
clusterWeights
)
this.setWeightedClusters(
WeightedCluster.newBuilder()
.withClusterWeight(routeSpec.clusterName, routeSpec.clusterWeights["main"]!!)
.withClusterWeight(routeSpec.clusterName, clusterWeights.mainClusterWeight)
.withClusterWeight(
getSecondaryClusterName(routeSpec.clusterName),
routeSpec.clusterWeights["secondary"]!!
clusterWeights.secondaryClusterWeight
)
)
}
Expand Down

This file was deleted.

Loading

0 comments on commit ec4bb94

Please sign in to comment.