Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions committed Mar 26, 2024
2 parents 1571119 + f5107fa commit a231e5c
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import com.klaviyo.analytics.networking.requests.PushTokenApiRequest
import com.klaviyo.core.Registry
import com.klaviyo.core.lifecycle.ActivityEvent
import java.util.concurrent.ConcurrentLinkedDeque
import kotlin.math.max
import kotlin.math.min
import kotlin.math.pow
import org.json.JSONArray
import org.json.JSONException
import org.json.JSONObject
Expand All @@ -38,9 +41,18 @@ internal object KlaviyoApiClient : ApiClient {
when (r.state) {
Status.Unsent.name -> Registry.log.verbose("${r.type} Request enqueued")
Status.Inflight.name -> Registry.log.verbose("${r.type} Request inflight")
Status.PendingRetry.name -> Registry.log.debug("${r.type} Request retrying")
Status.Complete.name -> Registry.log.verbose("${r.type} Request completed")
else -> Registry.log.debug("${r.type} Request failed")
Status.PendingRetry.name -> {
val attemptsRemaining = Registry.config.networkMaxAttempts - r.attempts
Registry.log.warning(
"${r.type} Request failed with code ${r.responseCode}, and will be retried up to $attemptsRemaining more times."
)
}
Status.Complete.name -> Registry.log.verbose(
"${r.type} Request succeeded with code ${r.responseCode}"
)
else -> Registry.log.error(
"${r.type} Request failed with code ${r.responseCode}, and will be dropped"
)
}

r.responseBody?.let { response ->
Expand Down Expand Up @@ -143,9 +155,9 @@ internal object KlaviyoApiClient : ApiClient {
// Keep track if there's any errors restoring from persistent store
var wasMutated = false

Registry.log.verbose("Restoring persisted queue")

Registry.dataStore.fetch(QUEUE_KEY)?.let {
Registry.log.verbose("Restoring persisted queue")

try {
val queue = JSONArray(it)
Array(queue.length()) { i -> queue.optString(i) }
Expand Down Expand Up @@ -227,16 +239,18 @@ internal object KlaviyoApiClient : ApiClient {
*/
private fun startBatch(force: Boolean = false) {
stopBatch() // we only ever want one batch job running
handler?.post(NetworkRunnable(force))
Registry.log.verbose("Started background handler")
handler?.post(NetworkRunnable(force)).also {
Registry.log.verbose("Posted job to network handler message queue")
}
}

/**
* Stop all jobs on our handler thread
*/
private fun stopBatch() {
handler?.removeCallbacksAndMessages(null)
Registry.log.verbose("Stopped background handler")
handler?.removeCallbacksAndMessages(null).also {
Registry.log.verbose("Cleared jobs from network handler message queue")
}
}

/**
Expand All @@ -251,11 +265,11 @@ internal object KlaviyoApiClient : ApiClient {
var force = force
private set

private val queueInitTime = Registry.clock.currentTimeMillis()
private var enqueuedTime = Registry.clock.currentTimeMillis()

private var networkType: Int = Registry.networkMonitor.getNetworkType().position

private var flushInterval: Long = Registry.config.networkFlushIntervals[networkType].toLong()
private var flushInterval: Long = Registry.config.networkFlushIntervals[networkType]

private val flushDepth: Int = Registry.config.networkFlushDepth

Expand All @@ -265,7 +279,7 @@ internal object KlaviyoApiClient : ApiClient {
* Posts another delayed batch job if requests remains
*/
override fun run() {
val queueTimePassed = Registry.clock.currentTimeMillis() - queueInitTime
val queueTimePassed = Registry.clock.currentTimeMillis() - enqueuedTime

if (getQueueSize() < flushDepth && queueTimePassed < flushInterval && !force) {
return requeue()
Expand All @@ -286,14 +300,14 @@ internal object KlaviyoApiClient : ApiClient {
// On success or absolute failure, remove from queue and persistent store
Registry.dataStore.clear(request.uuid)
// Reset the flush interval, in case we had done any exp backoff
flushInterval = Registry.config.networkFlushIntervals[networkType].toLong()
flushInterval = Registry.config.networkFlushIntervals[networkType]
broadcastApiRequest(request)
}
Status.PendingRetry -> {
// Encountered a retryable error
// Put this back on top of the queue and we'll try again with backoff
// Put this back on top of the queue, and we'll try again with backoff
apiQueue.offerFirst(request)
flushInterval *= request.attempts + 1
flushInterval = computeRetryInterval(request.attempts)
broadcastApiRequest(request)
break
}
Expand All @@ -318,10 +332,22 @@ internal object KlaviyoApiClient : ApiClient {
* Re-queue the job to run again after [flushInterval] milliseconds
*/
private fun requeue() {
Registry.log.verbose("Retrying network batch in $flushInterval")
Registry.log.verbose("Network batch will run in $flushInterval ms")
force = false
enqueuedTime = Registry.clock.currentTimeMillis()
handler?.postDelayed(this, flushInterval)
}

/**
 * Compute how long to wait before re-running the batch after a retryable failure.
 *
 * The delay is an exponential backoff of 2^[attempts] seconds plus a random jitter
 * (in seconds) drawn from the configured jitter range, converted to milliseconds.
 * The result is clamped so that it is never shorter than the current [flushInterval]
 * and never longer than the configured maximum retry interval.
 *
 * @param attempts Number of send attempts already made for the request being retried
 * @return Delay in milliseconds before the next batch run
 */
private fun computeRetryInterval(attempts: Int): Long {
    val minRetryInterval = flushInterval
    val maxRetryInterval = Registry.config.networkMaxRetryInterval
    val jitterSeconds = Registry.config.networkJitterRange.random()

    // Cap the exponential term BEFORE converting to milliseconds. Uncapped, the
    // multiplication by 1_000 wraps around Long once attempts reaches roughly 54,
    // yielding a zero/negative backoff that would silently collapse the delay down
    // to minRetryInterval instead of clamping it to maxRetryInterval.
    val overflowSafeSeconds = (Long.MAX_VALUE / 2_000).toDouble()
    val exponentialSeconds = 2.0.pow(attempts).coerceAtMost(overflowSafeSeconds).toLong()
    val exponentialBackoff = (exponentialSeconds + jitterSeconds).times(1_000)

    return min(
        max(minRetryInterval, exponentialBackoff),
        maxRetryInterval
    )
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ interface ApiRequest {
*/
val queuedTime: Long

/**
* Number of send attempts
*/
val attempts: Int

/**
* Time the request was made
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ internal open class KlaviyoApiRequest(
/**
* Tracks number of attempts to limit retries
*/
var attempts = 0
final override var attempts = 0
private set

/**
Expand Down Expand Up @@ -319,7 +319,7 @@ internal open class KlaviyoApiRequest(
status = when (responseCode) {
in successCodes -> Status.Complete
HTTP_RETRY -> {
if (attempts <= Registry.config.networkMaxRetries) {
if (attempts < Registry.config.networkMaxAttempts) {
Status.PendingRetry
} else {
Status.Failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import org.junit.Before
import org.junit.Test

internal class KlaviyoApiClientTest : BaseRequestTest() {
private val flushIntervalWifi = 10_000
private val flushIntervalCell = 20_000
private val flushIntervalOffline = 30_000
private val flushIntervalWifi = 10_000L
private val flushIntervalCell = 20_000L
private val flushIntervalOffline = 30_000L
private val queueDepth = 10
private var delayedRunner: KlaviyoApiClient.NetworkRunnable? = null
private var postedJob: KlaviyoApiClient.NetworkRunnable? = null
private val staticClock = StaticClock(TIME, ISO_TIME)

private companion object {
Expand All @@ -54,10 +54,10 @@ internal class KlaviyoApiClientTest : BaseRequestTest() {
every { DeviceProperties.buildEventMetaData() } returns emptyMap()
every { DeviceProperties.buildMetaData() } returns emptyMap()

delayedRunner = null
postedJob = null

every { Registry.clock } returns staticClock
every { configMock.networkFlushIntervals } returns intArrayOf(
every { configMock.networkFlushIntervals } returns longArrayOf(
flushIntervalWifi,
flushIntervalCell,
flushIntervalOffline
Expand All @@ -72,11 +72,11 @@ internal class KlaviyoApiClientTest : BaseRequestTest() {
every { HandlerUtil.getHandler(any()) } returns mockHandler.apply {
every { removeCallbacksAndMessages(any()) } returns Unit
every { post(any()) } answers { a ->
(a.invocation.args[0] as KlaviyoApiClient.NetworkRunnable).run()
true
postedJob = (a.invocation.args[0] as KlaviyoApiClient.NetworkRunnable)
postedJob!!.run().let { true }
}
every { postDelayed(any(), any()) } answers { a ->
delayedRunner = a.invocation.args[0] as KlaviyoApiClient.NetworkRunnable
postedJob = a.invocation.args[0] as KlaviyoApiClient.NetworkRunnable
true
}
}
Expand All @@ -92,15 +92,41 @@ internal class KlaviyoApiClientTest : BaseRequestTest() {
status: KlaviyoApiRequest.Status = KlaviyoApiRequest.Status.Complete
): KlaviyoApiRequest =
mockk<KlaviyoApiRequest>().also {
every { it.state } returns status.name
val getState = {
when (it.state) {
KlaviyoApiRequest.Status.Unsent.name -> KlaviyoApiRequest.Status.Unsent
KlaviyoApiRequest.Status.Inflight.name -> KlaviyoApiRequest.Status.Inflight
KlaviyoApiRequest.Status.PendingRetry.name -> KlaviyoApiRequest.Status.PendingRetry
KlaviyoApiRequest.Status.Complete.name -> KlaviyoApiRequest.Status.Complete
KlaviyoApiRequest.Status.Failed.name -> KlaviyoApiRequest.Status.Failed
else -> error("Invalid state")
}
}

// Initial attempts value
var attempts = if (getState() == KlaviyoApiRequest.Status.Unsent) 0 else 1

every { it.uuid } returns uuid
every { it.type } returns "Mock"
every { it.state } returns status.name
every { it.httpMethod } returns "GET"
every { it.url } returns URL("https://mock.com")
every { it.headers } returns mapOf("headerKey" to "headerValue")
every { it.query } returns mapOf("queryKey" to "queryValue")
every { it.send(any()) } answers {
attempts++
getState()
}
every { it.responseBody } returns null
every { it.send(any()) } returns status
every { it.responseCode } answers {
when (getState()) {
KlaviyoApiRequest.Status.PendingRetry -> 429
KlaviyoApiRequest.Status.Complete -> 202
KlaviyoApiRequest.Status.Failed -> 500
else -> null
}
}
every { it.attempts } answers { attempts }
every { it.toJson() } returns JSONObject(
"""
{
Expand Down Expand Up @@ -228,9 +254,9 @@ internal class KlaviyoApiClientTest : BaseRequestTest() {
var cbRequest: ApiRequest? = null
KlaviyoApiClient.onApiRequest { cbRequest = it }

delayedRunner!!.run()
postedJob!!.run()
assertEquals(request, cbRequest)
verify { logSpy.verbose(match { it.contains("complete") }) }
verify { logSpy.verbose(match { it.contains("succeed") }) }
}

@Test
Expand Down Expand Up @@ -277,7 +303,7 @@ internal class KlaviyoApiClientTest : BaseRequestTest() {
assertEquals(it + 1, KlaviyoApiClient.getQueueSize())
}

delayedRunner!!.run()
postedJob!!.run()

assertEquals(0, KlaviyoApiClient.getQueueSize())
}
Expand All @@ -290,7 +316,7 @@ internal class KlaviyoApiClientTest : BaseRequestTest() {

staticClock.execute(flushIntervalWifi.toLong())

delayedRunner!!.run()
postedJob!!.run()

assertEquals(0, KlaviyoApiClient.getQueueSize())
}
Expand All @@ -302,7 +328,7 @@ internal class KlaviyoApiClientTest : BaseRequestTest() {
assertEquals(it + 1, KlaviyoApiClient.getQueueSize())
}

delayedRunner!!.run()
postedJob!!.run()

assertEquals(queueDepth - 1, KlaviyoApiClient.getQueueSize())
}
Expand Down Expand Up @@ -343,37 +369,65 @@ internal class KlaviyoApiClientTest : BaseRequestTest() {
}

@Test
fun `Rate limited requests are retried with a backoff`() {
val request1 = mockRequest("uuid-retry", KlaviyoApiRequest.Status.PendingRetry)
fun `Rate limited requests are retried with a backoff until max attempts`() {
val defaultInterval = Registry.config.networkFlushIntervals[NetworkMonitor.NetworkType.Wifi.position]

// First unsent request, which we will retry till max attempts
val request1 = mockRequest("uuid-retry", KlaviyoApiRequest.Status.Unsent)
every { request1.state } answers {
when (request1.attempts) {
0 -> KlaviyoApiRequest.Status.Unsent.name
50 -> KlaviyoApiRequest.Status.Failed.name
else -> KlaviyoApiRequest.Status.PendingRetry.name
}
}

// Second unsent request in the queue, which shouldn't be sent until the first has failed
val request2 = mockRequest("uuid-unsent", KlaviyoApiRequest.Status.Unsent)
var attempts = 0
var backoffTime = flushIntervalWifi
every { request1.attempts } answers { attempts }

// Enqueue 2 requests
KlaviyoApiClient.enqueueRequest(request1, request2)

val job = KlaviyoApiClient.NetworkRunnable()
// Enqueueing should invoke handler.post and initialize our postedJob property
assertNotNull(postedJob)

// But the clock has not advanced, so no requests should have been sent yet
assertEquals(0, request1.attempts)

while (request1.state != KlaviyoApiRequest.Status.Failed.name) {
val startAttempts = request1.attempts

while (request1.attempts < configMock.networkMaxRetries) {
// Run before advancing the clock: it shouldn't attempt any sends
job.run()
verify(exactly = attempts) { request1.send(any()) }
// Advance the time with our expected backoff interval
staticClock.time += listOf(
defaultInterval, // First attempt starts after default interval
defaultInterval, // First RETRY starts after default interval bc 2s < 10s
defaultInterval, // Second RETRY starts after default interval bc 4s < 10s
defaultInterval, // Third RETRY starts after default interval bc 8s < 10s
16_000L, // Exp. backoff time should be used bc 16s > 10s
32_000L, // Exp. backoff time should be used bc 32s > 10s
64_000L, // Exp. backoff time should be used bc 64s > 10s
128_000L // Exp. backoff time should be used bc 128s > 10s
).getOrElse(startAttempts) { 180_000L } // Max backoff time should be used from here on, because 256s > 180s

attempts++
// Run after advancing the clock (this mimics how handler.postDelay would run jobs)
postedJob!!.run()

// Advance the time with increasing backoff interval
backoffTime *= attempts
staticClock.time += backoffTime
delayedRunner = null
// It should have attempted one send if the correct time elapsed
assertEquals(startAttempts + 1, request1.attempts)

job.run()
assertNotNull(delayedRunner)
assertEquals(2, KlaviyoApiClient.getQueueSize())
assertNotNull(dataStoreSpy.fetch(request1.uuid))
assertNotNull(dataStoreSpy.fetch(request2.uuid))
verify(exactly = attempts) { request1.send(any()) }
verify(inverse = true) { request2.send(any()) }
// Fail test if we exceed max attempts
assert(request1.attempts <= 50)
}

// First request should have been retried exactly 50 times
assertEquals(50, request1.attempts)

// Upon final failure, request 1 should have been dropped from the queue
assertEquals(1, KlaviyoApiClient.getQueueSize())
assertNull(dataStoreSpy.fetch(request1.uuid))

// Second request should have been attempted after the final failure of request 1
verify(exactly = 1) { request2.send(any()) }
}

@Test
Expand Down
Loading

0 comments on commit a231e5c

Please sign in to comment.