Skip to content

Commit

Permalink
Implemented possibility for configuring traffic splitting and fallback using aggregate cluster #292
Browse files Browse the repository at this point in the history
  • Loading branch information
nastassia-dailidava committed Aug 24, 2023
1 parent 08f5a33 commit cdf8d9c
Show file tree
Hide file tree
Showing 16 changed files with 937 additions and 59 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
## [0.19.36]

### Changed
- Implemented configuring traffic splitting and fallback using aggregate cluster functionality

## [0.19.35]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.groups.IncomingRateLimitEndpoint
import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.groups.orDefault
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
Expand All @@ -38,6 +39,7 @@ class EnvoySnapshotFactory(

companion object {
// Port number 80 exposed as a compile-time constant; its use sites are outside this excerpt.
const val DEFAULT_HTTP_PORT = 80
// Class-level logger obtained through the project's `logger()` property delegate.
private val logger by logger()
}

fun newSnapshot(
Expand Down Expand Up @@ -156,24 +158,33 @@ class EnvoySnapshotFactory(
return newSnapshotForGroup
}

/**
 * Builds route specifications for the group's outgoing domain dependencies.
 *
 * Dependencies are grouped under a [DomainRoutesGrouper] built from each dependency's port and SSL flag.
 * Every dependency yields one [RouteSpecification] carrying its cluster name, its single route domain
 * and its own settings.
 */
// NOTE(review): two signatures (and two `settings` arguments) follow because this excerpt is a rendered
// diff; the one-parameter form looks like the pre-change line. Confirm against the repository.
private fun getDomainRouteSpecifications(group: Group): Map<DomainRoutesGrouper, Collection<RouteSpecification>> {
private fun getDomainRouteSpecifications(
group: Group,
globalSnapshot: GlobalSnapshot
): Map<DomainRoutesGrouper, Collection<RouteSpecification>> {
return group.proxySettings.outgoing.getDomainDependencies().groupBy(
{ DomainRoutesGrouper(it.getPort(), it.useSsl()) },
{
RouteSpecification(
clusterName = it.getClusterName(),
routeDomains = listOf(it.getRouteDomain()),
settings = it.settings
settings = it.settings,
// Weights are resolved per dependency cluster for the group's own service name.
clusterWeights = getTrafficSplittingWeights(group.serviceName, globalSnapshot, it.getClusterName())
)
}
)
}

/**
 * Builds the single route specification that covers all of the group's domain-pattern dependencies.
 *
 * The route targets the cluster named by `properties.dynamicForwardProxy.clusterName`, lists every
 * dependency's `domainPattern` as a route domain and uses the group's default outgoing service settings.
 */
// NOTE(review): two signatures (and two `settings` arguments) follow because this excerpt is a rendered
// diff; the one-parameter form looks like the pre-change line. Confirm against the repository.
private fun getDomainPatternRouteSpecifications(group: Group): RouteSpecification {
private fun getDomainPatternRouteSpecifications(group: Group, globalSnapshot: GlobalSnapshot): RouteSpecification {
return RouteSpecification(
clusterName = properties.dynamicForwardProxy.clusterName,
routeDomains = group.proxySettings.outgoing.getDomainPatternDependencies().map { it.domainPattern },
settings = group.proxySettings.outgoing.defaultServiceSettings
settings = group.proxySettings.outgoing.defaultServiceSettings,
// Weights are resolved against the dynamic forward proxy cluster name, like any other dependency.
clusterWeights = getTrafficSplittingWeights(
group.serviceName,
globalSnapshot,
properties.dynamicForwardProxy.clusterName
)
)
}

Expand All @@ -185,7 +196,8 @@ class EnvoySnapshotFactory(
RouteSpecification(
clusterName = it.service,
routeDomains = listOf(it.service) + getServiceWithCustomDomain(it.service),
settings = it.settings
settings = it.settings,
clusterWeights = getTrafficSplittingWeights(group.serviceName, globalSnapshot, it.service)
)
}
return when (group) {
Expand All @@ -198,14 +210,28 @@ class EnvoySnapshotFactory(
RouteSpecification(
clusterName = it,
routeDomains = listOf(it) + getServiceWithCustomDomain(it),
settings = group.proxySettings.outgoing.defaultServiceSettings
settings = group.proxySettings.outgoing.defaultServiceSettings,
clusterWeights = getTrafficSplittingWeights(group.serviceName, globalSnapshot, it)
)
}
allServicesRoutes + definedServicesRoutes
}
}
}

/**
 * Resolves the traffic-splitting weights to attach to a route from [serviceName] to
 * [dependencyServiceName].
 *
 * Weights are looked up by [serviceName] in `loadBalancing.trafficSplitting.serviceByWeightsProperties`.
 * They are returned only when the dependency's load assignment in [globalSnapshot] contains at least one
 * locality whose zone equals the configured `trafficSplitting.zoneName`.
 *
 * @param serviceName name of the calling service (the group's service name at the visible call sites)
 * @param globalSnapshot snapshot whose `endpoints` map is searched for the dependency
 * @param dependencyServiceName cluster/service name of the dependency being routed to
 * @return the configured weights, or an empty map when the service has no weights configured, the
 * dependency has no endpoints entry, or none of its localities is in the configured zone
 */
private fun getTrafficSplittingWeights(
    serviceName: String,
    globalSnapshot: GlobalSnapshot,
    dependencyServiceName: String
): Map<String, Int> {
    val trafficSplitting = properties.loadBalancing.trafficSplitting
    // Cheap lookup first: this runs for every route specification, so skip the locality scan
    // entirely when the result is bound to be empty anyway.
    val weights = trafficSplitting.serviceByWeightsProperties[serviceName]
        ?.takeIf { it.isNotEmpty() }
        ?: return emptyMap()
    val enabledForDependency = globalSnapshot.endpoints[dependencyServiceName]
        ?.endpointsList
        .orEmpty()
        .any { locality -> trafficSplitting.zoneName == locality.locality.zone }
    return if (enabledForDependency) weights else emptyMap()
}

private fun getServiceWithCustomDomain(it: String): List<String> {
return if (properties.egress.domains.isNotEmpty()) {
properties.egress.domains.map { domain -> "$it$domain" }
Expand All @@ -226,27 +252,31 @@ class EnvoySnapshotFactory(
// endpointsFactory.filterEndpoints() can use this cache to prevent computing the same
// ClusterLoadAssignments many times - it may reduce MEM, CPU and latency if some serviceTags are
// commonly used
endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy)
routeSpec.clusterName to endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy)
}
}
}.toMap()

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }

return egressLoadAssignments + rateLimitLoadAssignments
val secondaryLoadAssignments = endpointsFactory.getSecondaryClusterEndpoints(
egressLoadAssignments,
egressRouteSpecifications
)
return egressLoadAssignments.values.toList() + rateLimitLoadAssignments + secondaryLoadAssignments
}

private fun newSnapshotForGroup(
group: Group,
globalSnapshot: GlobalSnapshot
): Snapshot {

logger.debug("Creating new snapshot for group ${group.serviceName}")
// TODO(dj): This is where serious refactoring needs to be done
val egressDomainRouteSpecifications = getDomainRouteSpecifications(group)
val egressDomainRouteSpecifications = getDomainRouteSpecifications(group, globalSnapshot)
val egressServiceRouteSpecification = getServiceRouteSpecifications(group, globalSnapshot)
val egressRouteSpecification = egressServiceRouteSpecification +
egressDomainRouteSpecifications.values.flatten().toSet() + getDomainPatternRouteSpecifications(group)
egressDomainRouteSpecifications.values.flatten().toSet() +
getDomainPatternRouteSpecifications(group, globalSnapshot)

val clusters: List<Cluster> =
clustersFactory.getClustersForGroup(group, globalSnapshot)
Expand All @@ -272,7 +302,6 @@ class EnvoySnapshotFactory(
)
)
}

val listeners = if (properties.dynamicListeners.enabled) {
listenersFactory.createListeners(group, globalSnapshot)
} else {
Expand All @@ -281,11 +310,13 @@ class EnvoySnapshotFactory(

// TODO(dj): endpoints depends on prerequisite of routes -> but only to extract clusterName,
// which is present only in services (not domains) so it could be implemented differently.
val endpoints = getServicesEndpointsForGroup(group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot,
egressRouteSpecification)
val endpoints = getServicesEndpointsForGroup(
group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot,
egressRouteSpecification
)
// .also { logger.debug("Result endpoints: ${it.toString()}") }

val version = snapshotsVersions.version(group, clusters, endpoints, listeners, routes)

return createSnapshot(
clusters = clusters,
clustersVersion = version.clusters,
Expand All @@ -296,7 +327,9 @@ class EnvoySnapshotFactory(
listenersVersion = version.listeners,
routes = routes,
routesVersion = version.routes
)
).also {
logger.debug("Snapshot endpoints: ${it.endpoints()}")
}
}

private fun createRoutesWhenUsingTransparentProxy(
Expand Down Expand Up @@ -375,5 +408,6 @@ data class ClusterConfiguration(
/**
 * Describes a single route: the target cluster name, the domains routed to it, the dependency
 * settings that apply, and optional per-cluster weights.
 *
 * `clusterWeights` defaults to an empty map; the call sites visible in this excerpt fill it from
 * `getTrafficSplittingWeights`.
 */
// NOTE(review): `settings` appears twice because this excerpt is a rendered diff; the line without the
// trailing comma looks like the pre-change version. Confirm against the repository.
class RouteSpecification(
val clusterName: String,
val routeDomains: List<String>,
val settings: DependencySettings
val settings: DependencySettings,
val clusterWeights: Map<String, Int> = mapOf()
)
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class LoadBalancingProperties {
var regularMetadataKey = "lb_regular"
var localityMetadataKey = "locality"
var weights = LoadBalancingWeightsProperties()
var trafficSplitting = TrafficSplittingProperties()
var policy = Cluster.LbPolicy.LEAST_REQUEST
var useKeysSubsetFallbackPolicy = true
var priorities = LoadBalancingPriorityProperties()
Expand All @@ -154,6 +155,11 @@ class CanaryProperties {
var headerValue = "1"
}

/**
 * Configuration holder for traffic splitting.
 */
class TrafficSplittingProperties {
    /** Zone name that endpoint locality zones are compared against; empty by default. */
    var zoneName: String = ""

    /**
     * Weights keyed by service name; each value maps a name to an integer weight.
     * Empty by default. The meaning of the inner keys is not defined in this class.
     */
    var serviceByWeightsProperties: Map<String, Map<String, Int>> = emptyMap()
}

/**
 * Configuration holder for load-balancing weights.
 */
class LoadBalancingWeightsProperties {
    /** Feature flag; off by default. */
    var enabled: Boolean = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ import pl.allegro.tech.servicemesh.envoycontrol.snapshot.OAuthProvider
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.SnapshotProperties
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.Threshold
import pl.allegro.tech.servicemesh.envoycontrol.snapshot.resource.listeners.filters.SanUriMatcherFactory
import pl.allegro.tech.servicemesh.envoycontrol.utils.getAggregateClusterName
import pl.allegro.tech.servicemesh.envoycontrol.utils.getSecondaryClusterName

typealias EnvoyClusterConfig = io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig

class EnvoyClustersFactory(
private val properties: SnapshotProperties
Expand Down Expand Up @@ -184,27 +188,41 @@ class EnvoyClustersFactory(
return emptyList()
}

// todo AD refactor
private fun getEdsClustersForGroup(group: Group, globalSnapshot: GlobalSnapshot): List<Cluster> {
val clusters: Map<String, Cluster> = if (enableTlsForGroup(group)) {
globalSnapshot.securedClusters
} else {
globalSnapshot.clusters
}

val serviceDependencies = group.proxySettings.outgoing.getServiceDependencies().associateBy { it.service }

val dependencies = group.proxySettings.outgoing.getServiceDependencies().associateBy { it.service }
val clustersForGroup = when (group) {
is ServicesGroup -> serviceDependencies.mapNotNull {
createClusterForGroup(it.value.settings, clusters[it.key])
is ServicesGroup -> dependencies.flatMap {
createClusters(
group.serviceName,
dependencies.keys,
it.value.settings,
clusters[it.key],
globalSnapshot.endpoints[it.key]
)
}

is AllServicesGroup -> {
globalSnapshot.allServicesNames.mapNotNull {
val dependency = serviceDependencies[it]
if (dependency != null && dependency.settings.timeoutPolicy.connectionIdleTimeout != null) {
createClusterForGroup(dependency.settings, clusters[it])
} else {
createClusterForGroup(group.proxySettings.outgoing.defaultServiceSettings, clusters[it])
}
globalSnapshot.allServicesNames.flatMap {
val dependency = dependencies[it]
val dependencySettings =
if (dependency != null && dependency.settings.timeoutPolicy.connectionIdleTimeout != null) {
dependency.settings
} else group.proxySettings.outgoing.defaultServiceSettings
createClusters(
group.serviceName,
dependencies.keys,
dependencySettings,
clusters[it],
globalSnapshot.endpoints[it]
)

}
}
}
Expand All @@ -215,15 +233,60 @@ class EnvoyClustersFactory(
return clustersForGroup
}

private fun createClusterForGroup(dependencySettings: DependencySettings, cluster: Cluster?): Cluster? {
/**
 * Returns a copy of [cluster] whose common HTTP protocol options carry the idle timeout for this
 * dependency.
 *
 * The timeout is the dependency's `timeoutPolicy.connectionIdleTimeout` when set, otherwise the idle
 * timeout already present on [cluster].
 */
private fun createClusterForGroup(dependencySettings: DependencySettings, cluster: Cluster): Cluster {
    val idleTimeout = dependencySettings.timeoutPolicy.connectionIdleTimeout
        ?: cluster.commonHttpProtocolOptions.idleTimeout
    // A fresh options builder is used, so only the idle timeout is set on the resulting options.
    val httpOptions = HttpProtocolOptions.newBuilder().setIdleTimeout(idleTimeout)
    return cluster.toBuilder()
        .setCommonHttpProtocolOptions(httpOptions)
        .build()
}

/**
 * Creates the three clusters used for traffic splitting towards one dependency.
 *
 * The result contains, in order: the main cluster, a secondary cluster that is identical except for
 * its name (derived via `getSecondaryClusterName`), and an aggregate cluster listing the secondary
 * cluster first and the main cluster second.
 */
private fun createSetOfClustersForGroup(
    dependencySettings: DependencySettings,
    cluster: Cluster
): Collection<Cluster> {
    val primary = createClusterForGroup(dependencySettings, cluster)
    // The secondary cluster differs from the primary one only by name, so derive it from the primary.
    val secondary = primary.toBuilder()
        .setName(getSecondaryClusterName(cluster.name))
        .build()
    val aggregate = createAggregateCluster(primary.name, linkedSetOf(secondary.name, primary.name))
    val trafficSplittingClusters = linkedSetOf(primary, secondary, aggregate)
    logger.debug("Created traffic splitting clusters: {}", trafficSplittingClusters)
    return trafficSplittingClusters
}

/**
 * Builds the egress cluster(s) for one dependency of [serviceName].
 *
 * Returns an empty list when [cluster] is null. Otherwise returns the clusters produced by
 * `createSetOfClustersForGroup` when `enableTrafficSplitting` accepts the dependency, or a
 * single-element list from `createClusterForGroup` when it does not.
 */
private fun createClusters(
serviceName: String,
dependencies: Set<String>,
dependencySettings: DependencySettings,
cluster: Cluster?,
clusterLoadAssignment: ClusterLoadAssignment?
): Collection<Cluster> {
return cluster?.let {
// NOTE(review): the idle-timeout builder statements directly below look like removed lines of the
// previous implementation that the diff view rendered inline; confirm against the repository.
val idleTimeoutPolicy =
dependencySettings.timeoutPolicy.connectionIdleTimeout ?: cluster.commonHttpProtocolOptions.idleTimeout
Cluster.newBuilder(cluster)
.setCommonHttpProtocolOptions(
HttpProtocolOptions.newBuilder().setIdleTimeout(idleTimeoutPolicy)
).build()
}
// Traffic splitting yields several clusters for the dependency; otherwise exactly one.
if (enableTrafficSplitting(serviceName, cluster.name, dependencies, clusterLoadAssignment)) {
logger.debug("Creating traffic splitting egress cluster config for ${cluster.name}, service: $serviceName")
createSetOfClustersForGroup(dependencySettings, cluster)
} else {
listOf(createClusterForGroup(dependencySettings, cluster))
}
} ?: listOf()
}

/**
 * Decides whether traffic-splitting clusters should be generated for a dependency.
 *
 * All of the following must hold:
 * - [serviceName] has an entry in `trafficSplitting.serviceByWeightsProperties`,
 * - [clusterName] is one of [dependencies],
 * - [clusterLoadAssignment] is non-null and has at least one locality whose zone equals
 *   `trafficSplitting.zoneName`.
 */
private fun enableTrafficSplitting(
    serviceName: String,
    clusterName: String,
    dependencies: Set<String>,
    clusterLoadAssignment: ClusterLoadAssignment?
): Boolean {
    val config = properties.loadBalancing.trafficSplitting
    val configuredForService = serviceName in config.serviceByWeightsProperties
    val isDeclaredDependency = clusterName in dependencies
    val hasEndpointsInZone = clusterLoadAssignment
        ?.endpointsList
        .orEmpty()
        .any { locality -> config.zoneName == locality.locality.zone }
    return configuredForService && isDeclaredDependency && hasEndpointsInZone
}

private fun shouldAddDynamicForwardProxyCluster(group: Group) =
Expand Down Expand Up @@ -267,6 +330,7 @@ class EnvoyClustersFactory(
.build()
}

// TODO ??? todo AD
private fun getStrictDnsClustersForGroup(group: Group): List<Cluster> {
val useTransparentProxy = group.listenersConfig?.useTransparentProxy ?: false
return group.proxySettings.outgoing.getDomainDependencies().map {
Expand All @@ -277,6 +341,25 @@ class EnvoyClustersFactory(
}
}

/**
 * Builds an aggregate cluster that wraps [aggregatedClusters].
 *
 * The cluster is named via `getAggregateClusterName(clusterName)`, uses the configured EDS connection
 * timeout as its connect timeout, the `CLUSTER_PROVIDED` load-balancing policy and the
 * `envoy.clusters.aggregate` custom cluster type. The member cluster names are packed into the typed
 * config in the iteration order of [aggregatedClusters].
 */
private fun createAggregateCluster(clusterName: String, aggregatedClusters: Collection<String>): Cluster {
    val aggregateConfig = EnvoyClusterConfig.newBuilder()
        .addAllClusters(aggregatedClusters)
        .build()
    val aggregateType = Cluster.CustomClusterType.newBuilder()
        .setName("envoy.clusters.aggregate")
        .setTypedConfig(Any.pack(aggregateConfig))
    val connectTimeout = Durations.fromMillis(properties.edsConnectionTimeout.toMillis())
    return Cluster.newBuilder()
        .setName(getAggregateClusterName(clusterName))
        .setConnectTimeout(connectTimeout)
        .setLbPolicy(Cluster.LbPolicy.CLUSTER_PROVIDED)
        .setClusterType(aggregateType)
        .build()
}

private fun strictDnsCluster(
domainDependency: DomainDependency,
useTransparentProxy: Boolean
Expand Down Expand Up @@ -372,6 +455,7 @@ class EnvoyClustersFactory(
ADS -> ConfigSource.newBuilder()
.setResourceApiVersion(ApiVersion.V3)
.setAds(AggregatedConfigSource.newBuilder())

XDS ->
ConfigSource.newBuilder()
.setResourceApiVersion(ApiVersion.V3)
Expand All @@ -393,7 +477,7 @@ class EnvoyClustersFactory(
)
)
}
).setServiceName(clusterConfiguration.serviceName)
)
)
.setLbPolicy(properties.loadBalancing.policy)
// TODO: if we want to have multiple memory-backend instances of ratelimit
Expand Down
Loading

0 comments on commit cdf8d9c

Please sign in to comment.