Skip to content

Commit

Permalink
Implemented possibility of traffic splitting configuration (#397)
Browse files Browse the repository at this point in the history
* Implemented possibility for configuring traffic splitting, and fallback using aggregate cluster #292
---------

Co-authored-by: nastassia.dailidava <[email protected]>
Co-authored-by: Kamil Smigielski <[email protected]>
  • Loading branch information
3 people authored Oct 4, 2023
1 parent 689078b commit 2b63f78
Show file tree
Hide file tree
Showing 20 changed files with 1,092 additions and 115 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

Lists all changes with user impact.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
## [0.20.00]

## [0.20.1]

### Changed
- Implemented configuring traffic splitting and fallback using aggregate cluster functionality

## [0.20.0]

### Changed
- Spring Boot upgraded to 3.1.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import pl.allegro.tech.servicemesh.envoycontrol.groups.Group
import pl.allegro.tech.servicemesh.envoycontrol.groups.IncomingRateLimitEndpoint
import pl.allegro.tech.servicemesh.envoycontrol.groups.ServicesGroup
import pl.allegro.tech.servicemesh.envoycontrol.groups.orDefault
import pl.allegro.tech.servicemesh.envoycontrol.logger
import pl.allegro.tech.servicemesh.envoycontrol.services.MultiClusterState
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstance
import pl.allegro.tech.servicemesh.envoycontrol.services.ServiceInstances
Expand All @@ -38,6 +39,7 @@ class EnvoySnapshotFactory(

companion object {
const val DEFAULT_HTTP_PORT = 80
private val logger by logger()
}

fun newSnapshot(
Expand Down Expand Up @@ -111,6 +113,7 @@ class EnvoySnapshotFactory(
val removedClusters = previous - current.keys
current + removedClusters
}

false -> current
}
}
Expand Down Expand Up @@ -156,24 +159,26 @@ class EnvoySnapshotFactory(
return newSnapshotForGroup
}

private fun getDomainRouteSpecifications(group: Group): Map<DomainRoutesGrouper, Collection<RouteSpecification>> {
private fun getDomainRouteSpecifications(
group: Group
): Map<DomainRoutesGrouper, Collection<RouteSpecification>> {
return group.proxySettings.outgoing.getDomainDependencies().groupBy(
{ DomainRoutesGrouper(it.getPort(), it.useSsl()) },
{
RouteSpecification(
StandardRouteSpecification(
clusterName = it.getClusterName(),
routeDomains = listOf(it.getRouteDomain()),
settings = it.settings
settings = it.settings,
)
}
)
}

private fun getDomainPatternRouteSpecifications(group: Group): RouteSpecification {
return RouteSpecification(
return StandardRouteSpecification(
clusterName = properties.dynamicForwardProxy.clusterName,
routeDomains = group.proxySettings.outgoing.getDomainPatternDependencies().map { it.domainPattern },
settings = group.proxySettings.outgoing.defaultServiceSettings
settings = group.proxySettings.outgoing.defaultServiceSettings,
)
}

Expand All @@ -182,30 +187,67 @@ class EnvoySnapshotFactory(
globalSnapshot: GlobalSnapshot
): Collection<RouteSpecification> {
val definedServicesRoutes = group.proxySettings.outgoing.getServiceDependencies().map {
RouteSpecification(
buildRouteSpecification(
clusterName = it.service,
routeDomains = listOf(it.service) + getServiceWithCustomDomain(it.service),
settings = it.settings
settings = it.settings,
group.serviceName,
globalSnapshot
)
}
return when (group) {
is ServicesGroup -> {
definedServicesRoutes
}

is AllServicesGroup -> {
val servicesNames = group.proxySettings.outgoing.getServiceDependencies().map { it.service }.toSet()
val allServicesRoutes = globalSnapshot.allServicesNames.subtract(servicesNames).map {
RouteSpecification(
buildRouteSpecification(
clusterName = it,
routeDomains = listOf(it) + getServiceWithCustomDomain(it),
settings = group.proxySettings.outgoing.defaultServiceSettings
settings = group.proxySettings.outgoing.defaultServiceSettings,
group.serviceName,
globalSnapshot
)
}
allServicesRoutes + definedServicesRoutes
}
}
}

private fun buildRouteSpecification(
clusterName: String,
routeDomains: List<String>,
settings: DependencySettings,
serviceName: String,
globalSnapshot: GlobalSnapshot,
): RouteSpecification {
val trafficSplitting = properties.loadBalancing.trafficSplitting
val weights = trafficSplitting.serviceByWeightsProperties[serviceName]
val enabledForDependency = globalSnapshot.endpoints[clusterName]?.endpointsList
?.any { e -> trafficSplitting.zoneName == e.locality.zone }
?: false
return if (weights != null && enabledForDependency) {
logger.debug(
"Building traffic splitting route spec, weights: $weights, " +
"serviceName: $serviceName, clusterName: $clusterName, "
)
WeightRouteSpecification(
clusterName,
routeDomains,
settings,
weights
)
} else {
StandardRouteSpecification(
clusterName,
routeDomains,
settings
)
}
}

private fun getServiceWithCustomDomain(it: String): List<String> {
return if (properties.egress.domains.isNotEmpty()) {
properties.egress.domains.map { domain -> "$it$domain" }
Expand All @@ -217,7 +259,7 @@ class EnvoySnapshotFactory(
private fun getServicesEndpointsForGroup(
rateLimitEndpoints: List<IncomingRateLimitEndpoint>,
globalSnapshot: GlobalSnapshot,
egressRouteSpecifications: Collection<RouteSpecification>
egressRouteSpecifications: List<RouteSpecification>
): List<ClusterLoadAssignment> {
val egressLoadAssignments = egressRouteSpecifications.mapNotNull { routeSpec ->
globalSnapshot.endpoints[routeSpec.clusterName]?.let { endpoints ->
Expand All @@ -226,27 +268,30 @@ class EnvoySnapshotFactory(
// endpointsFactory.filterEndpoints() can use this cache to prevent computing the same
// ClusterLoadAssignments many times - it may reduce MEM, CPU and latency if some serviceTags are
// commonly used
endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy)
routeSpec.clusterName to endpointsFactory.filterEndpoints(endpoints, routeSpec.settings.routingPolicy)
}
}
}.toMap()

val rateLimitClusters =
if (rateLimitEndpoints.isNotEmpty()) listOf(properties.rateLimit.serviceName) else emptyList()
val rateLimitLoadAssignments = rateLimitClusters.mapNotNull { name -> globalSnapshot.endpoints[name] }

return egressLoadAssignments + rateLimitLoadAssignments
val secondaryLoadAssignments = endpointsFactory.getSecondaryClusterEndpoints(
egressLoadAssignments,
egressRouteSpecifications
)
return egressLoadAssignments.values.toList() + rateLimitLoadAssignments + secondaryLoadAssignments
}

private fun newSnapshotForGroup(
group: Group,
globalSnapshot: GlobalSnapshot
): Snapshot {

// TODO(dj): This is where serious refactoring needs to be done
val egressDomainRouteSpecifications = getDomainRouteSpecifications(group)
val egressServiceRouteSpecification = getServiceRouteSpecifications(group, globalSnapshot)
val egressRouteSpecification = egressServiceRouteSpecification +
egressDomainRouteSpecifications.values.flatten().toSet() + getDomainPatternRouteSpecifications(group)
egressDomainRouteSpecifications.values.flatten().toSet() +
getDomainPatternRouteSpecifications(group)

val clusters: List<Cluster> =
clustersFactory.getClustersForGroup(group, globalSnapshot)
Expand All @@ -272,7 +317,6 @@ class EnvoySnapshotFactory(
)
)
}

val listeners = if (properties.dynamicListeners.enabled) {
listenersFactory.createListeners(group, globalSnapshot)
} else {
Expand All @@ -281,11 +325,12 @@ class EnvoySnapshotFactory(

// TODO(dj): endpoints depends on prerequisite of routes -> but only to extract clusterName,
// which is present only in services (not domains) so it could be implemented differently.
val endpoints = getServicesEndpointsForGroup(group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot,
egressRouteSpecification)
val endpoints = getServicesEndpointsForGroup(
group.proxySettings.incoming.rateLimitEndpoints, globalSnapshot,
egressRouteSpecification
)

val version = snapshotsVersions.version(group, clusters, endpoints, listeners, routes)

return createSnapshot(
clusters = clusters,
clustersVersion = version.clusters,
Expand Down Expand Up @@ -372,8 +417,21 @@ data class ClusterConfiguration(
val http2Enabled: Boolean
)

class RouteSpecification(
val clusterName: String,
val routeDomains: List<String>,
val settings: DependencySettings
)
sealed class RouteSpecification {
abstract val clusterName: String
abstract val routeDomains: List<String>
abstract val settings: DependencySettings
}

data class StandardRouteSpecification(
override val clusterName: String,
override val routeDomains: List<String>,
override val settings: DependencySettings,
) : RouteSpecification()

data class WeightRouteSpecification(
override val clusterName: String,
override val routeDomains: List<String>,
override val settings: DependencySettings,
val clusterWeights: ZoneWeights,
) : RouteSpecification()
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class LoadBalancingProperties {
var regularMetadataKey = "lb_regular"
var localityMetadataKey = "locality"
var weights = LoadBalancingWeightsProperties()
var trafficSplitting = TrafficSplittingProperties()
var policy = Cluster.LbPolicy.LEAST_REQUEST
var useKeysSubsetFallbackPolicy = true
var priorities = LoadBalancingPriorityProperties()
Expand All @@ -154,6 +155,18 @@ class CanaryProperties {
var headerValue = "1"
}

class TrafficSplittingProperties {
var zoneName = ""
var serviceByWeightsProperties: Map<String, ZoneWeights> = mapOf()
var secondaryClusterPostfix = "secondary"
var aggregateClusterPostfix = "aggregate"
}

class ZoneWeights {
var main = 100
var secondary = 0
}

class LoadBalancingWeightsProperties {
var enabled = false
}
Expand Down
Loading

0 comments on commit 2b63f78

Please sign in to comment.