Skip to content

Commit

Permalink
Add ST_IntersectsExtent push-down rules (#8)
Browse files Browse the repository at this point in the history
* Add ST_IntersectsExtent pushdown rules and the initial tests coverage
  • Loading branch information
pomadchin authored Apr 9, 2022
1 parent 7c7be46 commit 3195016
Show file tree
Hide file tree
Showing 12 changed files with 465 additions and 4 deletions.
8 changes: 6 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ metals.sbt

.bsp

# Test data files #
# Test data files

java/data

# Compiled libs #
# Compiled libs

java/*.dylib
java/*.so
java/*dll

# Spark files
metastore_db
derby.log
10 changes: 8 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ lazy val commonSettings = Seq(
homepage := Some(url("https://github.com/azavea/hiveless")),
versionScheme := Some("semver-spec"),
Test / publishArtifact := false,
Test / fork := true,
developers := List(
Developer(
"pomadchin",
Expand Down Expand Up @@ -104,10 +105,15 @@ lazy val spatial = project
.dependsOn(core % "compile->compile;provided->provided", jts)
.settings(commonSettings)
.settings(name := "hiveless-spatial")
.settings(libraryDependencies += "org.locationtech.geomesa" %% "geomesa-spark-jts" % geomesaVersion)
.settings(
libraryDependencies ++= Seq(
"org.locationtech.geomesa" %% "geomesa-spark-jts" % geomesaVersion,
"org.locationtech.geotrellis" %% "geotrellis-spark-testkit" % geotrellisVersion % Test excludeAll (excludedDependencies: _*)
)
)

lazy val `spatial-index` = project
.dependsOn(spatial % "compile->compile;provided->provided")
.dependsOn(spatial % "compile->compile;provided->provided;test->test")
.settings(commonSettings)
.settings(name := "hiveless-spatial-index")
.settings(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.hiveless.spatial.rules

import com.azavea.hiveless.spatial._
import com.azavea.hiveless.spatial.index.ST_IntersectsExtent
import com.azavea.hiveless.serializers.syntax._
import org.locationtech.jts.geom.Geometry
import geotrellis.vector._
import cats.syntax.option._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.hive.HiveGenericUDF

object SpatialFilterPushdownRules extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan =
plan.transformDown {
// HiveGenericUDF is a private[hive] case class
case Filter(condition: HiveGenericUDF, plan) if condition.of[ST_IntersectsExtent] =>
// extract bbox, snd
val Seq(bboxExpr, geometryExpr) = condition.children
// extract extent from the right
val extent = geometryExpr.eval(null).convert[Geometry].extent

// transform expression
val expr = List(
IsNotNull(bboxExpr),
GreaterThanOrEqual(GetStructField(bboxExpr, 0, "xmin".some), Literal(extent.xmin)),
GreaterThanOrEqual(GetStructField(bboxExpr, 1, "ymin".some), Literal(extent.ymin)),
LessThanOrEqual(GetStructField(bboxExpr, 2, "xmax".some), Literal(extent.xmax)),
LessThanOrEqual(GetStructField(bboxExpr, 3, "ymax".some), Literal(extent.ymax))
).and

Filter(expr, plan)
}

def registerOptimizations(sqlContext: SQLContext): Unit =
Seq(SpatialFilterPushdownRules).foreach { r =>
if (!sqlContext.experimental.extraOptimizations.contains(r))
sqlContext.experimental.extraOptimizations ++= Seq(r)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.hiveless.spatial

/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import org.apache.spark.sql.hive.HiveGenericUDF

import scala.reflect.{classTag, ClassTag}

package object rules extends Serializable {
implicit class HiveGenericUDFOps(val self: HiveGenericUDF) extends AnyVal {
def of[T: ClassTag]: Boolean = self.funcWrapper.functionClassName == classTag[T].toString
}

implicit class ListExpressionsOps(val self: List[Expression]) extends AnyVal {
def and: Expression = self.reduce(And)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless

import org.apache.spark.sql.hive.hiveless.spatial.rules.SpatialFilterPushdownRules
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.scalatest.{BeforeAndAfterAll, Suite}

import java.io.File
import scala.io.Source

trait SpatialIndexHiveTestEnvironment extends SpatialHiveTestEnvironment { self: Suite with BeforeAndAfterAll =>
import SpatialHiveTestEnvironment._

private def spatialFunctions: List[String] =
Source
.fromFile(new File("../spatial/sql/createUDFs.sql").toURI)
.using(_.mkString.split(";").toList.map(_.trim).filter(_.nonEmpty))

private def spatialIndexFunctions: List[String] =
Source
.fromFile(new File("../spatial-index/sql/createUDFs.sql").toURI)
.using(_.mkString.split(";").toList.map(_.trim).filter(_.nonEmpty))

// function to override Hive SQL functions registration
override def registerHiveUDFs(ssc: SparkSession): Unit =
(spatialFunctions ::: spatialIndexFunctions).foreach(ssc.sql)

// function to override optimizations
override def registerOptimizations(sqlContext: SQLContext): Unit =
SpatialFilterPushdownRules.registerOptimizations(sqlContext)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless

trait SpatialIndexTestTables extends SpatialTestTables { self: SpatialHiveTestEnvironment =>
override def createViews(): Unit =
ssc.sql(
"""
|CREATE TEMPORARY VIEW polygons_csv_view AS (
| SELECT *, ST_GeomFromWKT(wkt) AS geom, ST_ExtentFromGeom(ST_GeomFromWKT(wkt)) as bbox FROM polygons_csv
|);
|""".stripMargin
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2022 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.azavea.hiveless.spatial.index

import com.azavea.hiveless.{SpatialIndexHiveTestEnvironment, SpatialIndexTestTables}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.scalatest.funspec.AnyFunSpec

class STIndexSpec extends AnyFunSpec with SpatialIndexHiveTestEnvironment with SpatialIndexTestTables {

describe("ST Index functions spec") {
it("ST_IntersectsExtent should filter a CSV file") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_csv_view WHERE ST_IntersectsExtent(bbox, ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[-75.5859375,40.32517767999294],[-75.5859375,43.197167282501276],[-72.41015625,43.197167282501276],[-72.41015625,40.32517767999294],[-75.5859375,40.32517767999294]]]}'))
|""".stripMargin
)

df.count() shouldBe 5
}

it("ST_IntersectsExtent should filter a Parquet file") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_parquet WHERE ST_IntersectsExtent(bbox, ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[-75.5859375,40.32517767999294],[-75.5859375,43.197167282501276],[-72.41015625,43.197167282501276],[-72.41015625,40.32517767999294],[-75.5859375,40.32517767999294]]]}'))
|""".stripMargin
)

df.count() shouldBe 5
}

it("ST_IntersectsExtent plan should be optimized") {
val df = ssc.sql(
"""
|SELECT * FROM polygons_parquet WHERE ST_IntersectsExtent(bbox, ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[-75.5859375,40.32517767999294],[-75.5859375,43.197167282501276],[-72.41015625,43.197167282501276],[-72.41015625,40.32517767999294],[-75.5859375,40.32517767999294]]]}'))
|""".stripMargin
)

val dfe = ssc.sql(
"""
|SELECT * FROM polygons_parquet
|WHERE bbox.xmin >= -75.5859375
|AND bbox.ymin >= 40.3251777
|AND bbox.xmax <= -72.4101562
|AND bbox.ymax <= 43.1971673
|""".stripMargin
)

df.count() shouldBe dfe.count()

// compare optimized plans filters
val dfc = df.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }
val dfec = dfe.queryExecution.optimizedPlan.collect { case Filter(condition, _) => condition }

dfc shouldBe dfec
}
}
}
6 changes: 6 additions & 0 deletions spatial/src/test/resources/polygons.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
address,bbl,wkt
"",4050660250,"POLYGON((-73.83616 40.75531, -73.83622 40.75555, -73.83723 40.75507, -73.83775 40.75486, -73.83796 40.75479, -73.83833 40.75464, -73.8388 40.75445, -73.83932 40.75426, -73.83976 40.75412, -73.83984 40.7541, -73.83985 40.75405, -73.83985 40.75403, -73.83986 40.75402, -73.83987 40.75401, -73.83988 40.75401, -73.83989 40.754, -73.83993 40.75398, -73.83998 40.75397, -73.84004 40.75395, -73.8401 40.75393, -73.84014 40.75391, -73.84011 40.75385, -73.83984 40.75395, -73.84003 40.75355, -73.83999 40.75356, -73.83931 40.75407, -73.83914 40.75411, -73.83909 40.75405, -73.83884 40.75407, -73.83891 40.75412, -73.83776 40.75452, -73.83658 40.7551, -73.83623 40.75528, -73.83616 40.75531))"
"",1002710036,"POLYGON((-73.98955 40.71278, -73.98958 40.71299, -73.98962 40.71299, -73.98961 40.71292, -73.98959 40.71278, -73.98955 40.71278))"
"",4004900102,"POLYGON((-73.93651 40.77378, -73.93679 40.77387, -73.93681 40.77388, -73.93682 40.77389, -73.93685 40.7739, -73.93687 40.77392, -73.93689 40.77393, -73.93691 40.77395, -73.9371 40.77394, -73.93712 40.77385, -73.93714 40.77377, -73.93714 40.77368, -73.93713 40.7736, -73.93712 40.77351, -73.93709 40.77343, -73.93705 40.77335, -73.93701 40.77327, -73.93696 40.77319, -73.9369 40.77312, -73.93683 40.77305, -73.93675 40.77299, -73.93667 40.77293, -73.93658 40.77288, -73.93649 40.77283, -73.93639 40.77279, -73.93629 40.77275, -73.93603 40.77326, -73.93651 40.77341, -73.93659 40.77363, -73.93651 40.77378))"
"",4024820051,"POLYGON((-73.89244 40.73341, -73.8923 40.73359, -73.8924 40.73359, -73.89262 40.73332, -73.89262 40.73331, -73.8927 40.73322, -73.89279 40.73309, -73.893 40.73281, -73.89299 40.7327, -73.89298 40.73271, -73.8929 40.73282, -73.89283 40.73292, -73.89267 40.73312, -73.89263 40.73318, -73.89257 40.73325, -73.89252 40.73331, -73.89244 40.73341))"
"",4092410183,"POLYGON((-73.82746 40.70538, -73.82755 40.70541, -73.82771 40.70509, -73.82762 40.70506, -73.82749 40.70532, -73.82746 40.70538))"
Binary file not shown.
Loading

0 comments on commit 3195016

Please sign in to comment.