From 903f8962c93388ebe0b71da6dc81e1bad306f207 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Sat, 8 Jun 2024 16:40:56 -0500 Subject: [PATCH] Added preliminary Materialized support --- .../scala/spec/SimpleHaloAndSQLiteSpec.scala | 9 ++++- core/src/main/scala/lightdb/index/Index.scala | 1 + .../scala/lightdb/index/Materialized.scala | 8 +++++ .../scala/lightdb/query/PagedResults.scala | 4 +++ core/src/main/scala/lightdb/query/Query.scala | 4 ++- .../scala/lightdb/lucene/LuceneIndex.scala | 2 ++ .../scala/lightdb/lucene/LuceneSupport.scala | 5 +-- sql/src/main/scala/lightdb/sql/SQLIndex.scala | 1 + .../main/scala/lightdb/sql/SQLIndexer.scala | 8 +++-- .../main/scala/lightdb/sql/SQLSupport.scala | 35 +++++++++++++------ 10 files changed, 59 insertions(+), 18 deletions(-) create mode 100644 core/src/main/scala/lightdb/index/Materialized.scala diff --git a/all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala b/all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala index c1944e9b..1cf5f63c 100644 --- a/all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala +++ b/all/src/test/scala/spec/SimpleHaloAndSQLiteSpec.scala @@ -177,6 +177,13 @@ class SimpleHaloAndSQLiteSpec extends AsyncWordSpec with AsyncIOSpec with Matche person.name should be("Johnny Doe") } } + "query for materialized indexes" in { + Person.withSearchContext { implicit context => + Person.query.materialized.compile.toList.map { list => + list.map(m => m(Person.age)).toSet should be(Set(19, 21)) + } + } + } "delete John" in { Person.delete(id1).map { deleted => deleted should be(id1) @@ -256,7 +263,7 @@ class SimpleHaloAndSQLiteSpec extends AsyncWordSpec with AsyncIOSpec with Matche override implicit val rw: RW[Person] = RW.gen val name: I[String] = index.one("name", _.name) - val age: I[Int] = index.one("age", _.age) + val age: I[Int] = index.one("age", _.age, materialize = true) val ageLinks: IndexedLinks[Int, Person] = IndexedLinks[Int, Person]("age", _.age, _.toString, this) } diff --git a/core/src/main/scala/lightdb/index/Index.scala b/core/src/main/scala/lightdb/index/Index.scala index 0b194a2c..d3a73672 100644 --- a/core/src/main/scala/lightdb/index/Index.scala +++ b/core/src/main/scala/lightdb/index/Index.scala @@ -13,6 +13,7 @@ trait Index[F, D <: Document[D]] { def fieldName: String def indexSupport: IndexSupport[D] + def materialize: Boolean def get: D => List[F] def getJson: D => List[Json] = (doc: D) => get(doc).map(_.json) diff --git a/core/src/main/scala/lightdb/index/Materialized.scala b/core/src/main/scala/lightdb/index/Materialized.scala new file mode 100644 index 00000000..5f2043ad --- /dev/null +++ b/core/src/main/scala/lightdb/index/Materialized.scala @@ -0,0 +1,8 @@ +package lightdb.index + +import lightdb.{Document, Id} + +class Materialized[D <: Document[D]](map: Map[Index[_, D], Any]) { + def get[F](index: Index[F, D]): Option[F] = map.get(index).map(_.asInstanceOf[F]) + def apply[F](index: Index[F, D]): F = get(index).getOrElse(throw new NullPointerException(s"${index.fieldName} not found in [${map.keySet.map(_.fieldName).mkString(", ")}]")) +} \ No newline at end of file diff --git a/core/src/main/scala/lightdb/query/PagedResults.scala b/core/src/main/scala/lightdb/query/PagedResults.scala index 28838b39..a2e6f459 100644 --- a/core/src/main/scala/lightdb/query/PagedResults.scala +++ b/core/src/main/scala/lightdb/query/PagedResults.scala @@ -2,6 +2,7 @@ package lightdb.query import cats.effect.IO import cats.implicits.toTraverseOps +import lightdb.index.Materialized import lightdb.{Document, Id} case class PagedResults[D <: Document[D], V](query: Query[D, V], @@ -9,6 +10,7 @@ case class PagedResults[D <: Document[D], V](query: Query[D, V], offset: Int, total: Int, idsAndScores: List[(Id[D], Double)], + materialized: List[Materialized[D]], getter: Option[Id[D] => IO[D]] = None) { lazy val page: Int = offset / query.pageSize lazy val pages: Int = math.ceil(query.limit.getOrElse(total).toDouble / query.pageSize.toDouble).toInt @@ -22,6 +24,8 @@ case class PagedResults[D <: Document[D], V](query: Query[D, V], def idStream: fs2.Stream[IO, Id[D]] = fs2.Stream(ids: _*) + def materializedStream: fs2.Stream[IO, Materialized[D]] = fs2.Stream(materialized: _*) + def idAndScoreStream: fs2.Stream[IO, (Id[D], Double)] = fs2.Stream(idsAndScores: _*) def docStream: fs2.Stream[IO, D] = idStream diff --git a/core/src/main/scala/lightdb/query/Query.scala b/core/src/main/scala/lightdb/query/Query.scala index aab3412d..0350424a 100644 --- a/core/src/main/scala/lightdb/query/Query.scala +++ b/core/src/main/scala/lightdb/query/Query.scala @@ -2,7 +2,7 @@ package lightdb.query import cats.Eq import cats.effect.IO -import lightdb.index.{Index, IndexSupport} +import lightdb.index.{Index, IndexSupport, Materialized} import lightdb.model.AbstractCollection import lightdb.spatial.GeoPoint import lightdb.util.DistanceCalculator @@ -98,6 +98,8 @@ case class Query[D <: Document[D], V](indexSupport: IndexSupport[D], def idStream(implicit context: SearchContext[D]): fs2.Stream[IO, Id[D]] = pageStream.flatMap(_.idStream) + def materialized(implicit context: SearchContext[D]): fs2.Stream[IO, Materialized[D]] = pageStream.flatMap(_.materializedStream) + def stream(implicit context: SearchContext[D]): fs2.Stream[IO, V] = pageStream.flatMap(_.stream) def grouped[F](index: Index[F, D], diff --git a/lucene/src/main/scala/lightdb/lucene/LuceneIndex.scala b/lucene/src/main/scala/lightdb/lucene/LuceneIndex.scala index 2165596a..bf13e416 100644 --- a/lucene/src/main/scala/lightdb/lucene/LuceneIndex.scala +++ b/lucene/src/main/scala/lightdb/lucene/LuceneIndex.scala @@ -25,6 +25,8 @@ case class LuceneIndex[F, D <: Document[D]](fieldName: String, private implicit def filter2Lucene(filter: Filter[D]): LuceneFilter[D] = filter.asInstanceOf[LuceneFilter[D]] + override def materialize: Boolean = false // TODO: Support materialization + lazy val fieldSortName: String = { val separate = rw.definition.className.collect { case "lightdb.spatial.GeoPoint" => true diff --git a/lucene/src/main/scala/lightdb/lucene/LuceneSupport.scala b/lucene/src/main/scala/lightdb/lucene/LuceneSupport.scala index 74d26786..16087d1f 100644 --- a/lucene/src/main/scala/lightdb/lucene/LuceneSupport.scala +++ b/lucene/src/main/scala/lightdb/lucene/LuceneSupport.scala @@ -3,7 +3,7 @@ package lightdb.lucene import cats.effect.IO import fabric.define.DefType import lightdb._ -import lightdb.index.{IndexSupport, Index} +import lightdb.index.{Index, IndexSupport, Materialized} import lightdb.model.AbstractCollection import lightdb.query.{Filter, PageContext, PagedResults, Query, SearchContext, Sort, SortDirection} import lightdb.spatial.GeoPoint @@ -65,7 +65,8 @@ trait LuceneSupport[D <: Document[D]] extends IndexSupport[D] { context = indexContext, offset = offset, total = total, - idsAndScores = idsAndScores + idsAndScores = idsAndScores, + materialized = Nil ) } diff --git a/sql/src/main/scala/lightdb/sql/SQLIndex.scala b/sql/src/main/scala/lightdb/sql/SQLIndex.scala index 9d619df2..8f62babc 100644 --- a/sql/src/main/scala/lightdb/sql/SQLIndex.scala +++ b/sql/src/main/scala/lightdb/sql/SQLIndex.scala @@ -7,6 +7,7 @@ import lightdb.query.Filter case class SQLIndex[F, D <: Document[D]](fieldName: String, indexSupport: IndexSupport[D], + materialize: Boolean, get: D => List[F])(implicit val rw: RW[F]) extends Index[F, D] { override def is(value: F): Filter[D] = SQLFilter[D](s"$fieldName = ?", List(value.json)) diff --git a/sql/src/main/scala/lightdb/sql/SQLIndexer.scala b/sql/src/main/scala/lightdb/sql/SQLIndexer.scala index cee5ec1d..7a2b7e22 100644 --- a/sql/src/main/scala/lightdb/sql/SQLIndexer.scala +++ b/sql/src/main/scala/lightdb/sql/SQLIndexer.scala @@ -15,14 +15,16 @@ case class SQLIndexer[D <: Document[D]](indexSupport: SQLSupport[D]) extends Ind f(context) } - def apply[F](name: String, get: D => List[F])(implicit rw: RW[F]): Index[F, D] = SQLIndex( + def apply[F](name: String, get: D => List[F], materialize: Boolean = false) + (implicit rw: RW[F]): Index[F, D] = SQLIndex( fieldName = name, indexSupport = indexSupport, + materialize = materialize, get = doc => get(doc) ) - def one[F](name: String, get: D => F)(implicit rw: RW[F]): Index[F, D] = - apply[F](name, doc => List(get(doc))) + def one[F](name: String, get: D => F, materialize: Boolean = false) + (implicit rw: RW[F]): Index[F, D] = apply[F](name, doc => List(get(doc)), materialize = materialize) override def truncate(): IO[Unit] = indexSupport.truncate() diff --git a/sql/src/main/scala/lightdb/sql/SQLSupport.scala b/sql/src/main/scala/lightdb/sql/SQLSupport.scala index f8012450..4e6e7e8c 100644 --- a/sql/src/main/scala/lightdb/sql/SQLSupport.scala +++ b/sql/src/main/scala/lightdb/sql/SQLSupport.scala @@ -4,7 +4,7 @@ import cats.effect.IO import fabric._ import fabric.io.JsonFormatter import lightdb.{Document, Id} -import lightdb.index.{Index, IndexSupport} +import lightdb.index.{Index, IndexSupport, Materialized} import lightdb.model.AbstractCollection import lightdb.query.{PagedResults, Query, SearchContext, Sort, SortDirection} import lightdb.util.FlushingBacklog @@ -13,6 +13,7 @@ import java.nio.file.{Files, Path} import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, Types} import scala.util.Try +// TODO: Move all of IndexSupport custom code into SQLIndexed trait SQLSupport[D <: Document[D]] extends IndexSupport[D] { private var _connection: Option[Connection] = None @@ -66,7 +67,7 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] { override lazy val index: SQLIndexer[D] = SQLIndexer(this) - val _id: Index[Id[D], D] = index.one("_id", _._id) + val _id: Index[Id[D], D] = index.one("_id", _._id, materialize = true) private[lightdb] lazy val backlog = new FlushingBacklog[Id[D], D](1_000, 10_000) { override protected def write(list: List[D]): IO[Unit] = IO.blocking { @@ -139,9 +140,10 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] { case Nil => "" case list => list.mkString("ORDER BY ", ", ", "") } + val fields = index.fields.filter(_.materialize).map(_.fieldName).mkString(", ") val sql = s"""SELECT - | * + | $fields |FROM | ${collection.collectionName} |$filters @@ -154,14 +156,15 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] { val ps = prepare(sql, params) val rs = ps.executeQuery() try { - val data = this.data(rs) + val materialized = this.materialized(rs) PagedResults( query = query, context = SQLPageContext(context), offset = offset, total = total, - idsAndScores = data.ids.map(id => id -> 0.0), - getter = data.lookup + idsAndScores = materialized.map(_.apply(_id)).map(id => id -> 0.0), + materialized = materialized, + getter = None ) } finally { rs.close() @@ -169,14 +172,18 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] { } } - protected def data(rs: ResultSet): SQLData[D] = { - val iterator = new Iterator[Id[D]] { + protected def materialized(rs: ResultSet): List[Materialized[D]] = { + val iterator = new Iterator[Materialized[D]] { override def hasNext: Boolean = rs.next() - override def next(): Id[D] = Id[D](rs.getString("_id")) + override def next(): Materialized[D] = { + val map = index.fields.filter(_.materialize).map { index => + index -> getValue(rs, index) + }.toMap + new Materialized[D](map) + } } - val ids = iterator.toList - SQLData(ids, None) + iterator.toList } override protected def indexDoc(doc: D, fields: List[Index[_, D]]): IO[Unit] = @@ -206,6 +213,12 @@ trait SQLSupport[D <: Document[D]] extends IndexSupport[D] { case _ => ps.setString(index, JsonFormatter.Compact(value)) } + private def getValue[F](rs: ResultSet, index: Index[F, D]): Any = rs.getObject(index.fieldName) match { + case s: String => index.rw.write(str(s)) + case i: java.lang.Integer => index.rw.write(num(i.intValue())) + case v => throw new UnsupportedOperationException(s"${index.fieldName} returned $v (${v.getClass.getName})") + } + private def commit(): IO[Unit] = IO.blocking { if (!enableAutoCommit) connection.commit()