Skip to content

Commit

Permalink
Add separate thread for processing logevents (#35)
Browse files Browse the repository at this point in the history
* Add separate thread for processing logevents
  • Loading branch information
tore-statsig authored Nov 17, 2021
1 parent 1a47c30 commit 55929d4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 20 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
android.useAndroidX=true
libraryVersion=4.1.0
libraryVersion=4.2.0
kotlinVersion=1.5.0
21 changes: 16 additions & 5 deletions src/main/java/com/statsig/androidsdk/Statsig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.*

Expand Down Expand Up @@ -158,6 +159,7 @@ object Statsig {
lifecycleListener = StatsigActivityLifecycleListener()
application.registerActivityLifecycleCallbacks(lifecycleListener)
logger = StatsigLogger(
statsigScope,
sdkKey,
options.api,
statsigMetadata,
Expand Down Expand Up @@ -331,16 +333,23 @@ object Statsig {
pollForUpdates()
}

@JvmSynthetic
suspend fun shutdownSuspend() {
enforceInitialized("shutdown")
pollingJob?.cancel()
logger.shutdown()
}

/**
* Informs the Statsig SDK that the client is shutting down to complete cleanup saving state
* @throws IllegalStateException if the SDK has not been initialized
*/
@JvmStatic
fun shutdown() {
enforceInitialized("shutdown")
pollingJob?.cancel()
statsigScope.launch {
logger.flush()
runBlocking {
withContext(Dispatchers.Main.immediate) {
shutdownSuspend()
}
}
}

Expand Down Expand Up @@ -435,7 +444,9 @@ object Statsig {

override fun onActivityStopped(activity: Activity) {
currentActivity = null
shutdown()
statsigScope.launch {
logger.flush()
}
}

override fun onActivitySaveInstanceState(activity: Activity, outState: Bundle) {
Expand Down
37 changes: 23 additions & 14 deletions src/main/java/com/statsig/androidsdk/StatsigLogger.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.statsig.androidsdk

import com.google.gson.Gson
import java.util.concurrent.Executors
import android.content.SharedPreferences
import com.google.gson.annotations.SerializedName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import kotlinx.coroutines.*

internal const val MAX_EVENTS: Int = 500
internal const val MAX_EVENTS: Int = 10
internal const val FLUSH_TIMER_MS: Long = 60000

internal const val CONFIG_EXPOSURE = "statsig::config_exposure"
Expand All @@ -19,33 +18,37 @@ internal data class LogEventData(
)

internal class StatsigLogger(
coroutineScope: CoroutineScope,
private val sdkKey: String,
private val api: String,
private val statsigMetadata: StatsigMetadata,
private val statsigNetwork: StatsigNetwork
) {
private val gson = Gson()

// Since these collections are not thread-safe, they will be modified in a single thread only
private val executor = Executors.newSingleThreadExecutor();
private val singleThreadDispatcher = executor.asCoroutineDispatcher()
private val timer = coroutineScope.launch {
while (coroutineScope.isActive) {
delay(FLUSH_TIMER_MS)
flush()
}
}
// Modify in a single thread only
internal var events = arrayListOf<LogEvent>()

suspend fun log(event: LogEvent) {
withContext(Dispatchers.Main.immediate) { // Run on main thread if not already in it
withContext(singleThreadDispatcher) {
events.add(event)

if (events.size >= MAX_EVENTS) {
flush()
}

if (events.size == 1) {
delay(FLUSH_TIMER_MS)
flush()
}
}
}

suspend fun flush() {
withContext(Dispatchers.Main.immediate) {
withContext(singleThreadDispatcher) {
if (events.size == 0) {
return@withContext
}
Expand All @@ -57,7 +60,7 @@ internal class StatsigLogger(

suspend fun logGateExposure(gateName: String, gateValue: Boolean, ruleID: String,
secondaryExposures: Array<Map<String, String>>, user: StatsigUser?) {
withContext(Dispatchers.Main.immediate) {
withContext(singleThreadDispatcher) {
var event = LogEvent(GATE_EXPOSURE)
event.user = user
event.metadata =
Expand All @@ -73,12 +76,18 @@ internal class StatsigLogger(

suspend fun logConfigExposure(configName: String, ruleID: String, secondaryExposures: Array<Map<String, String>>,
user: StatsigUser?) {
withContext(Dispatchers.Main.immediate) {
withContext(singleThreadDispatcher) {
var event = LogEvent(CONFIG_EXPOSURE)
event.user = user
event.metadata = mapOf("config" to configName, "ruleID" to ruleID)
event.secondaryExposures = secondaryExposures
log(event)
}
}

suspend fun shutdown() {
timer.cancel()
flush()
executor.shutdown()
}
}

0 comments on commit 55929d4

Please sign in to comment.