GEOMESA-3259 FSDS - Add support for GeoParquet
commit 0ea8bff
Author: adeet1 <[email protected]>
Date:   Fri Mar 29 20:29:40 2024 +0000

    Optimize imports

commit 9ebd85a
Author: adeet1 <[email protected]>
Date:   Fri Mar 29 20:12:03 2024 +0000

    Initialize bounds as an empty array instead of null

    * This fixes a failing unit test "suppress or allow empty output files" in ExportCommandTest.scala

commit 4cff76a
Author: adeet1 <[email protected]>
Date:   Fri Mar 29 15:18:09 2024 +0000

    Split Parquet and Orc file compaction tests in order to differentiate the comparisons

commit 16d88fd
Author: adeet1 <[email protected]>
Date:   Wed Mar 27 20:48:07 2024 +0000

    Assert in each partition that GeoParquet metadata bounding boxes across files are correctly merged upon compaction

    * Write features with different geometries and coordinates, so we can test the merging of unique bounding boxes.
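    A minimal sketch of the merge semantics being asserted, using the JTS Envelope API (the sample bounds are illustrative):

        import org.locationtech.jts.geom.Envelope

        // hypothetical per-file bounding boxes read from GeoParquet metadata
        val fileBounds = Seq(new Envelope(-10, 0, -5, 5), new Envelope(5, 20, 0, 15))
        // compaction should reduce them to a single envelope covering all inputs
        val merged = new Envelope()
        fileBounds.foreach(e => merged.expandToInclude(e))
        // merged == Env[-10.0 : 20.0, -5.0 : 15.0], the union of the inputs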

commit 4197e4d
Author: adeet1 <[email protected]>
Date:   Thu Mar 28 21:27:17 2024 +0000

    Change thunk to lazy vals

commit 4eaf9fc
Author: adeet1 <[email protected]>
Date:   Thu Mar 28 20:22:10 2024 +0000

    Implement methods instead of lazy vals

commit c82c0d2
Author: adeet1 <[email protected]>
Date:   Thu Mar 28 20:13:56 2024 +0000

    Move test scope

commit 09588e8
Author: adeet1 <[email protected]>
Date:   Thu Mar 28 20:01:00 2024 +0000

    Don't create a GeoParquet metadata string if the SFT has no geometries

commit 137dcb5
Author: adeet1 <[email protected]>
Date:   Thu Mar 28 19:36:31 2024 +0000

    Re-implement GeoParquet metadata logic to work for SFTs with multiple geometries

commit 360c2c7
Author: adeet1 <[email protected]>
Date:   Thu Mar 28 16:58:26 2024 +0000

    Change back to GroupReadSupport

    * This simply checks that the Parquet file is valid - it won't deserialize/materialize every record, which saves us some processing
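
    A hedged sketch of that validation pass (the reader API is standard parquet-hadoop; the file path is illustrative):

        import org.apache.hadoop.fs.Path
        import org.apache.parquet.hadoop.ParquetReader
        import org.apache.parquet.hadoop.example.GroupReadSupport

        // read each record as a generic Group purely to confirm the file parses
        val reader = ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/example.parquet")).build()
        try {
          var record = reader.read()
          while (record != null) { record = reader.read() }
        } finally {
          reader.close()
        }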

commit 3bce59e
Author: adeet1 <[email protected]>
Date:   Thu Mar 28 14:39:34 2024 +0000

    Use the released GeoParquet metadata schema, not the dev one

commit 878abb5
Author: adeet1 <[email protected]>
Date:   Thu Mar 28 14:30:35 2024 +0000

    Optimize imports

commit d49fc3a
Author: adeet1 <[email protected]>
Date:   Wed Mar 27 14:47:54 2024 +0000

    Assert that the bounding box in the GeoParquet metadata is correct

commit 2ae9574
Author: adeet1 <[email protected]>
Date:   Tue Mar 26 23:14:46 2024 +0000

    Instantiate the observer directly in SimpleFeatureWriteSupport instead of passing it down from SimpleFeatureParquetWriter

commit 9770a3a
Author: adeet1 <[email protected]>
Date:   Fri Mar 22 14:09:05 2024 +0000

    Tweak targetSize

commit 604e614
Author: adeet1 <[email protected]>
Date:   Wed Mar 20 19:55:59 2024 +0000

    Assert that the file metadata adheres to the GeoParquet metadata json schema
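
    A sketch of what that assertion might look like with the json-schema-validator dependency added in this change (the schema URL, draft version, and metadata literal are assumptions):

        import com.fasterxml.jackson.databind.ObjectMapper
        import com.networknt.schema.{JsonSchemaFactory, SpecVersion}

        import java.net.URI

        // illustrative "geo" key-value metadata for a single point column
        val geo =
          """{
            |  "version": "1.0.0",
            |  "primary_column": "position",
            |  "columns": {
            |    "position": { "encoding": "WKB", "geometry_types": ["Point"], "bbox": [25.2, 27.4, 73.0, 73.0] }
            |  }
            |}""".stripMargin
        val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V201909)
            .getSchema(URI.create("https://geoparquet.org/releases/v1.0.0/schema.json"))
        val errors = schema.validate(new ObjectMapper().readTree(geo))
        assert(errors.isEmpty, s"GeoParquet metadata failed schema validation: $errors")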

commit 2257d6c
Author: adeet1 <[email protected]>
Date:   Thu Mar 21 22:03:29 2024 +0000

    Deprecate the ParquetFunctionFactory class, but provide backwards compatibility

commit 03e699f
Author: adeet1 <[email protected]>
Date:   Thu Mar 21 20:04:43 2024 +0000

    Create a new metadata map instance when adding bounding box

commit 8630eed
Author: adeet1 <[email protected]>
Date:   Thu Mar 21 18:07:30 2024 +0000

    Change BoundsObserver argument back to FileSystemObserver

commit 921274b
Author: adeet1 <[email protected]>
Date:   Thu Mar 21 17:53:38 2024 +0000

    If the sft has no geometry field, then omit the GeoParquet metadata entirely

commit c1dda99
Author: adeet1 <[email protected]>
Date:   Thu Mar 21 17:51:26 2024 +0000

    Omit orientation, edges and epoch

commit dabdc43
Author: adeet1 <[email protected]>
Date:   Thu Mar 21 17:39:47 2024 +0000

    Make variables private to avoid exposing mutable state outside the scope of the class

commit 5eecf48
Author: adeet1 <[email protected]>
Date:   Thu Mar 21 17:32:01 2024 +0000

    Delete redundant checks in geometry read and write support

commit 0ed5c65
Author: adeet1 <[email protected]>
Date:   Thu Mar 21 14:55:29 2024 +0000

    Delete duplicate dependency

commit 3dc798d
Author: adeet1 <[email protected]>
Date:   Wed Mar 20 19:09:44 2024 +0000

    Support backwards compatibility for FilterConverter

commit 7dea125
Author: adeet1 <[email protected]>
Date:   Wed Mar 20 15:32:31 2024 +0000

    Delete .parquet.crc file after running tests

commit 652bf3a
Author: Adeet Patel <[email protected]>
Date:   Mon Feb 12 12:16:35 2024 -0500

    GEOMESA-3259 FSDS - Add support for GeoParquet

    * Create a BoundsObserver trait, and tweak various classes and methods to use that trait
    * Add an observer to the SimpleFeatureParquetWriter and write records to it, in order to create a bounding box of all the geometries. Add this bounding box to the GeoParquet metadata (which requires the metadata map to be changed to a mutable data structure).
    * Read/write all geometry attributes as WKB binary (a primitive Parquet type) instead of as a pair of x/y doubles (a group Parquet type), using the same converter and attribute writer for all geometry types while maintaining backwards compatibility (see the sketch after this commit message)
    * Add support for parsing WKB bytes in the Parquet geometry transformer functions
    * Exclude bounding box from the GeoTools filter and use a spatial index instead

    Co-authored-by: Emilio Lahr-Vivaz <[email protected]>
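
    A minimal sketch of the WKB round trip described above, using the JTS I/O classes (the geometry setup is illustrative):

        import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
        import org.locationtech.jts.io.{WKBReader, WKBWriter}

        val gf = new GeometryFactory()
        val geom: Geometry = gf.createPoint(new Coordinate(25.236263, 27.436734))

        // write side: every geometry type serializes through the same primitive binary column
        val bytes: Array[Byte] = new WKBWriter().write(geom)

        // read side: the transformer functions parse the WKB bytes back into a geometry
        val roundTripped: Geometry = new WKBReader().read(bytes)
        assert(roundTripped.equalsExact(geom))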
adeet1 committed Jun 4, 2024
1 parent b4193ae commit 4af7d90
Showing 22 changed files with 831 additions and 385 deletions.
File: ParquetConverterFactory.scala
@@ -16,6 +16,7 @@ import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{MessageType, OriginalType, Type}
import org.geotools.api.feature.`type`.AttributeDescriptor
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.convert.EvaluationContext
import org.locationtech.geomesa.convert2.AbstractConverter.{BasicConfig, BasicField, BasicOptions}
@@ -63,6 +64,7 @@ class ParquetConverterFactory
// note: get the path as a URI so that we handle local files appropriately
val filePath = new Path(PathUtils.getUrl(p).toURI)
val footer = ParquetFileReader.readFooter(new Configuration(), filePath, ParquetMetadataConverter.NO_FILTER)
val parquetSchemaVersion = footer.getFileMetaData.getKeyValueMetaData.getOrDefault("geomesa.parquet.version", "0").toInt
val (schema, fields, id) = SimpleFeatureParquetSchema.read(footer.getFileMetaData) match {
case Some(parquet) =>
// this is a geomesa encoded parquet file
Expand All @@ -71,16 +73,7 @@ class ParquetConverterFactory
// note: parquet converter stores the generic record under index 0
val path = s"avroPath($$0, '/$name')"
// some types need a function applied to the underlying avro value
val expression = ObjectType.selectType(descriptor) match {
case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"parquetPoint($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"parquetMultiPoint($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"parquetLineString($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"parquetMultiLineString($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"parquetPolygon($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"parquetMultiPolygon($$0, '/$name')"
case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)"
case _ => path
}
val expression = computeTransformFunction(name, path, descriptor, parquetSchemaVersion)
BasicField(descriptor.getLocalName, Some(Expression(expression)))
}
val id = Expression(s"avroPath($$0, '/${SimpleFeatureParquetSchema.FeatureIdField}')")
@@ -115,6 +108,41 @@ class ParquetConverterFactory
}
}
}

private def computeTransformFunction(name: String, path: String, descriptor: AttributeDescriptor, schemaVersion: Int): String = {
def expressionV2(name: String, path: String, descriptor: AttributeDescriptor): String = {
ObjectType.selectType(descriptor) match {
case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"point(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"multipoint(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"linestring(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"multilinestring(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"polygon(avroPath($$0, '/$name'))"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"multipolygon(avroPath($$0, '/$name'))"
case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)"
case _ => path
}
}

def expressionV0V1(name: String, path: String, descriptor: AttributeDescriptor): String = {
ObjectType.selectType(descriptor) match {
case Seq(ObjectType.GEOMETRY, ObjectType.POINT) => s"parquetPoint($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOINT) => s"parquetMultiPoint($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.LINESTRING) => s"parquetLineString($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTILINESTRING) => s"parquetMultiLineString($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.POLYGON) => s"parquetPolygon($$0, '/$name')"
case Seq(ObjectType.GEOMETRY, ObjectType.MULTIPOLYGON) => s"parquetMultiPolygon($$0, '/$name')"
case Seq(ObjectType.UUID) => s"avroBinaryUuid($path)"
case _ => path
}
}

schemaVersion match {
case 2 => expressionV2(name, path, descriptor)
case 1 => expressionV0V1(name, path, descriptor)
case 0 => expressionV0V1(name, path, descriptor)
case v => throw new IllegalArgumentException(s"Unknown SimpleFeatureParquetSchema version: $v")
}
}
}

object ParquetConverterFactory {
File: ParquetFunctionFactory.scala
@@ -14,9 +14,15 @@ import org.locationtech.geomesa.convert.avro.AvroPath
import org.locationtech.geomesa.convert2.transforms.Expression.LiteralString
import org.locationtech.geomesa.convert2.transforms.TransformerFunction.NamedTransformerFunction
import org.locationtech.geomesa.convert2.transforms.{Expression, TransformerFunction, TransformerFunctionFactory}
import org.locationtech.geomesa.fs.storage.parquet.io.{SimpleFeatureParquetSchema, SimpleFeatureReadSupport}
import org.locationtech.geomesa.fs.storage.parquet.io.{SimpleFeatureParquetSchemaV1, SimpleFeatureReadSupport}
import org.locationtech.jts.geom._

/**
* For parsing geometries from a GeoParquet file, the GeometryFunctionFactory class provides equivalent functionality.
*
* This class is kept for backwards compatibility with older Parquet file formats.
*/
@Deprecated
class ParquetFunctionFactory extends TransformerFunctionFactory {

override def functions: Seq[TransformerFunction] = geometries
@@ -42,7 +48,7 @@ class ParquetFunctionFactory extends TransformerFunctionFactory {
abstract class ParquetGeometryFn[T <: Geometry, U](name: String, path: AvroPath)
extends NamedTransformerFunction(Seq(name), pure = true) {

import SimpleFeatureParquetSchema.{GeometryColumnX, GeometryColumnY}
import SimpleFeatureParquetSchemaV1.{GeometryColumnX, GeometryColumnY}

override def apply(args: Array[AnyRef]): AnyRef = {
path.eval(args(0).asInstanceOf[GenericRecord]).collect {
Binary file not shown.
File: ParquetConverterTest.scala
@@ -8,7 +8,8 @@

package org.locationtech.geomesa.convert.parquet

import com.typesafe.config.ConfigFactory
import com.typesafe.config.{Config, ConfigFactory}
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.util.factory.Hints
import org.junit.runner.RunWith
import org.locationtech.geomesa.convert.EvaluationContext
@@ -33,6 +34,39 @@ class ParquetConverterTest extends Specification {
sequential

"ParquetConverter" should {
"parse a geoparquet file" in {
val conf = ConfigFactory.parseString(
"""
| {
| type = "parquet",
| id-field = "avroPath($0, '/__fid__')",
| fields = [
| { name = "name", transform = "avroPath($0, '/name')" },
| { name = "age", transform = "avroPath($0, '/age')" },
| { name = "dtg", transform = "avroPath($0, '/dtg')" },
| { name = "position", transform = "point(avroPath($0, '/position'))" },
| ]
| }
""".stripMargin)

val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*position:Point:srid=4326")

val file = getClass.getClassLoader.getResource("example-geo.parquet")
val path = new File(file.toURI).getAbsolutePath

val res = WithClose(SimpleFeatureConverter(sft, conf)) { converter =>
val ec = converter.createEvaluationContext(EvaluationContext.inputFileParam(path))
WithClose(converter.process(file.openStream(), ec))(_.toList)
}

res must haveLength(3)
res.map(_.getID) mustEqual Seq("1", "2", "3")
res.map(_.getAttribute("name")) mustEqual Seq("first", null, "third")
res.map(_.getAttribute("age")) mustEqual Seq(100, 200, 300)
res.map(_.getAttribute("dtg")) mustEqual Seq("2017-01-01", "2017-01-02", "2017-01-03").map(FastConverter.convert(_, classOf[Date]))
res.map(_.getAttribute("position")) mustEqual Seq("POINT (25.236263 27.436734)", "POINT (67.2363 55.236)", "POINT (73.0 73.0)").map(FastConverter.convert(_, classOf[Point]))
}

"parse a parquet file" in {
val conf = ConfigFactory.parseString(
"""
@@ -68,6 +102,33 @@
res.map(_.getAttribute("geom")) mustEqual Seq("POINT (-100.2365 23)", "POINT (40.232 -53.2356)", "POINT (3 -62.23)").map(FastConverter.convert(_, classOf[Point]))
}

"infer a converter from a geomesa geoparquet file" >> {
val file = getClass.getClassLoader.getResource("example-geo.parquet")
val path = new File(file.toURI).getAbsolutePath

val factory = new ParquetConverterFactory()
val inferred: Option[(SimpleFeatureType, Config)] = factory.infer(file.openStream(), path = Some(path))
inferred must beSome

val (sft, config) = inferred.get

sft.getAttributeDescriptors.asScala.map(_.getLocalName) mustEqual Seq("name", "age", "dtg", "position")
sft.getAttributeDescriptors.asScala.map(_.getType.getBinding) mustEqual
Seq(classOf[String], classOf[java.lang.Integer], classOf[Date], classOf[Point])

val res = WithClose(SimpleFeatureConverter(sft, config)) { converter =>
val ec = converter.createEvaluationContext(EvaluationContext.inputFileParam(path))
WithClose(converter.process(file.openStream(), ec))(_.toList)
}

res must haveLength(3)
res.map(_.getID) mustEqual Seq("1", "2", "3")
res.map(_.getAttribute("name")) mustEqual Seq("first", null, "third")
res.map(_.getAttribute("age")) mustEqual Seq(100, 200, 300)
res.map(_.getAttribute("dtg")) mustEqual Seq("2017-01-01", "2017-01-02", "2017-01-03").map(FastConverter.convert(_, classOf[Date]))
res.map(_.getAttribute("position")) mustEqual Seq("POINT (25.236263 27.436734)", "POINT (67.2363 55.236)", "POINT (73.0 73.0)").map(FastConverter.convert(_, classOf[Point]))
}

"infer a converter from a geomesa parquet file" >> {
val file = getClass.getClassLoader.getResource("example.parquet")
val path = new File(file.toURI).getAbsolutePath
File: AbstractFileSystemStorage.scala
@@ -21,7 +21,7 @@ import org.locationtech.geomesa.fs.storage.api.StorageMetadata._
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver, WriterConfig}
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
import org.locationtech.geomesa.fs.storage.common.observer.{FileSystemObserver, FileSystemObserverFactory}
import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver, FileSystemObserverFactory}
import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType
import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType.FileType
import org.locationtech.geomesa.fs.storage.common.utils.{PathCache, StorageUtils}
@@ -235,8 +235,8 @@ abstract class AbstractFileSystemStorage(
val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
PathCache.register(context.fc, path)
val updateObserver = new UpdateObserver(partition, path, action)
val observer = if (observers.isEmpty) { updateObserver } else {
new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver))
val observer = if (observers.isEmpty) { updateObserver.asInstanceOf[BoundsObserver] } else {
new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver)).asInstanceOf[BoundsObserver]
}
WriterConfig(path, observer)
}
@@ -350,7 +350,10 @@
* @param file file being written
* @param action file type
*/
class UpdateObserver(partition: String, file: Path, action: StorageFileAction) extends MetadataObserver {
class UpdateObserver(partition: String, file: Path, action: StorageFileAction) extends MetadataObserver with BoundsObserver {

override def getBoundingBox: Envelope = super.getBoundingBox

override protected def onClose(bounds: Envelope, count: Long): Unit = {
val files = Seq(StorageFile(file.getName, System.currentTimeMillis(), action))
metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(bounds), count))
@@ -370,7 +373,7 @@ object AbstractFileSystemStorage {
/**
* Tracks metadata during writes
*/
abstract class MetadataObserver extends FileSystemObserver {
abstract class MetadataObserver extends BoundsObserver {

private var count: Long = 0L
private val bounds: Envelope = new Envelope()
@@ -384,12 +387,14 @@
}
}

def getBoundingBox: Envelope = bounds

override def flush(): Unit = {}

override def close(): Unit = onClose(bounds, count)

protected def onClose(bounds: Envelope, count: Long): Unit
}

private case class WriterConfig(path: Path, observer: FileSystemObserver)
private case class WriterConfig(path: Path, observer: BoundsObserver)
}
File: FileSystemObserver.scala
@@ -9,8 +9,13 @@
package org.locationtech.geomesa.fs.storage.common.observer

import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.jts.geom.Envelope

/**
* Marker trait for writer hooks
*/
trait FileSystemObserver extends FileSystemWriter

trait BoundsObserver extends FileSystemObserver {
def getBoundingBox: Envelope
}
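
For illustration only: a minimal implementation of the new trait might look like the following hypothetical observer, which folds each feature's envelope into a running bound.

import org.geotools.api.feature.simple.SimpleFeature
import org.locationtech.jts.geom.{Envelope, Geometry}

class EnvelopeObserver extends BoundsObserver {
  private val bounds = new Envelope()
  override def write(feature: SimpleFeature): Unit = {
    val geom = feature.getDefaultGeometry.asInstanceOf[Geometry]
    if (geom != null) { bounds.expandToInclude(geom.getEnvelopeInternal) }
  }
  override def getBoundingBox: Envelope = bounds
  override def flush(): Unit = {}
  override def close(): Unit = {}
}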
File: FileSystemObserverFactory.scala
@@ -13,6 +13,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.locationtech.geomesa.utils.io.{CloseQuietly, FlushQuietly}
import org.locationtech.jts.geom.Envelope

import java.io.Closeable

@@ -41,8 +42,9 @@ trait FileSystemObserverFactory extends Closeable {

object FileSystemObserverFactory {

object NoOpObserver extends FileSystemObserver {
object NoOpObserver extends BoundsObserver {
override def write(feature: SimpleFeature): Unit = {}
override def getBoundingBox: Envelope = new Envelope()
override def flush(): Unit = {}
override def close(): Unit = {}
}
@@ -52,8 +54,14 @@
*
* @param observers observers
*/
class CompositeObserver(observers: Seq[FileSystemObserver]) extends FileSystemObserver {
class CompositeObserver(observers: Seq[FileSystemObserver]) extends BoundsObserver {
override def write(feature: SimpleFeature): Unit = observers.foreach(_.write(feature))

// Get the bounding box for the UpdateObserver instance (the first one in the list)
override def getBoundingBox: Envelope = {
observers.head.asInstanceOf[BoundsObserver].getBoundingBox
}

override def flush(): Unit = FlushQuietly(observers).foreach(e => throw e)
override def close(): Unit = CloseQuietly(observers).foreach(e => throw e)
}
File: pom.xml
@@ -51,6 +51,11 @@
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-index-api_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.networknt</groupId>
<artifactId>json-schema-validator</artifactId>
<scope>test</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
File: FilterConverter.scala
@@ -23,14 +23,18 @@ import scala.reflect.ClassTag

object FilterConverter {

def convert(sft: SimpleFeatureType, filter: Filter): (Option[FilterPredicate], Option[Filter]) = {
if (filter == Filter.INCLUDE) { (None, None) } else {
FilterHelper.propertyNames(filter).foldLeft((Option.empty[FilterPredicate], Option(filter)))(reduce(sft))
def convert(sft: SimpleFeatureType, filter: Filter): (Int => (Option[FilterPredicate], Option[Filter])) = {
if (filter == Filter.INCLUDE) { _ => (None, None) } else {
val propertyNames = FilterHelper.propertyNames(filter)
lazy val v1 = propertyNames.foldLeft((Option.empty[FilterPredicate], Option(filter)))(reduce(sft, 1))
lazy val v2 = propertyNames.foldLeft((Option.empty[FilterPredicate], Option(filter)))(reduce(sft, 2))
i => if (i == 2) { v2 } else { v1 }
}
}

private def reduce(
sft: SimpleFeatureType
sft: SimpleFeatureType,
version: Int
)(result: (Option[FilterPredicate], Option[Filter]),
name: String): (Option[FilterPredicate], Option[Filter]) = {
val (parquet, geotools) = result
@@ -44,7 +48,12 @@

val (predicate, remaining): (Option[FilterPredicate], Option[Filter]) = bindings.head match {
// note: non-points use repeated values, which aren't supported in parquet predicates
case ObjectType.GEOMETRY if bindings.last == ObjectType.POINT => spatial(sft, name, filter, col)
case ObjectType.GEOMETRY if bindings.last == ObjectType.POINT => {
version match {
case 2 => spatial(filter)
case _ => spatialV0V1(sft, name, filter, col)
}
}
case ObjectType.DATE => temporal(sft, name, filter, FilterApi.longColumn(col))
case ObjectType.STRING => attribute(sft, name, filter, FilterApi.binaryColumn(col), Binary.fromString)
case ObjectType.INT => attribute(sft, name, filter, FilterApi.intColumn(col), identity[java.lang.Integer])
@@ -58,7 +67,10 @@
((predicate.toSeq ++ parquet).reduceLeftOption(FilterApi.and), remaining)
}

private def spatial(
private def spatial(filter: Filter): (Option[FilterPredicate], Option[Filter]) = (None, Some(filter))

// Backwards-compatible method for old parquet files
private def spatialV0V1(
sft: SimpleFeatureType,
name: String,
filter: Filter,
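
Usage note: convert now returns a function keyed on the file's schema version, so a hedged call site (names illustrative) looks like:

// evaluate the predicate split per file, using its geomesa.parquet.version
val toPredicates: Int => (Option[FilterPredicate], Option[Filter]) = FilterConverter.convert(sft, filter)
val (parquetFilter, residualFilter) = toPredicates(2)
// for version 2 (GeoParquet) files, spatial predicates stay in the residual GeoTools filter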
File: ParquetFileSystemStorage.scala
@@ -9,21 +9,21 @@
package org.locationtech.geomesa.fs.storage.parquet

import com.typesafe.scalalogging.LazyLogging
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.factory.FastFilterFactory
import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled}
import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.NoOpObserver
import org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled}
import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
import org.locationtech.geomesa.utils.io.CloseQuietly

@@ -102,9 +102,9 @@ object ParquetFileSystemStorage extends LazyLogging {
// Process the record
record = reader.read()
}
logger.debug(s"${file} is a valid Parquet file")
logger.debug(s"'$file' is a valid Parquet file")
} catch {
case e: Exception => throw new RuntimeException(s"Unable to validate ${file}: File may be corrupted", e)
case e: Exception => throw new RuntimeException(s"Unable to validate '$file': File may be corrupted", e)
} finally {
reader.close()
}
(diffs for the remaining changed files not loaded)