Callback function for adding bounds to storage metadata

adeet1 committed Jun 6, 2024
1 parent 4af7d90 commit a777517

Showing 8 changed files with 87 additions and 58 deletions.
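In brief: the commit retires the observer that wrote storage metadata from inside the observer chain, and instead threads a callback of type (Envelope, Long) => Unit from the storage layer down into the Parquet write support, where it is invoked once per file on close with the merged bounds and the feature count. A minimal sketch of that shape (simplified names, not the project's exact interfaces):

```scala
import org.locationtech.jts.geom.Envelope

object CallbackShapeSketch extends App {
  // The callback shape introduced here: invoked once per file on close,
  // with the merged bounding box and the number of features written
  type BoundsCallback = (Envelope, Long) => Unit

  // A writer accumulates bounds and a count while features are written...
  def writeFile(onClose: BoundsCallback): Unit = {
    val bounds = new Envelope()
    var count = 0L
    Seq((10.0, 50.0), (12.0, 51.0)).foreach { case (x, y) =>
      bounds.expandToInclude(x, y)
      count += 1
    }
    onClose(bounds, count) // ...and reports both exactly once, on close
  }

  writeFile((env, n) => println(s"$n features, bounds $env"))
}
```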
AbstractFileSystemStorage.scala
@@ -19,8 +19,8 @@ import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.{FileSystemUpda
 import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
 import org.locationtech.geomesa.fs.storage.api.StorageMetadata._
 import org.locationtech.geomesa.fs.storage.api._
-import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver, WriterConfig}
-import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
+import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, WriterConfig}
+import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.{CompositeObserver, NoOpObserver}
+import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver, FileSystemObserverFactory}
 import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType
 import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType.FileType
@@ -67,11 +67,13 @@ abstract class AbstractFileSystemStorage(
   /**
    * Create a writer for the given file
    *
+   * @param partition the partition that the file belongs to
+   * @param action whether to append or modify
    * @param file file to write to
    * @param observer observer to report stats on the data written
    * @return
    */
-  protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter
+  protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter

   /**
    * Create a path reader with the given filter and transform
@@ -234,11 +236,11 @@
     def pathAndObserver: WriterConfig = {
       val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
       PathCache.register(context.fc, path)
-      val updateObserver = new UpdateObserver(partition, path, action)
-      val observer = if (observers.isEmpty) { updateObserver.asInstanceOf[BoundsObserver] } else {
-        new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver)).asInstanceOf[BoundsObserver]
+      val noopObserver = NoOpObserver
+      val observer = if (observers.isEmpty) { noopObserver } else {
+        new CompositeObserver(observers.map(_.apply(path)).+:(noopObserver)).asInstanceOf[BoundsObserver]
       }
-      WriterConfig(path, observer)
+      WriterConfig(partition, action, path, observer)
     }

     targetSize(targetFileSize) match {
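With metadata updates moved to the callback, the write path now starts from NoOpObserver and only layers the configured observers over it via CompositeObserver. A simplified model of that fan-out (hypothetical Observer trait, for illustration only — the real FileSystemObserver also sees flush, close, and the features themselves):

```scala
// Simplified model of observer fan-out (hypothetical trait for illustration)
trait Observer {
  def write(x: Double, y: Double): Unit
  def close(): Unit
}

object NoOp extends Observer {
  override def write(x: Double, y: Double): Unit = ()
  override def close(): Unit = ()
}

// Mirrors CompositeObserver: every call is forwarded to each delegate in turn
class Composite(delegates: Seq[Observer]) extends Observer {
  override def write(x: Double, y: Double): Unit = delegates.foreach(_.write(x, y))
  override def close(): Unit = delegates.foreach(_.close())
}
```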
@@ -247,7 +249,7 @@
       }
     }

-  private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.path, config.observer)
+  private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.partition, config.action, config.path, config.observer)

   /**
    * Writes files up to a given size, then starts a new file
@@ -350,13 +352,10 @@
    * @param file file being written
    * @param action file type
    */
-  class UpdateObserver(partition: String, file: Path, action: StorageFileAction) extends MetadataObserver with BoundsObserver {
-
-    override def getBoundingBox: Envelope = super.getBoundingBox
-
-    override protected def onClose(bounds: Envelope, count: Long): Unit = {
+  protected class StorageMetadataCallback(partition: String, action: StorageFileAction, file: Path) extends ((Envelope, Long) => Unit) {
+    override def apply(env: Envelope, count: Long): Unit = {
       val files = Seq(StorageFile(file.getName, System.currentTimeMillis(), action))
-      metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(bounds), count))
+      metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(env), count))
     }
   }
 }
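StorageMetadataCallback extends the function type (Envelope, Long) => Unit directly, so an instance can be handed anywhere a plain function is expected and applied with ordinary call syntax. A self-contained illustration of the pattern (BoundsLogger is hypothetical):

```scala
import org.locationtech.jts.geom.Envelope

object FunctionClassSketch extends App {
  // A class extending (Envelope, Long) => Unit is a named function value; the
  // real callback captures partition, action, and file the same way, so that
  // apply() can register the finished file's metadata
  class BoundsLogger(partition: String) extends ((Envelope, Long) => Unit) {
    override def apply(env: Envelope, count: Long): Unit =
      println(s"$partition: $count features in $env")
  }

  val callback: (Envelope, Long) => Unit = new BoundsLogger("2024/06/06")
  callback(new Envelope(0.0, 1.0, 0.0, 1.0), 42L) // ordinary function application
}
```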
@@ -396,5 +395,5 @@ object AbstractFileSystemStorage {
     protected def onClose(bounds: Envelope, count: Long): Unit
   }

-  private case class WriterConfig(path: Path, observer: BoundsObserver)
+  private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: BoundsObserver)
 }
ConverterStorage.scala
@@ -13,6 +13,7 @@ import org.geotools.api.feature.simple.SimpleFeatureType
 import org.geotools.api.filter.Filter
 import org.locationtech.geomesa.convert2.SimpleFeatureConverter
 import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
+import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
 import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{StorageFile, StorageFilePath}
 import org.locationtech.geomesa.fs.storage.api._
 import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
@@ -30,7 +31,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co
   // actually need to be closed, and since they will only open a single connection per converter, the
   // impact should be low

-  override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
+  override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter =
     throw new NotImplementedError()

   override protected def createReader(
OrcFileSystemStorage.scala
@@ -16,13 +16,15 @@ import org.geotools.api.feature.simple.SimpleFeatureType
 import org.geotools.api.filter.Filter
 import org.locationtech.geomesa.filter.factory.FastFilterFactory
 import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
+import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
 import org.locationtech.geomesa.fs.storage.api._
 import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage
-import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
-import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver
+import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver}
+import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver
+import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver}
 import org.locationtech.geomesa.utils.geotools.ObjectType
 import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
-import org.locationtech.jts.geom.Geometry
+import org.locationtech.jts.geom.{Envelope, Geometry}

 /**
  * Orc implementation of FileSystemStorage
@@ -32,8 +34,14 @@
 class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
     extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) {

-  override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter =
-    new OrcFileSystemWriter(metadata.sft, context.conf, file, observer)
+  private class SingleGeometryObserver(partition: String, action: StorageFileAction, file: Path) extends MetadataObserver with BoundsObserver {
+    override protected def onClose(bounds: Envelope, count: Long): Unit = new StorageMetadataCallback(partition, action, file)(bounds, count)
+  }
+
+  override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = {
+    val compositeObserver = new CompositeObserver(Seq(new SingleGeometryObserver(partition, action, file), observer))
+    new OrcFileSystemWriter(metadata.sft, context.conf, file, compositeObserver)
+  }

   override protected def createReader(
       filter: Option[Filter],
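SingleGeometryObserver keeps the existing MetadataObserver bounds-tracking for the Orc path, but its onClose now just forwards to StorageMetadataCallback. A reduced sketch of that adapter shape (simplified types, not the real MetadataObserver API):

```scala
import org.locationtech.jts.geom.Envelope

// Reduced adapter sketch: track bounds and a count, then hand the result to a
// plain callback on close instead of updating metadata directly
abstract class TrackingObserver {
  private val bounds = new Envelope()
  private var count = 0L
  def observe(x: Double, y: Double): Unit = { bounds.expandToInclude(x, y); count += 1 }
  def close(): Unit = onClose(bounds, count)
  protected def onClose(bounds: Envelope, count: Long): Unit
}

class CallbackObserver(callback: (Envelope, Long) => Unit) extends TrackingObserver {
  override protected def onClose(bounds: Envelope, count: Long): Unit = callback(bounds, count)
}
```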
ParquetFileSystemStorage.scala
@@ -18,6 +18,8 @@ import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
 import org.geotools.api.filter.Filter
 import org.locationtech.geomesa.filter.factory.FastFilterFactory
 import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter
+import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction
+import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionBounds, PartitionMetadata, StorageFile}
 import org.locationtech.geomesa.fs.storage.api._
 import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader
 import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
@@ -26,6 +28,7 @@ import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFac
 import org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled}
 import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
 import org.locationtech.geomesa.utils.io.CloseQuietly
+import org.locationtech.jts.geom.Envelope

 /**
  *
@@ -35,10 +38,10 @@
 class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata)
     extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) {

-  override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = {
+  override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = {
     val sftConf = new Configuration(context.conf)
     StorageConfiguration.setSft(sftConf, metadata.sft)
-    new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer)
+    new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer, new StorageMetadataCallback(partition, action, file))
   }

   override protected def createReader(
@@ -74,10 +77,11 @@ object ParquetFileSystemStorage extends LazyLogging {
       sft: SimpleFeatureType,
       file: Path,
       conf: Configuration,
-      observer: FileSystemObserver = NoOpObserver
+      observer: FileSystemObserver = NoOpObserver,
+      callback: (Envelope, Long) => Unit = ((_, _) => {})
     ) extends FileSystemWriter {

-    private val writer = SimpleFeatureParquetWriter.builder(file, conf).build()
+    private val writer = SimpleFeatureParquetWriter.builder(file, conf, callback).build()

     override def write(f: SimpleFeature): Unit = {
       writer.write(f)
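Because both new parameters have defaults (observer falls back to NoOpObserver, callback to a no-op), existing call sites keep compiling. A hypothetical usage sketch, assuming the SFT is set on the configuration the way createWriter does above:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration
import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes

object WriterUsageSketch extends App {
  val sft  = SimpleFeatureTypes.createType("example", "*geom:Point:srid=4326,dtg:Date")
  val conf = new Configuration()
  StorageConfiguration.setSft(conf, sft) // the write support reads the SFT from conf
  val file = new Path("/tmp/example.parquet")

  // Old call shape still compiles: observer and callback both default to no-ops
  val plain = new ParquetFileSystemWriter(sft, file, conf)

  // New call shape: the callback fires once when the file is closed
  val wired = new ParquetFileSystemWriter(sft, file, conf,
    callback = (env, count) => println(s"wrote $count features, bounds $env"))
}
```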
SimpleFeatureParquetWriter.scala
@@ -17,13 +17,14 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
 import org.geotools.api.feature.simple.SimpleFeature
 import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureWriteSupport
+import org.locationtech.jts.geom.Envelope

 object SimpleFeatureParquetWriter extends LazyLogging {

-  def builder(file: Path, conf: Configuration): Builder = {
+  def builder(file: Path, conf: Configuration, callback: (Envelope, Long) => Unit = ((_, _) => {})): Builder = {
     val codec = CompressionCodecName.fromConf(conf.get("parquet.compression", "SNAPPY"))
     logger.debug(s"Using Parquet Compression codec ${codec.name()}")
-    new Builder(file)
+    new Builder(file, callback)
       .withConf(conf)
       .withCompressionCodec(codec)
       .withDictionaryEncoding(true)
@@ -36,10 +37,10 @@
       .withRowGroupSize(8*1024*1024)
   }

-  class Builder private [SimpleFeatureParquetWriter] (file: Path)
+  class Builder private [SimpleFeatureParquetWriter] (file: Path, callback: (Envelope, Long) => Unit)
       extends ParquetWriter.Builder[SimpleFeature, Builder](file) {
     override def self(): Builder = this
     override protected def getWriteSupport(conf: Configuration): WriteSupport[SimpleFeature] =
-      new SimpleFeatureWriteSupport
+      new SimpleFeatureWriteSupport(callback)
   }
 }
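The callback has to ride through the Builder because Parquet, not the caller, instantiates the WriteSupport (via getWriteSupport). A toy model of the design choice — any state the support needs must be captured at builder-construction time (toy types, not the Parquet API):

```scala
// Toy model: the "framework" calls build() and constructs the support itself,
// so the callback must already be captured inside the builder when handed over
class Support(callback: (String, Long) => Unit) {
  def finish(): Unit = callback("file closed", 1L)
}

class Builder(callback: (String, Long) => Unit) {
  // framework-side construction point, analogous to getWriteSupport(conf)
  def build(): Support = new Support(callback)
}

object BuilderSketch extends App {
  new Builder((msg, n) => println(s"$msg ($n)")).build().finish()
}
```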
SimpleFeatureWriteSupport.scala
@@ -26,13 +26,11 @@ import java.nio.ByteBuffer
 import java.util.{Date, UUID}
 import scala.collection.JavaConverters._

-class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] {
+class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit = ((_, _) => {})) extends WriteSupport[SimpleFeature] {

   private class MultipleGeometriesObserver extends MetadataObserver {
     private var count: Long = 0L
-    private var numGeoms: Int = 0
-
-    // Number of geometries in the file
+    private var numGeoms: Int = 0 // number of geometries in the file
     private var bounds: Array[Envelope] = new Array[Envelope](0)

     override def write(feature: SimpleFeature): Unit = {
@@ -57,17 +55,31 @@

     def getBoundingBoxes: Array[Envelope] = bounds

-    override protected def onClose(bounds: Envelope, count: Long): Unit = {}
+    override def close(): Unit = {
+      // Merge all the envelopes into one
+      val mergedBounds = new Envelope()
+      for (b <- bounds) {
+        mergedBounds.expandToInclude(b)
+      }
+
+      onClose(mergedBounds, count)
+    }
+
+    // Invokes the callback function that adds metadata to the storage partition
+    override protected def onClose(bounds: Envelope, count: Long): Unit = callback(bounds, count)
   }

   private val observer = new MultipleGeometriesObserver

   private var writer: SimpleFeatureWriteSupport.SimpleFeatureWriter = _
   private var consumer: RecordConsumer = _
   private var schema: SimpleFeatureParquetSchema = _

   override val getName: String = "SimpleFeatureWriteSupport"

+  // Need a no-arg constructor because Apache Parquet can't instantiate the callback arg for the MapReduce compaction job
+  // Also, the compaction job doesn't write or calculate bounds anyway
+  def this() = this( (_, _) => {} )

   // called once
   override def init(conf: Configuration): WriteContext = {
     schema = SimpleFeatureParquetSchema.write(conf).getOrElse {
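The new close() folds the per-geometry envelopes into one before invoking onClose. JTS makes this cheap: expandToInclude computes the union in place, and an empty Envelope is the identity, so folding any number of bounds is safe. For example:

```scala
import org.locationtech.jts.geom.Envelope

object MergeSketch extends App {
  // Union of per-geometry bounds: an empty Envelope expands to whatever it
  // first includes, so the fold needs no special empty case
  val perGeometry = Seq(new Envelope(0.0, 1.0, 0.0, 1.0), new Envelope(5.0, 6.0, 2.0, 3.0))
  val merged = new Envelope()
  perGeometry.foreach(e => merged.expandToInclude(e))
  println(merged) // Env[0.0 : 6.0, 0.0 : 3.0]
}
```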
@@ -82,6 +94,7 @@
   override def finalizeWrite(): FinalizedWriteContext = {
     // Get the bounding boxes that span each geometry type
     val bboxes = observer.getBoundingBoxes
+    observer.close()

     // If the SFT has no geometries, then there's no need to create GeoParquet metadata
     if (bboxes.isEmpty) {
ParquetStorageTest.scala
@@ -47,6 +47,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog
   // 8 bits resolution creates 3 partitions with our test data
   val scheme = NamedOptions("z2-8bits")

+  // TODO: implement a unit test to check if partition bounds in the storage metadata are correct
   "ParquetFileSystemStorage" should {
     "read and write features" in {
       val sft = SimpleFeatureTypes.createType("parquet-test", "*geom:Point:srid=4326,name:String,age:Int,dtg:Date")
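One way the TODO could be satisfied, sketched in the spec's existing style — storage and features stand in for values the surrounding tests already construct, and the PartitionBounds field names here are assumptions:

```scala
// Hypothetical spec sketch for the TODO above: write features, then assert the
// partition bounds recorded in the metadata cover every feature geometry
"record correct partition bounds in its metadata" in {
  val expected = new Envelope()
  features.foreach { f =>
    expected.expandToInclude(f.getDefaultGeometry.asInstanceOf[Geometry].getEnvelopeInternal)
  }
  val union = new Envelope()
  storage.metadata.getPartitions().flatMap(_.bounds).foreach { b =>
    union.expandToInclude(new Envelope(b.xmin, b.xmax, b.ymin, b.ymax)) // assumed field names
  }
  union.contains(expected) must beTrue
}
```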
CompactCommandTest.scala
@@ -153,19 +153,20 @@ class CompactCommandTest extends Specification {
       fs.getCount(Query.ALL) mustEqual numFeatures

       val partitions = ds.storage(sft.getTypeName).metadata.getPartitions()
-      val partitionNames = partitions.map(_.name)
-      partitionNames.foreach(partitionName => {
-        val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName)
-        filePaths.foreach(path => {
-          val filepath = path.path
-          if (encoding == "parquet") {
+
+      // For parquet files, get bounding boxes from each file in each partition
+      if (encoding == "parquet") {
+        val partitionNames = partitions.map(_.name)
+        partitionNames.foreach(partitionName => {
+          val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName)
+          filePaths.foreach(path => {
+            val filepath = path.path
             val bbox = getBoundingBoxFromGeoParquetFile(filepath)
             partitionBoundingBoxes.addBinding(partitionName, bbox)
-          }
+          })
         })
-      })
+      }

       // TODO: might be able to replace the number 10 in Seq.fill with something like partitions.length??
       partitions.map(_.files.size) mustEqual Seq.fill(10)(numFilesPerPartition)
     }
   }
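partitionBoundingBoxes itself is not shown in this diff; the addBinding call suggests a mutable MultiMap, which collects several per-file bboxes under one partition name. A self-contained sketch of that pattern (assuming scala.collection.mutable.MultiMap):

```scala
import org.locationtech.jts.geom.Envelope
import scala.collection.mutable

object MultiMapSketch extends App {
  // addBinding appends a value to the key's set, creating the set on demand
  val partitionBoundingBoxes =
    new mutable.HashMap[String, mutable.Set[Envelope]] with mutable.MultiMap[String, Envelope]

  partitionBoundingBoxes.addBinding("partition-0", new Envelope(0.0, 1.0, 0.0, 1.0))
  partitionBoundingBoxes.addBinding("partition-0", new Envelope(2.0, 3.0, 2.0, 3.0))

  println(partitionBoundingBoxes("partition-0").size) // 2
}
```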
@@ -198,22 +199,23 @@
       fs.getCount(Query.ALL) mustEqual numFeatures

       val partitions = ds.storage(sft.getTypeName).metadata.getPartitions()
-      val partitionNames = partitions.map(_.name)
-      partitionNames.foreach(partitionName => {
-        val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName).map(_.path)
-        filePaths.foreach(path => {
-          if (encoding == "parquet") {
-            // In each partition, assert that the union of bounding boxes of the 2 files before compaction
-            // is the same as the bounding box of the 1 file after compaction
-            val bboxesUnion = new Envelope
-            partitionBoundingBoxes(partitionName).foreach(bbox => bboxesUnion.expandToInclude(bbox))
-            val metadataBbox = getBoundingBoxFromGeoParquetFile(path)
-            bboxesUnion mustEqual metadataBbox
-          }
+
+      // For parquet files, check that the union of bounding boxes of the 2 files before
+      // compaction is the same as the bounding box of the 1 file after compaction
+      if (encoding == "parquet") {
+        val partitionNames = partitions.map(_.name)
+        partitionNames.foreach(partitionName => {
+          val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName).map(_.path)
+          filePaths.foreach(path => {
+            val bboxesUnion = new Envelope
+            partitionBoundingBoxes(partitionName).foreach(bbox => bboxesUnion.expandToInclude(bbox))
+            val metadataBbox = getBoundingBoxFromGeoParquetFile(path)
+            bboxesUnion mustEqual metadataBbox
+          })
         })
-      })
+      }

       // TODO: might be able to replace the number 10 in Seq.fill with something like partitions.length??
       partitions.map(_.files.size) mustEqual Seq.fill(10)(1)
     }
   }