Skip to content

Commit

Permalink
Added preliminary Materialized support
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Jun 8, 2024
1 parent 2fdfc40 commit 903f896
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 18 deletions.
9 changes: 8 additions & 1 deletion all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ class SimpleHaloAndSQLiteSpec extends AsyncWordSpec with AsyncIOSpec with Matche
person.name should be("Johnny Doe")
}
}
"query for materialized indexes" in {
Person.withSearchContext { implicit context =>
Person.query.materialized.compile.toList.map { list =>
list.map(m => m(Person.age)).toSet should be(Set(19, 21))
}
}
}
"delete John" in {
Person.delete(id1).map { deleted =>
deleted should be(id1)
Expand Down Expand Up @@ -256,7 +263,7 @@ class SimpleHaloAndSQLiteSpec extends AsyncWordSpec with AsyncIOSpec with Matche
override implicit val rw: RW[Person] = RW.gen

val name: I[String] = index.one("name", _.name)
val age: I[Int] = index.one("age", _.age)
val age: I[Int] = index.one("age", _.age, materialize = true)
val ageLinks: IndexedLinks[Int, Person] = IndexedLinks[Int, Person]("age", _.age, _.toString, this)
}

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/lightdb/index/Index.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ trait Index[F, D <: Document[D]] {

def fieldName: String
def indexSupport: IndexSupport[D]
def materialize: Boolean
def get: D => List[F]
def getJson: D => List[Json] = (doc: D) => get(doc).map(_.json)

Expand Down
8 changes: 8 additions & 0 deletions core/src/main/scala/lightdb/index/Materialized.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package lightdb.index

import lightdb.{Document, Id}

class Materialized[D <: Document[D]](map: Map[Index[_, D], Any]) {
def get[F](index: Index[F, D]): Option[F] = map.get(index).map(_.asInstanceOf[F])
def apply[F](index: Index[F, D]): F = get(index).getOrElse(throw new NullPointerException(s"${index.fieldName} not found in [${map.keySet.map(_.fieldName).mkString(", ")}]"))
}
4 changes: 4 additions & 0 deletions core/src/main/scala/lightdb/query/PagedResults.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package lightdb.query

import cats.effect.IO
import cats.implicits.toTraverseOps
import lightdb.index.Materialized
import lightdb.{Document, Id}

case class PagedResults[D <: Document[D], V](query: Query[D, V],
context: PageContext[D],
offset: Int,
total: Int,
idsAndScores: List[(Id[D], Double)],
materialized: List[Materialized[D]],
getter: Option[Id[D] => IO[D]] = None) {
lazy val page: Int = offset / query.pageSize
lazy val pages: Int = math.ceil(query.limit.getOrElse(total).toDouble / query.pageSize.toDouble).toInt
Expand All @@ -22,6 +24,8 @@ case class PagedResults[D <: Document[D], V](query: Query[D, V],

def idStream: fs2.Stream[IO, Id[D]] = fs2.Stream(ids: _*)

def materializedStream: fs2.Stream[IO, Materialized[D]] = fs2.Stream(materialized: _*)

def idAndScoreStream: fs2.Stream[IO, (Id[D], Double)] = fs2.Stream(idsAndScores: _*)

def docStream: fs2.Stream[IO, D] = idStream
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/lightdb/query/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lightdb.query

import cats.Eq
import cats.effect.IO
import lightdb.index.{Index, IndexSupport}
import lightdb.index.{Index, IndexSupport, Materialized}
import lightdb.model.AbstractCollection
import lightdb.spatial.GeoPoint
import lightdb.util.DistanceCalculator
Expand Down Expand Up @@ -98,6 +98,8 @@ case class Query[D <: Document[D], V](indexSupport: IndexSupport[D],

def idStream(implicit context: SearchContext[D]): fs2.Stream[IO, Id[D]] = pageStream.flatMap(_.idStream)

def materialized(implicit context: SearchContext[D]): fs2.Stream[IO, Materialized[D]] = pageStream.flatMap(_.materializedStream)

def stream(implicit context: SearchContext[D]): fs2.Stream[IO, V] = pageStream.flatMap(_.stream)

def grouped[F](index: Index[F, D],
Expand Down
2 changes: 2 additions & 0 deletions lucene/src/main/scala/lightdb/lucene/LuceneIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ case class LuceneIndex[F, D <: Document[D]](fieldName: String,

private implicit def filter2Lucene(filter: Filter[D]): LuceneFilter[D] = filter.asInstanceOf[LuceneFilter[D]]

override def materialize: Boolean = false // TODO: Support materialization

lazy val fieldSortName: String = {
val separate = rw.definition.className.collect {
case "lightdb.spatial.GeoPoint" => true
Expand Down
5 changes: 3 additions & 2 deletions lucene/src/main/scala/lightdb/lucene/LuceneSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package lightdb.lucene
import cats.effect.IO
import fabric.define.DefType
import lightdb._
import lightdb.index.{IndexSupport, Index}
import lightdb.index.{Index, IndexSupport, Materialized}
import lightdb.model.AbstractCollection
import lightdb.query.{Filter, PageContext, PagedResults, Query, SearchContext, Sort, SortDirection}
import lightdb.spatial.GeoPoint
Expand Down Expand Up @@ -65,7 +65,8 @@ trait LuceneSupport[D <: Document[D]] extends IndexSupport[D] {
context = indexContext,
offset = offset,
total = total,
idsAndScores = idsAndScores
idsAndScores = idsAndScores,
materialized = Nil
)
}

Expand Down
1 change: 1 addition & 0 deletions sql/src/main/scala/lightdb/sql/SQLIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import lightdb.query.Filter

case class SQLIndex[F, D <: Document[D]](fieldName: String,
indexSupport: IndexSupport[D],
materialize: Boolean,
get: D => List[F])(implicit val rw: RW[F]) extends Index[F, D] {
override def is(value: F): Filter[D] = SQLFilter[D](s"$fieldName = ?", List(value.json))

Expand Down
8 changes: 5 additions & 3 deletions sql/src/main/scala/lightdb/sql/SQLIndexer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ case class SQLIndexer[D <: Document[D]](indexSupport: SQLSupport[D]) extends Ind
f(context)
}

def apply[F](name: String, get: D => List[F])(implicit rw: RW[F]): Index[F, D] = SQLIndex(
def apply[F](name: String, get: D => List[F], materialize: Boolean = false)
(implicit rw: RW[F]): Index[F, D] = SQLIndex(
fieldName = name,
indexSupport = indexSupport,
materialize = materialize,
get = doc => get(doc)
)

def one[F](name: String, get: D => F)(implicit rw: RW[F]): Index[F, D] =
apply[F](name, doc => List(get(doc)))
def one[F](name: String, get: D => F, materialize: Boolean = false)
(implicit rw: RW[F]): Index[F, D] = apply[F](name, doc => List(get(doc)), materialize = materialize)

override def truncate(): IO[Unit] = indexSupport.truncate()

Expand Down
35 changes: 24 additions & 11 deletions sql/src/main/scala/lightdb/sql/SQLSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect.IO
import fabric._
import fabric.io.JsonFormatter
import lightdb.{Document, Id}
import lightdb.index.{Index, IndexSupport}
import lightdb.index.{Index, IndexSupport, Materialized}
import lightdb.model.AbstractCollection
import lightdb.query.{PagedResults, Query, SearchContext, Sort, SortDirection}
import lightdb.util.FlushingBacklog
Expand All @@ -13,6 +13,7 @@ import java.nio.file.{Files, Path}
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Types}
import scala.util.Try

// TODO: Move all of IndexSupport custom code into SQLIndexed
trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
private var _connection: Option[Connection] = None

Expand Down Expand Up @@ -66,7 +67,7 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {

override lazy val index: SQLIndexer[D] = SQLIndexer(this)

val _id: Index[Id[D], D] = index.one("_id", _._id)
val _id: Index[Id[D], D] = index.one("_id", _._id, materialize = true)

private[lightdb] lazy val backlog = new FlushingBacklog[Id[D], D](1_000, 10_000) {
override protected def write(list: List[D]): IO[Unit] = IO.blocking {
Expand Down Expand Up @@ -139,9 +140,10 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
case Nil => ""
case list => list.mkString("ORDER BY ", ", ", "")
}
val fields = index.fields.filter(_.materialize).map(_.fieldName).mkString(", ")
val sql =
s"""SELECT
| *
| $fields
|FROM
| ${collection.collectionName}
|$filters
Expand All @@ -154,29 +156,34 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
val ps = prepare(sql, params)
val rs = ps.executeQuery()
try {
val data = this.data(rs)
val materialized = this.materialized(rs)
PagedResults(
query = query,
context = SQLPageContext(context),
offset = offset,
total = total,
idsAndScores = data.ids.map(id => id -> 0.0),
getter = data.lookup
idsAndScores = materialized.map(_.apply(_id)).map(id => id -> 0.0),
materialized = materialized,
getter = None
)
} finally {
rs.close()
ps.close()
}
}

protected def data(rs: ResultSet): SQLData[D] = {
val iterator = new Iterator[Id[D]] {
protected def materialized(rs: ResultSet): List[Materialized[D]] = {
val iterator = new Iterator[Materialized[D]] {
override def hasNext: Boolean = rs.next()

override def next(): Id[D] = Id[D](rs.getString("_id"))
override def next(): Materialized[D] = {
val map = index.fields.filter(_.materialize).map { index =>
index -> getValue(rs, index)
}.toMap
new Materialized[D](map)
}
}
val ids = iterator.toList
SQLData(ids, None)
iterator.toList
}

override protected def indexDoc(doc: D, fields: List[Index[_, D]]): IO[Unit] =
Expand Down Expand Up @@ -206,6 +213,12 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] {
case _ => ps.setString(index, JsonFormatter.Compact(value))
}

private def getValue[F](rs: ResultSet, index: Index[F, D]): Any = rs.getObject(index.fieldName) match {
case s: String => index.rw.write(str(s))
case i: java.lang.Integer => index.rw.write(num(i.intValue()))
case v => throw new UnsupportedOperationException(s"${index.fieldName} returned $v (${v.getClass.getName})")
}

private def commit(): IO[Unit] = IO.blocking {
if (!enableAutoCommit)
connection.commit()
Expand Down

0 comments on commit 903f896

Please sign in to comment.