Skip to content

Commit

Permalink
Revert to using Flow<T> for index loading
Browse files Browse the repository at this point in the history
  • Loading branch information
garyttierney committed Apr 12, 2024
1 parent 71eb035 commit 447a17f
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 12 deletions.
3 changes: 1 addition & 2 deletions src/main/kotlin/framework/search/index/IndexBulkLoader.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package io.github.garyttierney.ghidralite.framework.search.index

import kotlinx.coroutines.flow.Flow
import java.util.stream.Stream

interface IndexBulkLoader<T : Any> {
suspend fun load(): Stream<T>
suspend fun load(): Flow<T>
}
13 changes: 4 additions & 9 deletions src/main/kotlin/framework/search/index/Indexes.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.github.garyttierney.ghidralite.framework.search.index

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toCollection
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.withContext
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.reflect.KClass
Expand All @@ -28,13 +26,10 @@ class Indexes {
inline fun <reified T : Any> query(): Flow<T> = query(T::class)

suspend fun <T : Any> load(ty: KClass<T>, bulkLoader: IndexBulkLoader<T>) {
val data = withContext(Dispatchers.IO) {
bulkLoader.load()
.parallel()
.toList()
}
val items = mutableListOf<T>()
bulkLoader.load().flowOn(Dispatchers.IO).toCollection(items)

indexes[ty] = ConcurrentLinkedQueue(data)
indexes[ty] = ConcurrentLinkedQueue(items)
}

suspend inline fun <reified T : Any> load(bulkLoader: IndexBulkLoader<T>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package io.github.garyttierney.ghidralite.framework.search.index.program
import io.github.garyttierney.ghidralite.framework.db.GhidraRecord
import io.github.garyttierney.ghidralite.framework.db.GhidraTable
import io.github.garyttierney.ghidralite.framework.search.index.IndexBulkLoader
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.stream.consumeAsFlow
import java.util.stream.Stream

class ProgramDbTableLoader<T : GhidraRecord>(private val table: GhidraTable<T>, private val predicate: (T) -> Boolean = { true }) : IndexBulkLoader<T> {
override suspend fun load(): Stream<T> = table.all().filter(predicate)
override suspend fun load(): Flow<T> = table.allAfter().asFlow()
}

0 comments on commit 447a17f

Please sign in to comment.