Skip to content

Commit

Permalink
JAMES-3693 Add rateLimiterTimeout configuration for RateLimiter mai…
Browse files Browse the repository at this point in the history
…lets
  • Loading branch information
vttranlina authored and Arsnael committed May 8, 2024
1 parent 2ba5430 commit b7488ac
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ package org.apache.james.transport.mailets

import java.time.Duration
import java.util
import com.google.common.collect.ImmutableList

import com.google.common.base.Preconditions
import com.google.common.collect.ImmutableList
import jakarta.inject.Inject
import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult}
import org.apache.mailet.base.GenericMailet
import org.apache.mailet.{Mail, ProcessingState}
import org.reactivestreams.Publisher
import reactor.core.scala.publisher.{SFlux, SMono}

import scala.jdk.DurationConverters._

case class GlobalKey(keyPrefix: Option[KeyPrefix], entityType: EntityType) extends RateLimitingKey {
override val asString: String = {
val key = s"${entityType.asString}_global}"
Expand Down Expand Up @@ -79,6 +82,7 @@ object GlobalRateLimiter {
* <li><b>size</b>: Size of emails allowed for all users during duration (each email count one time, regardless of recipient count). Optional, if unspecified this rate limit is not applied. Supported units : B ( 2^0 ), K ( 2^10 ), M ( 2^20 ), G ( 2^30 ), defaults to B.</li>
* <li><b>totalSize</b>: Size of emails allowed for all users during duration (each recipient of the email email count one time). Optional, if unspecified this rate limit is not applied. Supported units : B ( 2^0 ), K ( 2^10 ), M ( 2^20 ), G ( 2^30 ), defaults to B. Note that
* totalSize is limited in increments of 2exp(31) - ~2 billions: sending a 10MB file to more than 205 recipients will be rejected if this parameter is enabled.</li>
* <li><b>rateLimiterTimeout</b>: [Optional, default to None]. Specifies the timeout exception for rate limiter checking. Supported time unit: seconds (s)</li>
* </ul>
*
* <p>For instance, to apply all the examples given above:</p>
Expand All @@ -92,6 +96,7 @@ object GlobalRateLimiter {
* &lt;recipients&gt;20&lt;/recipients&gt;
* &lt;size&gt;100M&lt;/size&gt;
* &lt;totalSize&gt;200M&lt;/totalSize&gt;
* &lt;rateLimiterTimeout&gt;10s&lt;/rateLimiterTimeout&gt;
* &lt;exceededProcessor&gt;tooMuchMails&lt;/exceededProcessor&gt;
* &lt;/mailet&gt;
* </code></pre>
Expand All @@ -111,6 +116,7 @@ class GlobalRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) extends
private var totalSizeRateLimiter: GlobalRateLimiter = _
private var exceededProcessor: String = _
private var keyPrefix: Option[KeyPrefix] = _
private var rateLimiterTimeout: Option[Duration] = _

override def init(): Unit = {
import org.apache.james.transport.mailets.ConfigurationOps.DurationOps
Expand All @@ -123,6 +129,9 @@ class GlobalRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) extends
keyPrefix = Option(getInitParameter("keyPrefix")).map(KeyPrefix)
exceededProcessor = getInitParameter("exceededProcessor", Mail.ERROR)

rateLimiterTimeout = getMailetConfig.getDuration("rateLimiterTimeout")
Preconditions.checkArgument(rateLimiterTimeout.isEmpty || rateLimiterTimeout.get.isPositive, "rateLimiterTimeout can not be negative".asInstanceOf[Object])

def globalRateLimiter(entityType: EntityType): GlobalRateLimiter = createRateLimiter(rateLimiterFactory, entityType, keyPrefix, duration, precision)

countRateLimiter = globalRateLimiter(Count)
Expand All @@ -133,13 +142,17 @@ class GlobalRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) extends

override def service(mail: Mail): Unit = {
val pivot: RateLimitingResult = AcceptableRate
val result = SFlux.merge(Seq(
countRateLimiter.rateLimit(mail),
recipientsRateLimiter.rateLimit(mail),
sizeRateLimiter.rateLimit(mail),
totalSizeRateLimiter.rateLimit(mail)))

val rateLimitChecker: SMono[RateLimitingResult] = SFlux.merge(Seq(
countRateLimiter.rateLimit(mail),
recipientsRateLimiter.rateLimit(mail),
sizeRateLimiter.rateLimit(mail),
totalSizeRateLimiter.rateLimit(mail)))
.fold(pivot)((a, b) => a.merge(b))
.block()

val result = rateLimiterTimeout
.map(timeout => rateLimitChecker.block(timeout.toScala))
.getOrElse(rateLimitChecker.block())

if (result.equals(RateExceeded)) {
mail.setState(exceededProcessor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ package org.apache.james.transport.mailets

import java.time.Duration
import java.util

import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Preconditions
import com.google.common.collect.ImmutableList

import jakarta.inject.Inject
import org.apache.james.core.MailAddress
import org.apache.james.lifecycle.api.LifecycleUtil
Expand All @@ -36,6 +37,7 @@ import org.reactivestreams.Publisher
import reactor.core.scala.publisher.{SFlux, SMono}

import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.util.Using

case class PerRecipientRateLimiter(rateLimiter: RateLimiter, keyPrefix: Option[KeyPrefix], entityType: EntityType) {
Expand Down Expand Up @@ -71,6 +73,7 @@ case class RecipientKey(keyPrefix: Option[KeyPrefix], entityType: EntityType, ma
* <li><b>duration</b>: Duration during which the rate limiting shall be applied. Compulsory, must be a valid duration of at least one second. Supported units includes s (second), m (minute), h (hour), d (day).</li>
* <li><b>count</b>: Count of emails allowed for a given sender during duration. Optional, if unspecified this rate limit is not applied.</li>
* <li><b>size</b>: Size of emails allowed for a given sender during duration (each email count one time, regardless of recipient count). Optional, if unspecified this rate limit is not applied. Supported units : B ( 2^0 ), K ( 2^10 ), M ( 2^20 ), G ( 2^30 ), defaults to B.</li>
* <li><b>rateLimiterTimeout</b>: [Optional, default to None]. Specifies the timeout exception for rate limiter checking. Supported time unit: seconds (s)</li>
* </ul>
*
* <p>For instance, to apply all the examples given above:</p>
Expand All @@ -81,6 +84,7 @@ case class RecipientKey(keyPrefix: Option[KeyPrefix], entityType: EntityType, ma
* &lt;duration&gt;1h&lt;/duration&gt;
* &lt;count&gt;10&lt;/count&gt;
* &lt;size&gt;100M&lt;/size&gt;
* &lt;rateLimiterTimeout&gt;10s&lt;/rateLimiterTimeout&gt;
* &lt;exceededProcessor&gt;tooMuchMails&lt;/exceededProcessor&gt;
* &lt;/mailet&gt;
* </code></pre>
Expand All @@ -96,12 +100,15 @@ case class RecipientKey(keyPrefix: Option[KeyPrefix], entityType: EntityType, ma
class PerRecipientRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) extends GenericMailet {
private var exceededProcessor: String = _
private var rateLimiters: Seq[PerRecipientRateLimiter] = _
private var rateLimiterTimeout: Option[Duration] = _

override def init(): Unit = {
val duration: Duration = parseDuration()
val precision: Option[Duration] = getMailetConfig.getDuration("precision")
val keyPrefix: Option[KeyPrefix] = getMailetConfig.getOptionalString("keyPrefix").map(KeyPrefix)
exceededProcessor = getMailetConfig.getOptionalString("exceededProcessor").getOrElse(Mail.ERROR)
rateLimiterTimeout = getMailetConfig.getDuration("rateLimiterTimeout")
Preconditions.checkArgument(rateLimiterTimeout.isEmpty || rateLimiterTimeout.get.isPositive, "rateLimiterTimeout can not be negative".asInstanceOf[Object])

def perRecipientRateLimiter(entityType: EntityType): Option[PerRecipientRateLimiter] = createRateLimiter(entityType, duration, precision, rateLimiterFactory, keyPrefix)

Expand Down Expand Up @@ -140,13 +147,19 @@ class PerRecipientRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) ex
.map(PerRecipientRateLimiter(_, keyPrefix, entityType))


private def applyRateLimiter(mail: Mail): Seq[(MailAddress, RateLimitingResult)] =
SFlux.fromIterable(mail.getRecipients.asScala)
private def applyRateLimiter(mail: Mail): Seq[(MailAddress, RateLimitingResult)] = {

val applyRateLimiterPublisher = SFlux.fromIterable(mail.getRecipients.asScala)
.flatMap(recipient => SFlux.merge(rateLimiters.map(rateLimiter => rateLimiter.rateLimit(recipient, mail)))
.fold[RateLimitingResult](AcceptableRate)((a, b) => a.merge(b))
.map(rateLimitingResult => (recipient, rateLimitingResult)), DEFAULT_CONCURRENCY)
.collectSeq()
.block()

rateLimiterTimeout
.map(timeout => applyRateLimiterPublisher.block(timeout.toScala))
.getOrElse(applyRateLimiterPublisher.block())

}


override def requiredProcessingState(): util.Collection[ProcessingState] = ImmutableList.of(new ProcessingState(exceededProcessor))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ package org.apache.james.transport.mailets

import java.time.Duration
import java.util

import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Preconditions
import com.google.common.collect.ImmutableList

import jakarta.inject.Inject
import org.apache.james.core.MailAddress
import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult}
Expand All @@ -33,6 +34,8 @@ import org.apache.mailet.{Mail, ProcessingState}
import org.reactivestreams.Publisher
import reactor.core.scala.publisher.{SFlux, SMono}

import scala.jdk.DurationConverters._

case class PerSenderRateLimiter(rateLimiter: Option[RateLimiter], keyPrefix: Option[KeyPrefix], entityType: EntityType) {
def rateLimit(sender: MailAddress, mail: Mail): Publisher[RateLimitingResult] = {
val rateLimitingKey = SenderKey(keyPrefix, entityType, sender)
Expand Down Expand Up @@ -75,6 +78,7 @@ case class SenderKey(keyPrefix: Option[KeyPrefix], entityType: EntityType, mailA
* <li><b>size</b>: Size of emails allowed for a given sender during duration (each email count one time, regardless of recipient count). Optional, if unspecified this rate limit is not applied. Supported units : B ( 2^0 ), K ( 2^10 ), M ( 2^20 ), G ( 2^30 ), defaults to B.</li>
* <li><b>totalSize</b>: Size of emails allowed for a given sender during duration (each recipient of the email email count one time). Optional, if unspecified this rate limit is not applied. Supported units : B ( 2^0 ), K ( 2^10 ), M ( 2^20 ), G ( 2^30 ), defaults to B. Note that
* totalSize is limited in increments of 2exp(31) - ~2 billions: sending a 10MB file to more than 205 recipients will be rejected if this parameter is enabled.</li>
* <li><b>rateLimiterTimeout</b>: [Optional, default to None]. Specifies the timeout exception for rate limiter checking. Supported time unit: seconds (s)</li>
* </ul>
*
* <p>For instance, to apply all the examples given above:</p>
Expand All @@ -88,6 +92,7 @@ case class SenderKey(keyPrefix: Option[KeyPrefix], entityType: EntityType, mailA
* &lt;recipients&gt;20&lt;/recipients&gt;
* &lt;size&gt;100M&lt;/size&gt;
* &lt;totalSize&gt;200M&lt;/totalSize&gt;
* &lt;rateLimiterTimeout&gt;10s&lt;/rateLimiterTimeout&gt;
* &lt;exceededProcessor&gt;tooMuchMails&lt;/exceededProcessor&gt;
* &lt;/mailet&gt;
* </code></pre>
Expand All @@ -107,13 +112,17 @@ class PerSenderRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) exten
private var totalSizeRateLimiter: PerSenderRateLimiter = _
private var exceededProcessor: String = _
private var keyPrefix: Option[KeyPrefix] = _
private var rateLimiterTimeout: Option[Duration] = _

override def init(): Unit = {
val duration: Duration = parseDuration()
val precision: Option[Duration] = getMailetConfig.getDuration("precision")

keyPrefix = Option(getInitParameter("keyPrefix")).map(KeyPrefix)
exceededProcessor = getInitParameter("exceededProcessor", Mail.ERROR)
rateLimiterTimeout = getMailetConfig.getDuration("rateLimiterTimeout")
Preconditions.checkArgument(rateLimiterTimeout.isEmpty || rateLimiterTimeout.get.isPositive, "rateLimiterTimeout can not be negative".asInstanceOf[Object])


def perSenderRateLimiter(entityType: EntityType): PerSenderRateLimiter = createRateLimiter(rateLimiterFactory, entityType, keyPrefix, duration, precision)

Expand All @@ -135,13 +144,16 @@ class PerSenderRateLimit @Inject()(rateLimiterFactory: RateLimiterFactory) exten

private def applyRateLimiter(mail: Mail, sender: MailAddress): Unit = {
val pivot: RateLimitingResult = AcceptableRate
val result = SFlux.merge(Seq(
val applyRateLimiterPublisher = SFlux.merge(Seq(
countRateLimiter.rateLimit(sender, mail),
recipientsRateLimiter.rateLimit(sender, mail),
sizeRateLimiter.rateLimit(sender, mail),
totalSizeRateLimiter.rateLimit(sender, mail)))
.fold(pivot)((a, b) => a.merge(b))
.block()

val result = rateLimiterTimeout
.map(timeout => applyRateLimiterPublisher.block(timeout.toScala))
.getOrElse(applyRateLimiterPublisher.block())

if (result.equals(RateExceeded)) {
mail.setState(exceededProcessor)
Expand Down

0 comments on commit b7488ac

Please sign in to comment.