From a7775172b56fa28c6a40cf56ad08d1ef638573ef Mon Sep 17 00:00:00 2001 From: adeet1 Date: Fri, 31 May 2024 18:22:07 +0000 Subject: [PATCH] Callback function for adding bounds to storage metadata --- .../common/AbstractFileSystemStorage.scala | 29 ++++++----- .../storage/converter/ConverterStorage.scala | 3 +- .../fs/storage/orc/OrcFileSystemStorage.scala | 18 +++++-- .../parquet/ParquetFileSystemStorage.scala | 12 +++-- .../parquet/SimpleFeatureParquetWriter.scala | 9 ++-- .../io/SimpleFeatureWriteSupport.scala | 25 +++++++--- .../geomesa/parquet/ParquetStorageTest.scala | 1 + .../fs/tools/ingest/CompactCommandTest.scala | 48 ++++++++++--------- 8 files changed, 87 insertions(+), 58 deletions(-) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala index f14a551a6d44..47a2988af7bc 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala @@ -19,8 +19,8 @@ import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.{FileSystemUpda import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction import org.locationtech.geomesa.fs.storage.api.StorageMetadata._ import org.locationtech.geomesa.fs.storage.api._ -import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver, WriterConfig} -import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver +import 
org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, WriterConfig} +import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.{CompositeObserver, NoOpObserver} import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver, FileSystemObserverFactory} import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType import org.locationtech.geomesa.fs.storage.common.utils.StorageUtils.FileType.FileType @@ -67,11 +67,13 @@ abstract class AbstractFileSystemStorage( /** * Create a writer for the given file * + * @param partition the partition that the file belongs to + * @param action whether to append or modify * @param file file to write to * @param observer observer to report stats on the data written * @return */ - protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter + protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter /** * Create a path reader with the given filter and transform @@ -234,11 +236,11 @@ abstract class AbstractFileSystemStorage( def pathAndObserver: WriterConfig = { val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType) PathCache.register(context.fc, path) - val updateObserver = new UpdateObserver(partition, path, action) - val observer = if (observers.isEmpty) { updateObserver.asInstanceOf[BoundsObserver] } else { - new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver)).asInstanceOf[BoundsObserver] + val noopObserver = NoOpObserver + val observer = if (observers.isEmpty) { noopObserver } else { + new CompositeObserver(observers.map(_.apply(path)).+:(noopObserver)).asInstanceOf[BoundsObserver] } - WriterConfig(path, observer) + WriterConfig(partition, action, path, observer) } targetSize(targetFileSize) match { @@ -247,7 +249,7 @@ abstract class 
AbstractFileSystemStorage( } } - private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.path, config.observer) + private def createWriter(config: WriterConfig): FileSystemWriter = createWriter(config.partition, config.action, config.path, config.observer) /** * Writes files up to a given size, then starts a new file @@ -350,13 +352,10 @@ abstract class AbstractFileSystemStorage( * @param file file being written * @param action file type */ - class UpdateObserver(partition: String, file: Path, action: StorageFileAction) extends MetadataObserver with BoundsObserver { - - override def getBoundingBox: Envelope = super.getBoundingBox - - override protected def onClose(bounds: Envelope, count: Long): Unit = { + protected class StorageMetadataCallback(partition: String, action: StorageFileAction, file: Path) extends ((Envelope, Long) => Unit) { + override def apply(env: Envelope, count: Long): Unit = { val files = Seq(StorageFile(file.getName, System.currentTimeMillis(), action)) - metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(bounds), count)) + metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(env), count)) } } } @@ -396,5 +395,5 @@ object AbstractFileSystemStorage { protected def onClose(bounds: Envelope, count: Long): Unit } - private case class WriterConfig(path: Path, observer: BoundsObserver) + private case class WriterConfig(partition: String, action: StorageFileAction, path: Path, observer: BoundsObserver) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala index 4e92c5fddb80..59da23b8610d 100644 --- 
a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala @@ -13,6 +13,7 @@ import org.geotools.api.feature.simple.SimpleFeatureType import org.geotools.api.filter.Filter import org.locationtech.geomesa.convert2.SimpleFeatureConverter import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{StorageFile, StorageFilePath} import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage @@ -30,7 +31,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co // actually need to be closed, and since they will only open a single connection per converter, the // impact should be low - override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = throw new NotImplementedError() override protected def createReader( diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala index a69b92690162..e2756613501e 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala +++ 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/main/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorage.scala @@ -16,13 +16,15 @@ import org.geotools.api.feature.simple.SimpleFeatureType import org.geotools.api.filter.Filter import org.locationtech.geomesa.filter.factory.FastFilterFactory import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage -import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader -import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver +import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.{FileSystemPathReader, MetadataObserver} +import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFactory.CompositeObserver +import org.locationtech.geomesa.fs.storage.common.observer.{BoundsObserver, FileSystemObserver} import org.locationtech.geomesa.utils.geotools.ObjectType import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType -import org.locationtech.jts.geom.Geometry +import org.locationtech.jts.geom.{Envelope, Geometry} /** * Orc implementation of FileSystemStorage @@ -32,8 +34,14 @@ import org.locationtech.jts.geom.Geometry class OrcFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata) extends AbstractFileSystemStorage(context, metadata, OrcFileSystemStorage.FileExtension) { - override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = - new OrcFileSystemWriter(metadata.sft, context.conf, file, observer) + private class SingleGeometryObserver(partition: String, action: StorageFileAction, file: Path) extends MetadataObserver with BoundsObserver { + override protected def onClose(bounds: Envelope, count: 
Long): Unit = new StorageMetadataCallback(partition, action, file)(bounds, count) + } + + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = { + val compositeObserver = new CompositeObserver(Seq(new SingleGeometryObserver(partition, action, file), observer)) + new OrcFileSystemWriter(metadata.sft, context.conf, file, compositeObserver) + } override protected def createReader( filter: Option[Filter], diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala index 5a9e1483abcb..6ee67c9f97ae 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/ParquetFileSystemStorage.scala @@ -18,6 +18,8 @@ import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter import org.locationtech.geomesa.filter.factory.FastFilterFactory import org.locationtech.geomesa.fs.storage.api.FileSystemStorage.FileSystemWriter +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.StorageFileAction.StorageFileAction +import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionBounds, PartitionMetadata, StorageFile} import org.locationtech.geomesa.fs.storage.api._ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader import org.locationtech.geomesa.fs.storage.common.jobs.StorageConfiguration @@ -26,6 +28,7 @@ import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserverFac import 
org.locationtech.geomesa.fs.storage.common.{AbstractFileSystemStorage, FileValidationEnabled} import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.ParquetFileSystemWriter import org.locationtech.geomesa.utils.io.CloseQuietly +import org.locationtech.jts.geom.Envelope /** * @@ -35,10 +38,10 @@ import org.locationtech.geomesa.utils.io.CloseQuietly class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMetadata) extends AbstractFileSystemStorage(context, metadata, ParquetFileSystemStorage.FileExtension) { - override protected def createWriter(file: Path, observer: FileSystemObserver): FileSystemWriter = { + override protected def createWriter(partition: String, action: StorageFileAction, file: Path, observer: FileSystemObserver): FileSystemWriter = { val sftConf = new Configuration(context.conf) StorageConfiguration.setSft(sftConf, metadata.sft) - new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer) + new ParquetFileSystemWriter(metadata.sft, file, sftConf, observer, new StorageMetadataCallback(partition, action, file)) } override protected def createReader( @@ -74,10 +77,11 @@ object ParquetFileSystemStorage extends LazyLogging { sft: SimpleFeatureType, file: Path, conf: Configuration, - observer: FileSystemObserver = NoOpObserver + observer: FileSystemObserver = NoOpObserver, + callback: (Envelope, Long) => Unit = ((_, _) => {}) ) extends FileSystemWriter { - private val writer = SimpleFeatureParquetWriter.builder(file, conf).build() + private val writer = SimpleFeatureParquetWriter.builder(file, conf, callback).build() override def write(f: SimpleFeature): Unit = { writer.write(f) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/SimpleFeatureParquetWriter.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/SimpleFeatureParquetWriter.scala index 
adfa0325d3f0..79588db8c8e9 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/SimpleFeatureParquetWriter.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/SimpleFeatureParquetWriter.scala @@ -17,13 +17,14 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter} import org.geotools.api.feature.simple.SimpleFeature import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureWriteSupport +import org.locationtech.jts.geom.Envelope object SimpleFeatureParquetWriter extends LazyLogging { - def builder(file: Path, conf: Configuration): Builder = { + def builder(file: Path, conf: Configuration, callback: (Envelope, Long) => Unit = ((_, _) => {})): Builder = { val codec = CompressionCodecName.fromConf(conf.get("parquet.compression", "SNAPPY")) logger.debug(s"Using Parquet Compression codec ${codec.name()}") - new Builder(file) + new Builder(file, callback) .withConf(conf) .withCompressionCodec(codec) .withDictionaryEncoding(true) @@ -36,10 +37,10 @@ object SimpleFeatureParquetWriter extends LazyLogging { .withRowGroupSize(8*1024*1024) } - class Builder private [SimpleFeatureParquetWriter] (file: Path) + class Builder private [SimpleFeatureParquetWriter] (file: Path, callback: (Envelope, Long) => Unit) extends ParquetWriter.Builder[SimpleFeature, Builder](file) { override def self(): Builder = this override protected def getWriteSupport(conf: Configuration): WriteSupport[SimpleFeature] = - new SimpleFeatureWriteSupport + new SimpleFeatureWriteSupport(callback) } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala index d2ff8a056322..2cadb42af81f 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureWriteSupport.scala @@ -26,13 +26,11 @@ import java.nio.ByteBuffer import java.util.{Date, UUID} import scala.collection.JavaConverters._ -class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] { +class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit = ((_, _) => {})) extends WriteSupport[SimpleFeature] { private class MultipleGeometriesObserver extends MetadataObserver { private var count: Long = 0L - private var numGeoms: Int = 0 - - // Number of geometries in the file + private var numGeoms: Int = 0 // number of geometries in the file private var bounds: Array[Envelope] = new Array[Envelope](0) override def write(feature: SimpleFeature): Unit = { @@ -57,17 +55,31 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] { def getBoundingBoxes: Array[Envelope] = bounds - override protected def onClose(bounds: Envelope, count: Long): Unit = {} + override def close(): Unit = { + // Merge all the envelopes into one + val mergedBounds = new Envelope() + for (b <- bounds) { + mergedBounds.expandToInclude(b) + } + + onClose(mergedBounds, count) + } + + // Invokes the callback function that adds metadata to the storage partition + override protected def onClose(bounds: Envelope, count: Long): Unit = callback(bounds, count) } private val observer = new MultipleGeometriesObserver - private var writer: SimpleFeatureWriteSupport.SimpleFeatureWriter = _ private var consumer: RecordConsumer = _ private var schema: SimpleFeatureParquetSchema = _ override val 
getName: String = "SimpleFeatureWriteSupport" + // Need a no-arg constructor because Apache Parquet can't instantiate the callback arg for the MapReduce compaction job + // Also, the compaction job doesn't write or calculate bounds anyway + def this() = this( (_, _) => {} ) + // called once override def init(conf: Configuration): WriteContext = { schema = SimpleFeatureParquetSchema.write(conf).getOrElse { @@ -82,6 +94,7 @@ class SimpleFeatureWriteSupport extends WriteSupport[SimpleFeature] { override def finalizeWrite(): FinalizedWriteContext = { // Get the bounding boxes that span each geometry type val bboxes = observer.getBoundingBoxes + observer.close() // If the SFT has no geometries, then there's no need to create GeoParquet metadata if (bboxes.isEmpty) { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala index b4c11f2b787d..0e3ae5fc150b 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala @@ -47,6 +47,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog // 8 bits resolution creates 3 partitions with our test data val scheme = NamedOptions("z2-8bits") + // TODO: implement a unit test to check if partition bounds in the storage metadata are correct "ParquetFileSystemStorage" should { "read and write features" in { val sft = SimpleFeatureTypes.createType("parquet-test", "*geom:Point:srid=4326,name:String,age:Int,dtg:Date") diff --git a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala 
b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala index 02e9801dab28..63faeec862dd 100644 --- a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala +++ b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala @@ -153,19 +153,20 @@ class CompactCommandTest extends Specification { fs.getCount(Query.ALL) mustEqual numFeatures val partitions = ds.storage(sft.getTypeName).metadata.getPartitions() - val partitionNames = partitions.map(_.name) - partitionNames.foreach(partitionName => { - val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName) - filePaths.foreach(path => { - val filepath = path.path - if (encoding == "parquet") { + + // For parquet files, get bounding boxes from each file in each partition + if (encoding == "parquet") { + val partitionNames = partitions.map(_.name) + partitionNames.foreach(partitionName => { + val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName) + filePaths.foreach(path => { + val filepath = path.path val bbox = getBoundingBoxFromGeoParquetFile(filepath) partitionBoundingBoxes.addBinding(partitionName, bbox) - } + }) }) - }) + } - // TODO: might be able to replace the number 10 in Seq.fill with something like partitions.length?? 
partitions.map(_.files.size) mustEqual Seq.fill(10)(numFilesPerPartition) } } @@ -198,22 +199,23 @@ class CompactCommandTest extends Specification { fs.getCount(Query.ALL) mustEqual numFeatures val partitions = ds.storage(sft.getTypeName).metadata.getPartitions() - val partitionNames = partitions.map(_.name) - partitionNames.foreach(partitionName => { - val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName).map(_.path) - filePaths.foreach(path => { - if (encoding == "parquet") { - // In each partition, assert that the union of bounding boxes of the 2 files before compaction - // is the same as the bounding box of the 1 file after compaction - val bboxesUnion = new Envelope - partitionBoundingBoxes(partitionName).foreach(bbox => bboxesUnion.expandToInclude(bbox)) - val metadataBbox = getBoundingBoxFromGeoParquetFile(path) - bboxesUnion mustEqual metadataBbox - } + + // For parquet files, check that the union of bounding boxes of the 2 files before + // compaction is the same as the bounding box of the 1 file after compaction + if (encoding == "parquet") { + val partitionNames = partitions.map(_.name) + partitionNames.foreach(partitionName => { + val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName).map(_.path) + filePaths.foreach(path => { + // In each partition, assert that the union of the bounding boxes recorded before compaction equals the bounding box of the compacted file + val bboxesUnion = new Envelope + partitionBoundingBoxes(partitionName).foreach(bbox => bboxesUnion.expandToInclude(bbox)) + val metadataBbox = getBoundingBoxFromGeoParquetFile(path) + bboxesUnion mustEqual metadataBbox + }) }) - }) + } - // TODO: might be able to replace the number 10 in Seq.fill with something like partitions.length?? partitions.map(_.files.size) mustEqual Seq.fill(10)(1) } }