From 480db3c622d4e5d8f89dee4537dbaeebdb77c757 Mon Sep 17 00:00:00 2001 From: Luca Rossetto Date: Fri, 18 Oct 2024 13:06:38 +0200 Subject: [PATCH 01/12] First sketch of a video decoder using Jaffree --- gradle.properties | 1 + vitrivr-engine-index/build.gradle | 3 + .../engine/index/decode/FFmpegVideoDecoder.kt | 225 ++++++++++++++++++ 3 files changed, 229 insertions(+) create mode 100644 vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt diff --git a/gradle.properties b/gradle.properties index 40278fbf..0544b58d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,6 +3,7 @@ version_caffeine=3.1.8 version_clikt=4.2.2 version_commonsmath3=3.6.1 version_cottontaildb=0.16.7 +version_jaffree=2024.08.29 version_javacv=1.5.10 version_javalin=6.3.0 version_jdbc_postgres=42.7.4 diff --git a/vitrivr-engine-index/build.gradle b/vitrivr-engine-index/build.gradle index fa452381..d15b08a0 100644 --- a/vitrivr-engine-index/build.gradle +++ b/vitrivr-engine-index/build.gradle @@ -28,6 +28,9 @@ dependencies { implementation group: 'org.bytedeco', name: 'javacv', version: version_javacv implementation group: 'org.bytedeco', name: 'ffmpeg', version: version_ffmpeg, classifier: project.ext.javacppPlatform + /** Jaffree for external ffmpeg*/ + implementation group: 'com.github.kokorin.jaffree', name: 'jaffree', version: version_jaffree + /** ScrImage (used for image resizing). */ implementation group: 'com.sksamuel.scrimage', name: 'scrimage-core', version: version_scrimage diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt new file mode 100644 index 00000000..240f0093 --- /dev/null +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -0,0 +1,225 @@ +package org.vitrivr.engine.index.decode + +import com.github.kokorin.jaffree.ffmpeg.* +import com.github.kokorin.jaffree.ffprobe.FFprobe +import io.github.oshai.kotlinlogging.KLogger +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS +import kotlinx.coroutines.channels.ProducerScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.withContext +import org.bytedeco.javacv.FrameGrabber +import org.vitrivr.engine.core.context.IndexContext +import org.vitrivr.engine.core.model.relationship.Relationship +import org.vitrivr.engine.core.model.retrievable.Ingested +import org.vitrivr.engine.core.model.retrievable.Retrievable +import org.vitrivr.engine.core.model.retrievable.attributes.ContentAuthorAttribute +import org.vitrivr.engine.core.model.retrievable.attributes.SourceAttribute +import org.vitrivr.engine.core.model.retrievable.attributes.time.TimeRangeAttribute +import org.vitrivr.engine.core.operators.Operator +import org.vitrivr.engine.core.operators.ingest.Decoder +import org.vitrivr.engine.core.operators.ingest.DecoderFactory +import org.vitrivr.engine.core.operators.ingest.Enumerator +import org.vitrivr.engine.core.source.MediaType +import org.vitrivr.engine.core.source.Source +import org.vitrivr.engine.core.source.file.FileSource +import java.awt.image.BufferedImage +import java.nio.ShortBuffer +import java.nio.file.Path +import java.util.* +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +class FFmpegVideoDecoder : DecoderFactory { + + override fun newDecoder(name: String, input: Enumerator, context: IndexContext): Decoder { + val timeWindowMs = context[name, "timeWindowMs"]?.toLongOrNull() ?: 500L + val ffmpegPath = context[name, "ffmpegPath"]?.let { Path.of(it) } + + return Instance(input, context, timeWindowMs, name, ffmpegPath) + } + + private class Instance( + override val input: Enumerator, + private val context: IndexContext, + private val timeWindowMs: Long = 500L, + private val name: String, + private val ffmpegPath: Path? + ) : Decoder { + + /** [KLogger] instance. */ + private val logger: KLogger = KotlinLogging.logger {} + +// private val ffprobe: FFprobe +// get() = if (ffmpegPath != null) FFprobe.atPath(this.ffmpegPath) else FFprobe.atPath() + + private val ffmpeg: FFmpeg + get() = if (ffmpegPath != null) FFmpeg.atPath(this.ffmpegPath) else FFmpeg.atPath() + + override fun toFlow(scope: CoroutineScope): Flow = channelFlow { + this@Instance.input.toFlow(scope).collect { sourceRetrievable -> + /* Extract source. */ + val source = sourceRetrievable.filteredAttribute(SourceAttribute::class.java)?.source ?: return@collect + if (source.type != MediaType.VIDEO) { + logger.debug { "In flow: Skipping source ${source.name} (${source.sourceId}) because it is not of type VIDEO." } + return@collect + } + + var windowEnd = TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) + + val imageBuffer = LinkedBlockingQueue>() + + val ffmpegInstance = ffmpeg.addInput( + if (source is FileSource) { + UrlInput.fromPath(source.path) + } else { + PipeInput.pumpFrom(source.newInputStream()) + } + ).addOutput( + FrameOutput.withConsumerAlpha( + object : FrameConsumer { + + val streamMap = mutableMapOf() + + override fun consumeStreams(streams: MutableList) { + streams.forEach { stream -> streamMap[stream.id] = stream } + } + + override fun consume(frame: Frame) { + + val stream = streamMap[frame.streamId] ?: return + + when (stream.type) { + Stream.Type.VIDEO -> { + imageBuffer.add(frame.image!! to (1000 * frame.pts) / stream.timebase) + } + + Stream.Type.AUDIO -> { + //TODO + } + + null -> { + /* ignore */ + } + } + } + + } + ) + ) + + //TODO scaling + //TODO audio settings + + val future = ffmpegInstance.executeAsync() + + while (!future.isDone || !future.isCancelled) { + + if (imageBuffer.isNotEmpty() && imageBuffer.last().second >= windowEnd) { + emit( + imageBuffer, windowEnd, sourceRetrievable, this@channelFlow + ) + windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) + } else { + Thread.yield() + } + + } + + if (imageBuffer.isNotEmpty()) { + emit(imageBuffer, windowEnd, sourceRetrievable, this@channelFlow) + } + + } + }.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND) + + + /** + * Emits a single [Retrievable] to the downstream [channel]. + * + * @param imageBuffer A [LinkedList] containing [BufferedImage] elements to emit (frames). + * @param audioBuffer The [LinkedList] containing the [ShortBuffer] elements to emit (audio samples). + * @param grabber The [FrameGrabber] instance. + * @param timestampEnd The end timestamp. + * @param source The source [Retrievable] the emitted [Retrievable] is part of. + */ + private suspend fun emit( + imageBuffer: LinkedBlockingQueue>, + //audioBuffer: LinkedList>, + timestampEnd: Long, + source: Retrievable, + channel: ProducerScope + ) { + /* Audio samples. */ + var audioSize = 0 + val emitImage = mutableListOf() + //val emitAudio = mutableListOf() + + /* Drain buffers. */ + imageBuffer.removeIf { + if (it.second <= timestampEnd) { + emitImage.add(it.first) + true + } else { + false + } + } +// audioBuffer.removeIf { +// if (it.second <= timestampEnd) { +// audioSize += it.first.limit() +// emitAudio.add(it.first) +// true +// } else { +// false +// } +// } + + /* Prepare ingested with relationship to source. */ + val ingested = Ingested(UUID.randomUUID(), "SEGMENT", false) + source.filteredAttribute(SourceAttribute::class.java)?.let { ingested.addAttribute(it) } + ingested.addRelationship(Relationship.ByRef(ingested, "partOf", source, false)) + ingested.addAttribute( + TimeRangeAttribute( + timestampEnd - TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs), + timestampEnd, + TimeUnit.MICROSECONDS + ) + ) + +// /* Prepare and append audio content element. */ +// if (emitAudio.size > 0) { +// val samples = ShortBuffer.allocate(audioSize) +// for (frame in emitAudio) { +// frame.clear() +// samples.put(frame) +// } +// samples.clear() +// val audio = this.context.contentFactory.newAudioContent( +// grabber.audioChannels.toShort(), +// grabber.sampleRate, +// samples +// ) +// ingested.addContent(audio) +// ingested.addAttribute(ContentAuthorAttribute(audio.id, name)) +// } + + /* Prepare and append image content element. */ + for (image in emitImage) { + val imageContent = this.context.contentFactory.newImageContent(image) + ingested.addContent(imageContent) + ingested.addAttribute(ContentAuthorAttribute(imageContent.id, name)) + } + + //logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images and ${emitAudio.size} audio samples: ${ingested.id}" } + + /* Emit ingested. */ + channel.send(ingested) + } + + } +} \ No newline at end of file From ee71d822edfeca55d58a1d38c4e191576c62b06b Mon Sep 17 00:00:00 2001 From: Luca Rossetto Date: Sat, 19 Oct 2024 21:22:31 +0200 Subject: [PATCH 02/12] Resolved some synchronization issues --- .../engine/index/decode/FFmpegVideoDecoder.kt | 48 ++++++++++--------- ...ngine.core.operators.ingest.DecoderFactory | 1 + vitrivr-engine-server/build.gradle | 1 + 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt index 240f0093..558baa31 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -1,18 +1,16 @@ package org.vitrivr.engine.index.decode +import com.github.kokorin.jaffree.StreamType import com.github.kokorin.jaffree.ffmpeg.* -import com.github.kokorin.jaffree.ffprobe.FFprobe import io.github.oshai.kotlinlogging.KLogger import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS import kotlinx.coroutines.channels.ProducerScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.channelFlow -import kotlinx.coroutines.withContext import org.bytedeco.javacv.FrameGrabber import org.vitrivr.engine.core.context.IndexContext import org.vitrivr.engine.core.model.relationship.Relationship @@ -21,12 +19,10 @@ import org.vitrivr.engine.core.model.retrievable.Retrievable import org.vitrivr.engine.core.model.retrievable.attributes.ContentAuthorAttribute import org.vitrivr.engine.core.model.retrievable.attributes.SourceAttribute import org.vitrivr.engine.core.model.retrievable.attributes.time.TimeRangeAttribute -import org.vitrivr.engine.core.operators.Operator import org.vitrivr.engine.core.operators.ingest.Decoder import org.vitrivr.engine.core.operators.ingest.DecoderFactory import org.vitrivr.engine.core.operators.ingest.Enumerator import org.vitrivr.engine.core.source.MediaType -import org.vitrivr.engine.core.source.Source import org.vitrivr.engine.core.source.file.FileSource import java.awt.image.BufferedImage import java.nio.ShortBuffer @@ -38,16 +34,20 @@ import java.util.concurrent.TimeUnit class FFmpegVideoDecoder : DecoderFactory { override fun newDecoder(name: String, input: Enumerator, context: IndexContext): Decoder { + val maxWidth = context[name, "maxWidth"]?.toIntOrNull() ?: 3840 + val maxHeight = context[name, "maxHeight"]?.toIntOrNull() ?: 2160 val timeWindowMs = context[name, "timeWindowMs"]?.toLongOrNull() ?: 500L val ffmpegPath = context[name, "ffmpegPath"]?.let { Path.of(it) } - return Instance(input, context, timeWindowMs, name, ffmpegPath) + return Instance(input, context, timeWindowMs, maxWidth, maxHeight, name, ffmpegPath) } private class Instance( override val input: Enumerator, private val context: IndexContext, private val timeWindowMs: Long = 500L, + private val maxWidth: Int, + private val maxHeight: Int, private val name: String, private val ffmpegPath: Path? ) : Decoder { @@ -61,7 +61,7 @@ class FFmpegVideoDecoder : DecoderFactory { private val ffmpeg: FFmpeg get() = if (ffmpegPath != null) FFmpeg.atPath(this.ffmpegPath) else FFmpeg.atPath() - override fun toFlow(scope: CoroutineScope): Flow = channelFlow { + override fun toFlow(scope: CoroutineScope): Flow = channelFlow { this@Instance.input.toFlow(scope).collect { sourceRetrievable -> /* Extract source. */ val source = sourceRetrievable.filteredAttribute(SourceAttribute::class.java)?.source ?: return@collect @@ -72,7 +72,7 @@ class FFmpegVideoDecoder : DecoderFactory { var windowEnd = TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) - val imageBuffer = LinkedBlockingQueue>() + val imageTransferBuffer = LinkedBlockingQueue>(10) val ffmpegInstance = ffmpeg.addInput( if (source is FileSource) { @@ -96,7 +96,7 @@ class FFmpegVideoDecoder : DecoderFactory { when (stream.type) { Stream.Type.VIDEO -> { - imageBuffer.add(frame.image!! to (1000 * frame.pts) / stream.timebase) + imageTransferBuffer.put(frame.image!! to (1000000 * frame.pts) / stream.timebase) } Stream.Type.AUDIO -> { @@ -111,30 +111,33 @@ class FFmpegVideoDecoder : DecoderFactory { } ) - ) + ).setFilter(StreamType.VIDEO, "scale=w='min($maxWidth,iw)':h='min($maxHeight,ih)':force_original_aspect_ratio=decrease") - //TODO scaling //TODO audio settings val future = ffmpegInstance.executeAsync() - while (!future.isDone || !future.isCancelled) { + val localImageBuffer = LinkedList>() + + while (!(future.isDone || future.isCancelled) || imageTransferBuffer.isNotEmpty()) { + + val next = imageTransferBuffer.poll(1, TimeUnit.SECONDS) ?:continue + localImageBuffer.add(next) - if (imageBuffer.isNotEmpty() && imageBuffer.last().second >= windowEnd) { - emit( - imageBuffer, windowEnd, sourceRetrievable, this@channelFlow - ) + if (localImageBuffer.last().second >= windowEnd) { + emit(localImageBuffer, windowEnd, sourceRetrievable, this@channelFlow) windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) - } else { - Thread.yield() } } - if (imageBuffer.isNotEmpty()) { - emit(imageBuffer, windowEnd, sourceRetrievable, this@channelFlow) + while (localImageBuffer.isNotEmpty()) { + emit(localImageBuffer, windowEnd, sourceRetrievable, this@channelFlow) + windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) } + send(sourceRetrievable) + } }.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND) @@ -149,14 +152,14 @@ class FFmpegVideoDecoder : DecoderFactory { * @param source The source [Retrievable] the emitted [Retrievable] is part of. */ private suspend fun emit( - imageBuffer: LinkedBlockingQueue>, + imageBuffer: MutableList>, //audioBuffer: LinkedList>, timestampEnd: Long, source: Retrievable, channel: ProducerScope ) { /* Audio samples. */ - var audioSize = 0 + //var audioSize = 0 val emitImage = mutableListOf() //val emitAudio = mutableListOf() @@ -169,6 +172,7 @@ class FFmpegVideoDecoder : DecoderFactory { false } } + // audioBuffer.removeIf { // if (it.second <= timestampEnd) { // audioSize += it.first.limit() diff --git a/vitrivr-engine-index/src/main/resources/META-INF/services/org.vitrivr.engine.core.operators.ingest.DecoderFactory b/vitrivr-engine-index/src/main/resources/META-INF/services/org.vitrivr.engine.core.operators.ingest.DecoderFactory index 78e19511..6f25574a 100644 --- a/vitrivr-engine-index/src/main/resources/META-INF/services/org.vitrivr.engine.core.operators.ingest.DecoderFactory +++ b/vitrivr-engine-index/src/main/resources/META-INF/services/org.vitrivr.engine.core.operators.ingest.DecoderFactory @@ -1,2 +1,3 @@ org.vitrivr.engine.index.decode.VideoDecoder +org.vitrivr.engine.index.decode.FFmpegVideoDecoder org.vitrivr.engine.index.decode.ImageDecoder \ No newline at end of file diff --git a/vitrivr-engine-server/build.gradle b/vitrivr-engine-server/build.gradle index 654a8ca1..637882c4 100644 --- a/vitrivr-engine-server/build.gradle +++ b/vitrivr-engine-server/build.gradle @@ -11,6 +11,7 @@ dependencies { api project(':vitrivr-engine-module-features') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */ api project(':vitrivr-engine-module-cottontaildb') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */ api project(':vitrivr-engine-module-pgvector') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */ + api project(':vitrivr-engine-module-jsonl') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */ api project(':vitrivr-engine-module-fes') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */ /** Clikt & JLine */ From 4d6e3ab20a89343678a1c547d5797fa8b6bcea11 Mon Sep 17 00:00:00 2001 From: Luca Rossetto Date: Sun, 20 Oct 2024 11:47:32 +0200 Subject: [PATCH 03/12] Initial version of FFmpegVideoDecoder without audio support --- .../engine/index/decode/FFmpegVideoDecoder.kt | 75 +++++++++---------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt index 558baa31..e385fa85 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -2,6 +2,7 @@ package org.vitrivr.engine.index.decode import com.github.kokorin.jaffree.StreamType import com.github.kokorin.jaffree.ffmpeg.* +import com.github.kokorin.jaffree.ffprobe.FFprobe import io.github.oshai.kotlinlogging.KLogger import io.github.oshai.kotlinlogging.KotlinLogging import kotlinx.coroutines.CoroutineScope @@ -25,11 +26,11 @@ import org.vitrivr.engine.core.operators.ingest.Enumerator import org.vitrivr.engine.core.source.MediaType import org.vitrivr.engine.core.source.file.FileSource import java.awt.image.BufferedImage -import java.nio.ShortBuffer import java.nio.file.Path import java.util.* import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit +import org.vitrivr.engine.core.source.Metadata class FFmpegVideoDecoder : DecoderFactory { @@ -55,8 +56,8 @@ class FFmpegVideoDecoder : DecoderFactory { /** [KLogger] instance. */ private val logger: KLogger = KotlinLogging.logger {} -// private val ffprobe: FFprobe -// get() = if (ffmpegPath != null) FFprobe.atPath(this.ffmpegPath) else FFprobe.atPath() + private val ffprobe: FFprobe + get() = if (ffmpegPath != null) FFprobe.atPath(this.ffmpegPath) else FFprobe.atPath() private val ffmpeg: FFmpeg get() = if (ffmpegPath != null) FFmpeg.atPath(this.ffmpegPath) else FFmpeg.atPath() @@ -70,6 +71,31 @@ class FFmpegVideoDecoder : DecoderFactory { return@collect } + val probeResult = ffprobe.setShowStreams(true).also { + if (source is FileSource) { + it.setInput(source.path) + } else { + it.setInput(source.newInputStream()) + } + }.execute() + + val videoStreamInfo = probeResult.streams.find { it.codecType == StreamType.VIDEO } + + if (videoStreamInfo != null) { + source.metadata[Metadata.METADATA_KEY_VIDEO_FPS] = videoStreamInfo.avgFrameRate.toDouble() + source.metadata[Metadata.METADATA_KEY_AV_DURATION] = (videoStreamInfo.duration * 1000f).toLong() + source.metadata[Metadata.METADATA_KEY_IMAGE_WIDTH] = videoStreamInfo.width + source.metadata[Metadata.METADATA_KEY_IMAGE_HEIGHT] = videoStreamInfo.height + } + + val audioStreamInfo = probeResult.streams.find { it.codecType == StreamType.AUDIO } + + if (audioStreamInfo != null) { + source.metadata[Metadata.METADATA_KEY_AUDIO_CHANNELS] = audioStreamInfo.channels + source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLERATE] = audioStreamInfo.sampleRate + source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLESIZE] = audioStreamInfo.sampleFmt + } + var windowEnd = TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) val imageTransferBuffer = LinkedBlockingQueue>(10) @@ -111,7 +137,10 @@ class FFmpegVideoDecoder : DecoderFactory { } ) - ).setFilter(StreamType.VIDEO, "scale=w='min($maxWidth,iw)':h='min($maxHeight,ih)':force_original_aspect_ratio=decrease") + ).setFilter( + StreamType.VIDEO, + "scale=w='min($maxWidth,iw)':h='min($maxHeight,ih)':force_original_aspect_ratio=decrease" + ) //TODO audio settings @@ -121,7 +150,7 @@ class FFmpegVideoDecoder : DecoderFactory { while (!(future.isDone || future.isCancelled) || imageTransferBuffer.isNotEmpty()) { - val next = imageTransferBuffer.poll(1, TimeUnit.SECONDS) ?:continue + val next = imageTransferBuffer.poll(1, TimeUnit.SECONDS) ?: continue localImageBuffer.add(next) if (localImageBuffer.last().second >= windowEnd) { @@ -146,24 +175,20 @@ class FFmpegVideoDecoder : DecoderFactory { * Emits a single [Retrievable] to the downstream [channel]. * * @param imageBuffer A [LinkedList] containing [BufferedImage] elements to emit (frames). - * @param audioBuffer The [LinkedList] containing the [ShortBuffer] elements to emit (audio samples). * @param grabber The [FrameGrabber] instance. * @param timestampEnd The end timestamp. * @param source The source [Retrievable] the emitted [Retrievable] is part of. */ private suspend fun emit( imageBuffer: MutableList>, - //audioBuffer: LinkedList>, timestampEnd: Long, source: Retrievable, channel: ProducerScope ) { - /* Audio samples. */ - //var audioSize = 0 + val emitImage = mutableListOf() - //val emitAudio = mutableListOf() - /* Drain buffers. */ + /* Drain buffer. */ imageBuffer.removeIf { if (it.second <= timestampEnd) { emitImage.add(it.first) @@ -173,15 +198,6 @@ class FFmpegVideoDecoder : DecoderFactory { } } -// audioBuffer.removeIf { -// if (it.second <= timestampEnd) { -// audioSize += it.first.limit() -// emitAudio.add(it.first) -// true -// } else { -// false -// } -// } /* Prepare ingested with relationship to source. */ val ingested = Ingested(UUID.randomUUID(), "SEGMENT", false) @@ -195,23 +211,6 @@ class FFmpegVideoDecoder : DecoderFactory { ) ) -// /* Prepare and append audio content element. */ -// if (emitAudio.size > 0) { -// val samples = ShortBuffer.allocate(audioSize) -// for (frame in emitAudio) { -// frame.clear() -// samples.put(frame) -// } -// samples.clear() -// val audio = this.context.contentFactory.newAudioContent( -// grabber.audioChannels.toShort(), -// grabber.sampleRate, -// samples -// ) -// ingested.addContent(audio) -// ingested.addAttribute(ContentAuthorAttribute(audio.id, name)) -// } - /* Prepare and append image content element. */ for (image in emitImage) { val imageContent = this.context.contentFactory.newImageContent(image) @@ -219,7 +218,7 @@ class FFmpegVideoDecoder : DecoderFactory { ingested.addAttribute(ContentAuthorAttribute(imageContent.id, name)) } - //logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images and ${emitAudio.size} audio samples: ${ingested.id}" } + logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images: ${ingested.id}" } /* Emit ingested. */ channel.send(ingested) From b74c7e9c42a0edb6cb09ec3d3ef28249300942ae Mon Sep 17 00:00:00 2001 From: Luca Rossetto Date: Sun, 20 Oct 2024 12:02:05 +0200 Subject: [PATCH 04/12] Added error tolerance and logging to ThumbnailExporter --- .../index/exporters/ThumbnailExporter.kt | 53 +++++++++++-------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/ThumbnailExporter.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/ThumbnailExporter.kt index 03d959bd..f82818d3 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/ThumbnailExporter.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/ThumbnailExporter.kt @@ -58,38 +58,47 @@ class ThumbnailExporter : ExporterFactory { require(mimeType in SUPPORTED) { "ThumbnailExporter only support image formats JPEG and PNG." } } + /** [KLogger] instance. */ + private val logger: KLogger = KotlinLogging.logger {} override fun toFlow(scope: CoroutineScope): Flow = this.input.toFlow(scope).onEach { retrievable -> - val resolvable = this.context.resolver.resolve(retrievable.id) + try { - val contentIds = this.contentSources?.let { - retrievable.filteredAttribute(ContentAuthorAttribute::class.java)?.getContentIds(it) - } + val resolvable = this.context.resolver.resolve(retrievable.id) - val content = retrievable.content.filterIsInstance().filter { contentIds?.contains(it.id) ?: true } - if (resolvable != null && content.isNotEmpty()) { - val writer = when (mimeType) { - MimeType.JPEG, - MimeType.JPG -> JpegWriter() - MimeType.PNG -> PngWriter() - else -> throw IllegalArgumentException("Unsupported mime type $mimeType") + val contentIds = this.contentSources?.let { + retrievable.filteredAttribute(ContentAuthorAttribute::class.java)?.getContentIds(it) } - logger.debug { "Generating thumbnail(s) for ${retrievable.id} with ${retrievable.type} and resolution $maxResolution. Storing it with ${resolvable::class.simpleName}." } + val content = + retrievable.content.filterIsInstance().filter { contentIds?.contains(it.id) ?: true } + if (resolvable != null && content.isNotEmpty()) { + val writer = when (mimeType) { + MimeType.JPEG, + MimeType.JPG -> JpegWriter() - content.forEach { cnt -> - val imgBytes = ImmutableImage.fromAwt(cnt.content).let { - if (it.width > it.height) { - it.scaleToWidth(maxResolution) - } else { - it.scaleToHeight(maxResolution) - } - }.bytes(writer) + MimeType.PNG -> PngWriter() + else -> throw IllegalArgumentException("Unsupported mime type $mimeType") + } - resolvable.openOutputStream().use { - it.write(imgBytes) + logger.debug { "Generating thumbnail(s) for ${retrievable.id} with ${retrievable.type} and resolution $maxResolution. Storing it with ${resolvable::class.simpleName}." } + + content.forEach { cnt -> + val imgBytes = ImmutableImage.fromAwt(cnt.content).let { + if (it.width > it.height) { + it.scaleToWidth(maxResolution) + } else { + it.scaleToHeight(maxResolution) + } + }.bytes(writer) + + resolvable.openOutputStream().use { + it.write(imgBytes) + } } } + } catch (e: Exception) { + logger.error(e){"Error during thumbnail creation"} } } } From 33e3388b1020646b761c8eed08e8af2c064448ff Mon Sep 17 00:00:00 2001 From: Luca Rossetto Date: Mon, 21 Oct 2024 09:57:55 +0200 Subject: [PATCH 05/12] Added 'framerate' parameter to downsample during decoding --- .../org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt index e385fa85..8f5182db 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -37,10 +37,11 @@ class FFmpegVideoDecoder : DecoderFactory { override fun newDecoder(name: String, input: Enumerator, context: IndexContext): Decoder { val maxWidth = context[name, "maxWidth"]?.toIntOrNull() ?: 3840 val maxHeight = context[name, "maxHeight"]?.toIntOrNull() ?: 2160 + val framerate = context[name, "framerate"]?.toIntOrNull() val timeWindowMs = context[name, "timeWindowMs"]?.toLongOrNull() ?: 500L val ffmpegPath = context[name, "ffmpegPath"]?.let { Path.of(it) } - return Instance(input, context, timeWindowMs, maxWidth, maxHeight, name, ffmpegPath) + return Instance(input, context, timeWindowMs, maxWidth, maxHeight, framerate, name, ffmpegPath) } private class Instance( @@ -49,6 +50,7 @@ class FFmpegVideoDecoder : DecoderFactory { private val timeWindowMs: Long = 500L, private val maxWidth: Int, private val maxHeight: Int, + private val framerate: Int?, private val name: String, private val ffmpegPath: Path? ) : Decoder { @@ -139,7 +141,7 @@ class FFmpegVideoDecoder : DecoderFactory { ) ).setFilter( StreamType.VIDEO, - "scale=w='min($maxWidth,iw)':h='min($maxHeight,ih)':force_original_aspect_ratio=decrease" + "scale=w='min($maxWidth,iw)':h='min($maxHeight,ih)':force_original_aspect_ratio=decrease${if (framerate != null && framerate > 0) ",fps=$framerate" else ""}'" ) //TODO audio settings From e94e7f457658544477bd330b9cb5ef4eebefc926 Mon Sep 17 00:00:00 2001 From: Ralph Gasser Date: Tue, 5 Nov 2024 18:22:16 +0100 Subject: [PATCH 06/12] Addresses some of the review comments made by me. --- .../engine/index/decode/FFmpegVideoDecoder.kt | 270 +++++++++++------- 1 file changed, 170 insertions(+), 100 deletions(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt index 8f5182db..b5f8bbcb 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -12,8 +12,10 @@ import kotlinx.coroutines.channels.ProducerScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.channelFlow -import org.bytedeco.javacv.FrameGrabber +import kotlinx.coroutines.runBlocking import org.vitrivr.engine.core.context.IndexContext +import org.vitrivr.engine.core.model.content.element.AudioContent +import org.vitrivr.engine.core.model.content.element.ImageContent import org.vitrivr.engine.core.model.relationship.Relationship import org.vitrivr.engine.core.model.retrievable.Ingested import org.vitrivr.engine.core.model.retrievable.Retrievable @@ -24,29 +26,43 @@ import org.vitrivr.engine.core.operators.ingest.Decoder import org.vitrivr.engine.core.operators.ingest.DecoderFactory import org.vitrivr.engine.core.operators.ingest.Enumerator import org.vitrivr.engine.core.source.MediaType +import org.vitrivr.engine.core.source.Metadata +import org.vitrivr.engine.core.source.Source import org.vitrivr.engine.core.source.file.FileSource import java.awt.image.BufferedImage +import java.nio.ShortBuffer import java.nio.file.Path import java.util.* -import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit -import org.vitrivr.engine.core.source.Metadata +/** + * A [Decoder] that can decode [ImageContent] and [AudioContent] from a [Source] of [MediaType.VIDEO]. + * + * Based on Jaffree FFmpeg wrapper, which spawns a new FFmpeg process for each [Source]. + * + * @author Luca Rossetto + * @author Ralph Gasser + * @version 1.0.0 + */ class FFmpegVideoDecoder : DecoderFactory { override fun newDecoder(name: String, input: Enumerator, context: IndexContext): Decoder { val maxWidth = context[name, "maxWidth"]?.toIntOrNull() ?: 3840 val maxHeight = context[name, "maxHeight"]?.toIntOrNull() ?: 2160 val framerate = context[name, "framerate"]?.toIntOrNull() + val video = context[name, "video"]?.let { it.lowercase() == "true" } ?: true + val audio = context[name, "audio"]?.let { it.lowercase() == "true" } ?: true val timeWindowMs = context[name, "timeWindowMs"]?.toLongOrNull() ?: 500L val ffmpegPath = context[name, "ffmpegPath"]?.let { Path.of(it) } - return Instance(input, context, timeWindowMs, maxWidth, maxHeight, framerate, name, ffmpegPath) + return Instance(input, context, video, audio, timeWindowMs, maxWidth, maxHeight, framerate, name, ffmpegPath) } private class Instance( override val input: Enumerator, private val context: IndexContext, + private val video: Boolean = true, + private val audio: Boolean = true, private val timeWindowMs: Long = 500L, private val maxWidth: Int, private val maxHeight: Int, @@ -81,8 +97,8 @@ class FFmpegVideoDecoder : DecoderFactory { } }.execute() + /* Extract metadata. */ val videoStreamInfo = probeResult.streams.find { it.codecType == StreamType.VIDEO } - if (videoStreamInfo != null) { source.metadata[Metadata.METADATA_KEY_VIDEO_FPS] = videoStreamInfo.avgFrameRate.toDouble() source.metadata[Metadata.METADATA_KEY_AV_DURATION] = (videoStreamInfo.duration * 1000f).toLong() @@ -91,140 +107,194 @@ class FFmpegVideoDecoder : DecoderFactory { } val audioStreamInfo = probeResult.streams.find { it.codecType == StreamType.AUDIO } - if (audioStreamInfo != null) { source.metadata[Metadata.METADATA_KEY_AUDIO_CHANNELS] = audioStreamInfo.channels source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLERATE] = audioStreamInfo.sampleRate source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLESIZE] = audioStreamInfo.sampleFmt } - var windowEnd = TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) - - val imageTransferBuffer = LinkedBlockingQueue>(10) - - val ffmpegInstance = ffmpeg.addInput( + /* Create consumer. */ + val consumer = InFlowFrameConsumer(this, sourceRetrievable) + val ffmpegInstance = this@Instance.ffmpeg.addInput( if (source is FileSource) { UrlInput.fromPath(source.path) } else { PipeInput.pumpFrom(source.newInputStream()) } - ).addOutput( - FrameOutput.withConsumerAlpha( - object : FrameConsumer { + ).addOutput(FrameOutput.withConsumerAlpha(consumer)) - val streamMap = mutableMapOf() + /* Execute. */ + ffmpegInstance.execute() - override fun consumeStreams(streams: MutableList) { - streams.forEach { stream -> streamMap[stream.id] = stream } - } + /* Emit final frames. */ + if (!consumer.isEmpty()) { + consumer.emit() + } - override fun consume(frame: Frame) { + /* Emit source retrievable. */ + send(sourceRetrievable) + } + }.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND) - val stream = streamMap[frame.streamId] ?: return - when (stream.type) { - Stream.Type.VIDEO -> { - imageTransferBuffer.put(frame.image!! to (1000000 * frame.pts) / stream.timebase) - } + /** + * A [FrameConsumer] that emits [Retrievable]s to the downstream [channel]. + */ + private inner class InFlowFrameConsumer(private val channel: ProducerScope, val source: Retrievable) : FrameConsumer { - Stream.Type.AUDIO -> { - //TODO - } + /** The video [Stream] processed by this [InFlowFrameConsumer]. */ + var videoStream: Stream? = null + private set - null -> { - /* ignore */ - } - } - } + /** The audio [Stream] processed by this [InFlowFrameConsumer]. */ + var audioStream: Stream? = null + private set - } - ) - ).setFilter( - StreamType.VIDEO, - "scale=w='min($maxWidth,iw)':h='min($maxHeight,ih)':force_original_aspect_ratio=decrease${if (framerate != null && framerate > 0) ",fps=$framerate" else ""}'" - ) + /** The end of the time window. */ + var windowEnd = TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) + private set - //TODO audio settings + /** Flag indicating, that video is ready to be emitted. */ + var videoReady = false - val future = ffmpegInstance.executeAsync() + /** Flag indicating, that audio is ready to be emitted. */ + var audioReady = false - val localImageBuffer = LinkedList>() + /** [List] of grabbed [BufferedImage]s. */ + val imageBuffer: List> = LinkedList() - while (!(future.isDone || future.isCancelled) || imageTransferBuffer.isNotEmpty()) { + /** [List] of grabbed [ShortBuffer]s. */ + val audioBuffer: List> = LinkedList() - val next = imageTransferBuffer.poll(1, TimeUnit.SECONDS) ?: continue - localImageBuffer.add(next) - if (localImageBuffer.last().second >= windowEnd) { - emit(localImageBuffer, windowEnd, sourceRetrievable, this@channelFlow) - windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) - } + /** + * Returns true if both the image and audio buffer are empty. + */ + fun isEmpty(): Boolean = this.imageBuffer.isEmpty() && this.audioBuffer.isEmpty() - } + /** + * Initializes this [InFlowFrameConsumer]. + * + * @param streams List of [Stream]s to initialize the [InFlowFrameConsumer] with. + */ + override fun consumeStreams(streams: MutableList) { + this.videoStream = streams.firstOrNull { it.type == Stream.Type.VIDEO } + this.audioStream = streams.firstOrNull { it.type == Stream.Type.AUDIO } + } - while (localImageBuffer.isNotEmpty()) { - emit(localImageBuffer, windowEnd, sourceRetrievable, this@channelFlow) - windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) + /** + * Consumes a single [Frame]. + * + * @param frame [Frame] to consume. + */ + override fun consume(frame: Frame) = runBlocking { + val stream = if (frame.streamId == this@InFlowFrameConsumer.audioStream?.id) { + this@InFlowFrameConsumer.audioStream!! + } else if (frame.streamId == this@InFlowFrameConsumer.videoStream?.id) { + this@InFlowFrameConsumer.videoStream!! + } else { + return@runBlocking + } + val timestamp = ((1000000 * frame.pts) / stream.timebase) + when (stream.type) { + Stream.Type.VIDEO -> { + (this@InFlowFrameConsumer.imageBuffer as LinkedList).add(frame.image!! to timestamp) + if (timestamp >= this@InFlowFrameConsumer.windowEnd) { + this@InFlowFrameConsumer.videoReady = true + } + } + Stream.Type.AUDIO -> { + val samples = ShortBuffer.wrap(frame.samples.map { it.toShort() }.toShortArray()) + (this@InFlowFrameConsumer.audioBuffer as LinkedList).add(samples to timestamp) + if (timestamp >= this@InFlowFrameConsumer.windowEnd) { + this@InFlowFrameConsumer.audioReady = true + } + } + else -> {} } - send(sourceRetrievable) - - } - }.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND) + /* If enough frames have been collected, emit them. */ + if (this@InFlowFrameConsumer.videoReady && this@InFlowFrameConsumer.audioReady) { + emit() + /* Reset counters and flags. */ + this@InFlowFrameConsumer.videoReady = !(this@InFlowFrameConsumer.videoStream != null && this@Instance.video) + this@InFlowFrameConsumer.audioReady = !(this@InFlowFrameConsumer.audioStream != null && this@Instance.audio) - /** - * Emits a single [Retrievable] to the downstream [channel]. - * - * @param imageBuffer A [LinkedList] containing [BufferedImage] elements to emit (frames). - * @param grabber The [FrameGrabber] instance. - * @param timestampEnd The end timestamp. - * @param source The source [Retrievable] the emitted [Retrievable] is part of. - */ - private suspend fun emit( - imageBuffer: MutableList>, - timestampEnd: Long, - source: Retrievable, - channel: ProducerScope - ) { - - val emitImage = mutableListOf() - - /* Drain buffer. */ - imageBuffer.removeIf { - if (it.second <= timestampEnd) { - emitImage.add(it.first) - true - } else { - false + /* Update window end. */ + this@InFlowFrameConsumer.windowEnd += TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs) } } + /** + * Emits a single [Retrievable] to the downstream [channel]. + */ + suspend fun emit() { + /* Audio samples. */ + var audioSize = 0 + val emitImage = mutableListOf() + val emitAudio = mutableListOf() + + /* Drain buffers. */ + (this.imageBuffer as LinkedList).removeIf { + if (it.second <= this.windowEnd) { + emitImage.add(it.first) + true + } else { + false + } + } + (this.audioBuffer as LinkedList).removeIf { + if (it.second <= this.windowEnd) { + audioSize += it.first.limit() + emitAudio.add(it.first) + true + } else { + false + } + } - /* Prepare ingested with relationship to source. */ - val ingested = Ingested(UUID.randomUUID(), "SEGMENT", false) - source.filteredAttribute(SourceAttribute::class.java)?.let { ingested.addAttribute(it) } - ingested.addRelationship(Relationship.ByRef(ingested, "partOf", source, false)) - ingested.addAttribute( - TimeRangeAttribute( - timestampEnd - TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs), - timestampEnd, - TimeUnit.MICROSECONDS + /* Prepare ingested with relationship to source. */ + val ingested = Ingested(UUID.randomUUID(), "SEGMENT", false) + this.source.filteredAttribute(SourceAttribute::class.java)?.let { ingested.addAttribute(it) } + ingested.addRelationship(Relationship.ByRef(ingested, "partOf", source, false)) + ingested.addAttribute( + TimeRangeAttribute( + this.windowEnd - TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs), + this.windowEnd, + TimeUnit.MICROSECONDS + ) ) - ) - /* Prepare and append image content element. */ - for (image in emitImage) { - val imageContent = this.context.contentFactory.newImageContent(image) - ingested.addContent(imageContent) - ingested.addAttribute(ContentAuthorAttribute(imageContent.id, name)) - } + /* Prepare and append audio content element. */ + if (emitAudio.size > 0) { + val samples = ShortBuffer.allocate(audioSize) + for (frame in emitAudio) { + frame.clear() + samples.put(frame) + } + samples.clear() + val audio = this@Instance.context.contentFactory.newAudioContent( + this.audioStream!!.channels.toShort(), + this.audioStream!!.sampleRate.toInt(), + samples + ) + ingested.addContent(audio) + ingested.addAttribute(ContentAuthorAttribute(audio.id, name)) + } - logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images: ${ingested.id}" } + /* Prepare and append image content element. */ + for (image in emitImage) { + val imageContent = this@Instance.context.contentFactory.newImageContent(image) + ingested.addContent(imageContent) + ingested.addAttribute(ContentAuthorAttribute(imageContent.id, name)) + } - /* Emit ingested. */ - channel.send(ingested) - } + logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images and ${emitAudio.size} audio samples: ${ingested.id}" } + /* Emit ingested. */ + this.channel.send(ingested) + } + } } } \ No newline at end of file From f09d39902a744ca24c3ac14cc97550bfe618360c Mon Sep 17 00:00:00 2001 From: Ralph Gasser Date: Tue, 5 Nov 2024 20:41:04 +0100 Subject: [PATCH 07/12] Fixes an issue, since Jaffree seems to throw an exception once video has been finalised. --- .../engine/index/decode/FFmpegVideoDecoder.kt | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt index b5f8bbcb..f58ccd5e 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -1,5 +1,6 @@ package org.vitrivr.engine.index.decode +import com.github.kokorin.jaffree.JaffreeException import com.github.kokorin.jaffree.StreamType import com.github.kokorin.jaffree.ffmpeg.* import com.github.kokorin.jaffree.ffprobe.FFprobe @@ -124,15 +125,19 @@ class FFmpegVideoDecoder : DecoderFactory { ).addOutput(FrameOutput.withConsumerAlpha(consumer)) /* Execute. */ - ffmpegInstance.execute() + try { + ffmpegInstance.execute() + } catch (e: JaffreeException) { + logger.warn(e) { "Error while decoding source ${source.name} (${source.sourceId})." } + } finally { + /* Emit final frames. */ + if (!consumer.isEmpty()) { + consumer.emit() + } - /* Emit final frames. */ - if (!consumer.isEmpty()) { - consumer.emit() + /* Emit source retrievable. */ + send(sourceRetrievable) } - - /* Emit source retrievable. */ - send(sourceRetrievable) } }.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND) @@ -166,7 +171,6 @@ class FFmpegVideoDecoder : DecoderFactory { /** [List] of grabbed [ShortBuffer]s. */ val audioBuffer: List> = LinkedList() - /** * Returns true if both the image and audio buffer are empty. */ From 8b1b659e292513017b9bfef441d09aa74f6825c5 Mon Sep 17 00:00:00 2001 From: Ralph Gasser Date: Thu, 7 Nov 2024 16:24:09 +0100 Subject: [PATCH 08/12] Fixes NPE that would cause FFmpegVideoDecoder to run into exceptions. --- .../engine/index/decode/FFmpegVideoDecoder.kt | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt index f58ccd5e..ac3d4433 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -1,6 +1,5 @@ package org.vitrivr.engine.index.decode -import com.github.kokorin.jaffree.JaffreeException import com.github.kokorin.jaffree.StreamType import com.github.kokorin.jaffree.ffmpeg.* import com.github.kokorin.jaffree.ffprobe.FFprobe @@ -116,20 +115,21 @@ class FFmpegVideoDecoder : DecoderFactory { /* Create consumer. */ val consumer = InFlowFrameConsumer(this, sourceRetrievable) - val ffmpegInstance = this@Instance.ffmpeg.addInput( - if (source is FileSource) { - UrlInput.fromPath(source.path) - } else { - PipeInput.pumpFrom(source.newInputStream()) - } - ).addOutput(FrameOutput.withConsumerAlpha(consumer)) + /* Execute. */ try { - ffmpegInstance.execute() - } catch (e: JaffreeException) { - logger.warn(e) { "Error while decoding source ${source.name} (${source.sourceId})." } - } finally { + source.newInputStream().use { + var output = FrameOutput.withConsumerAlpha(consumer).disableStream(StreamType.SUBTITLE).disableStream(StreamType.DATA) + if (!this@Instance.video) { + output = output.disableStream(StreamType.VIDEO) + } + if (!this@Instance.audio) { + output = output.disableStream(StreamType.AUDIO) + } + this@Instance.ffmpeg.addInput(PipeInput.pumpFrom(it)).addOutput(output).execute() + } + /* Emit final frames. */ if (!consumer.isEmpty()) { consumer.emit() @@ -137,6 +137,8 @@ class FFmpegVideoDecoder : DecoderFactory { /* Emit source retrievable. */ send(sourceRetrievable) + } catch (e: Throwable) { + logger.error(e) { "Error while decoding source ${source.name} (${source.sourceId})." } } } }.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND) @@ -191,13 +193,12 @@ class FFmpegVideoDecoder : DecoderFactory { * * @param frame [Frame] to consume. */ - override fun consume(frame: Frame) = runBlocking { - val stream = if (frame.streamId == this@InFlowFrameConsumer.audioStream?.id) { - this@InFlowFrameConsumer.audioStream!! - } else if (frame.streamId == this@InFlowFrameConsumer.videoStream?.id) { - this@InFlowFrameConsumer.videoStream!! - } else { - return@runBlocking + override fun consume(frame: Frame?) = runBlocking { + if (frame == null) return@runBlocking + val stream = when (frame.streamId) { + this@InFlowFrameConsumer.audioStream?.id -> this@InFlowFrameConsumer.audioStream!! + this@InFlowFrameConsumer.videoStream?.id -> this@InFlowFrameConsumer.videoStream!! + else -> return@runBlocking } val timestamp = ((1000000 * frame.pts) / stream.timebase) when (stream.type) { From 2d8480a563046a91c20094cb94e88b112f424fa7 Mon Sep 17 00:00:00 2001 From: Ralph Gasser Date: Thu, 7 Nov 2024 16:37:30 +0100 Subject: [PATCH 09/12] A single DiskResolver is now capable of creating output of different types (e.g., thumbnail and audio clip). --- .../vitrivr/engine/core/resolver/Resolver.kt | 3 +- .../engine/core/resolver/impl/DiskResolver.kt | 20 +++++----- .../engine/core/source/file/MimeType.kt | 8 +--- .../index/exporters/ThumbnailExporter.kt | 2 +- .../index/exporters/VideoPreviewExporter.kt | 3 +- .../engine/model3d/ModelPreviewExporter.kt | 39 +++++++++++-------- .../api/rest/handlers/FetchExportData.kt | 2 +- 7 files changed, 38 insertions(+), 39 deletions(-) diff --git a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/resolver/Resolver.kt b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/resolver/Resolver.kt index c0bb5004..3150f493 100644 --- a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/resolver/Resolver.kt +++ b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/resolver/Resolver.kt @@ -13,8 +13,9 @@ interface Resolver { * Attempts to resolve the provided [RetrievableId] to a [Resolvable] using this [Resolver]. * * @param id The [RetrievableId] to resolve. + * @param suffix The suffix of the filename. * @return [Resolvable] or null, if [RetrievableId] could not be resolved. */ - fun resolve(id: RetrievableId) : Resolvable? + fun resolve(id: RetrievableId, suffix: String) : Resolvable? } \ No newline at end of file diff --git a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/resolver/impl/DiskResolver.kt b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/resolver/impl/DiskResolver.kt index 4343c9e7..f5a6a527 100644 --- a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/resolver/impl/DiskResolver.kt +++ b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/resolver/impl/DiskResolver.kt @@ -28,14 +28,13 @@ class DiskResolver : ResolverFactory { */ override fun newResolver(schema: Schema, parameters: Map): Resolver { val location = Paths.get(parameters["location"] ?: "./thumbnails/${schema.name}") - val mimeType = MimeType.valueOf(parameters["mimeType"] ?: "JPG") - return Instance(location, mimeType) + return Instance(location) } /** * The [Resolver] generated by this [DiskResolver]. */ - private class Instance(private val location: Path, private val mimeType: MimeType) : Resolver { + private class Instance(private val location: Path) : Resolver { init { /* Make sure, directory exists. */ if (!Files.exists(this.location)) { @@ -47,20 +46,19 @@ class DiskResolver : ResolverFactory { * Resolves the provided [RetrievableId] to a [Resolvable] using this [Resolver]. * * @param id The [RetrievableId] to resolve. + * @param suffix The suffix of the filename. * @return [Resolvable] or null, if [RetrievableId] could not be resolved. */ - override fun resolve(id: RetrievableId): Resolvable = DiskResolvable(id) - + override fun resolve(id: RetrievableId, suffix: String): Resolvable = DiskResolvable(id, suffix) /** * A [Resolvable] generated by this [DiskResolver]. */ - inner class DiskResolvable(override val retrievableId: RetrievableId) : Resolvable { - val path: Path - get() = this@Instance.location.resolve("$retrievableId.${this@Instance.mimeType.fileExtension}") - override val mimeType: MimeType - get() = this@Instance.mimeType - + inner class DiskResolvable(override val retrievableId: RetrievableId, suffix: String) : Resolvable { + val path: Path = this@Instance.location.resolve("${retrievableId}.$suffix") + override val mimeType: MimeType by lazy { + MimeType.getMimeType(this.path) ?: MimeType.UNKNOWN + } override fun exists(): Boolean = Files.exists(this.path) override fun openInputStream(): InputStream = Files.newInputStream(this.path, StandardOpenOption.READ) override fun openOutputStream(): OutputStream = Files.newOutputStream(this.path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE) diff --git a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/source/file/MimeType.kt b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/source/file/MimeType.kt index 5669a31b..ece74d09 100644 --- a/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/source/file/MimeType.kt +++ b/vitrivr-engine-core/src/main/kotlin/org/vitrivr/engine/core/source/file/MimeType.kt @@ -59,12 +59,8 @@ enum class MimeType(val fileExtension: String, val mimeType: String, val mediaTy OFF("off", "application/3d-off", MediaType.MESH), GLTF("gltf", "model/gltf+json", MediaType.MESH), - - - //Unknown type - UNKNOWN("", "", MediaType.NONE) - ; + UNKNOWN("", "", MediaType.NONE); companion object { fun getMimeType(fileName: String): MimeType? = try { @@ -80,6 +76,6 @@ enum class MimeType(val fileExtension: String, val mimeType: String, val mediaTy null } - val allValid = MimeType.values().filter { it != UNKNOWN }.toSet() + val allValid = entries.filter { it != UNKNOWN }.toSet() } } \ No newline at end of file diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/ThumbnailExporter.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/ThumbnailExporter.kt index f82818d3..591522e0 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/ThumbnailExporter.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/ThumbnailExporter.kt @@ -64,7 +64,7 @@ class ThumbnailExporter : ExporterFactory { override fun toFlow(scope: CoroutineScope): Flow = this.input.toFlow(scope).onEach { retrievable -> try { - val resolvable = this.context.resolver.resolve(retrievable.id) + val resolvable = this.context.resolver.resolve(retrievable.id, ".${this.mimeType.fileExtension}") val contentIds = this.contentSources?.let { retrievable.filteredAttribute(ContentAuthorAttribute::class.java)?.getContentIds(it) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/VideoPreviewExporter.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/VideoPreviewExporter.kt index ee80caec..faa993fc 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/VideoPreviewExporter.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/VideoPreviewExporter.kt @@ -11,7 +11,6 @@ import kotlinx.coroutines.flow.onEach import org.bytedeco.javacpp.PointerScope import org.bytedeco.javacv.FFmpegFrameGrabber import org.bytedeco.javacv.Java2DFrameConverter -import org.bytedeco.javacv.Java2DFrameUtils import org.vitrivr.engine.core.context.IndexContext import org.vitrivr.engine.core.model.retrievable.Retrievable import org.vitrivr.engine.core.model.retrievable.attributes.SourceAttribute @@ -78,7 +77,7 @@ class VideoPreviewExporter : ExporterFactory { override fun toFlow(scope: CoroutineScope): Flow = this.input.toFlow(scope).onEach { retrievable -> val source = retrievable.filteredAttribute(SourceAttribute::class.java)?.source ?: return@onEach if (source.type == MediaType.VIDEO) { - val resolvable = this.context.resolver.resolve(retrievable.id) + val resolvable = this.context.resolver.resolve(retrievable.id, ".${this.mimeType.fileExtension}") if (resolvable != null) { val writer = when (mimeType) { MimeType.JPEG, diff --git a/vitrivr-engine-module-m3d/src/main/kotlin/org/vitrivr/engine/model3d/ModelPreviewExporter.kt b/vitrivr-engine-module-m3d/src/main/kotlin/org/vitrivr/engine/model3d/ModelPreviewExporter.kt index 237ebca5..a3cbd207 100644 --- a/vitrivr-engine-module-m3d/src/main/kotlin/org/vitrivr/engine/model3d/ModelPreviewExporter.kt +++ b/vitrivr-engine-module-m3d/src/main/kotlin/org/vitrivr/engine/model3d/ModelPreviewExporter.kt @@ -12,17 +12,17 @@ import org.vitrivr.engine.core.model.mesh.texturemodel.Model3d import org.vitrivr.engine.core.model.mesh.texturemodel.util.entropyoptimizer.EntopyCalculationMethod import org.vitrivr.engine.core.model.mesh.texturemodel.util.entropyoptimizer.EntropyOptimizerStrategy import org.vitrivr.engine.core.model.mesh.texturemodel.util.entropyoptimizer.OptimizerOptions +import org.vitrivr.engine.core.model.mesh.texturemodel.util.types.Vec3f import org.vitrivr.engine.core.model.retrievable.Retrievable import org.vitrivr.engine.core.model.retrievable.attributes.SourceAttribute -import org.vitrivr.engine.core.model.mesh.texturemodel.util.types.Vec3f import org.vitrivr.engine.core.operators.Operator import org.vitrivr.engine.core.operators.general.Exporter import org.vitrivr.engine.core.operators.general.ExporterFactory import org.vitrivr.engine.core.source.MediaType import org.vitrivr.engine.core.source.file.MimeType import org.vitrivr.engine.model3d.lwjglrender.render.RenderOptions -import org.vitrivr.engine.model3d.lwjglrender.window.WindowOptions import org.vitrivr.engine.model3d.lwjglrender.util.texturemodel.entroopyoptimizer.ModelEntropyOptimizer +import org.vitrivr.engine.model3d.lwjglrender.window.WindowOptions import org.vitrivr.engine.model3d.renderer.ExternalRenderer import java.awt.image.BufferedImage import java.io.ByteArrayOutputStream @@ -45,10 +45,10 @@ private val logger: KLogger = KotlinLogging.logger {} */ class ModelPreviewExporter : ExporterFactory { companion object { - val SUPPORTED = setOf(MimeType.GLTF) + val SUPPORTED_INPUT = setOf(MimeType.GLTF) /** Set of supported output formats. */ - val OUTPUT_FORMAT = setOf("gif", "jpg") + val SUPPORTED_OUTPUT = setOf(MimeType.GIF, MimeType.JPG, MimeType.JPEG) /** * Renders a preview of the given model as a JPEG image. @@ -186,7 +186,7 @@ class ModelPreviewExporter : ExporterFactory { } } ?: MimeType.GLTF val distance = context[name, "distance"]?.toFloatOrNull() ?: 1f - val format = context[name, "format"] ?: "gif" + val format = MimeType.valueOf(context[name, "format"]?.uppercase() ?: "GIF") val views = context[name, "views"]?.toIntOrNull() ?: 30 logger.debug { @@ -202,12 +202,12 @@ class ModelPreviewExporter : ExporterFactory { private val maxResolution: Int, mimeType: MimeType, private val distance: Float, - private val format: String, + private val format: MimeType, private val views: Int ) : Exporter { init { - require(mimeType in SUPPORTED) { "ModelPreviewExporter only supports models of format GLTF." } - require(format in OUTPUT_FORMAT) { "ModelPreviewExporter only supports exporting a gif of jpg." } + require(mimeType in SUPPORTED_INPUT) { "ModelPreviewExporter only supports models of format GLTF." } + require(this.format in SUPPORTED_OUTPUT) { "ModelPreviewExporter only supports exporting a gif of jpg." } } override fun toFlow(scope: CoroutineScope): Flow { @@ -216,7 +216,7 @@ class ModelPreviewExporter : ExporterFactory { val source = retrievable.filteredAttribute(SourceAttribute::class.java)?.source ?: return@onEach if (source.type == MediaType.MESH) { - val resolvable = this.context.resolver.resolve(retrievable.id) + val resolvable = this.context.resolver.resolve(retrievable.id, ".${this.format.fileExtension}") val model = retrievable.content[0].content as Model3d if (resolvable != null) { @@ -225,15 +225,20 @@ class ModelPreviewExporter : ExporterFactory { } source.newInputStream().use { input -> - if (format == "jpg") { - val preview: BufferedImage = renderPreviewJPEG(model, renderer, this.distance) - resolvable.openOutputStream().use { output -> - ImageIO.write(preview, "jpg", output) + when (format) { + MimeType.JPG, + MimeType.JPEG -> { + val preview: BufferedImage = renderPreviewJPEG(model, renderer, this.distance) + resolvable.openOutputStream().use { output -> + ImageIO.write(preview, "jpg", output) + } + } + MimeType.GIF -> { + val frames = createFramesForGif(model, renderer, this.views, this.distance) + val gif = createGif(frames, 50) + resolvable.openOutputStream().use { output -> output.write(gif!!.toByteArray()) } } - } else { // format == "gif" - val frames = createFramesForGif(model, renderer, this.views, this.distance) - val gif = createGif(frames, 50) - resolvable.openOutputStream().use { output -> output.write(gif!!.toByteArray()) } + else -> throw IllegalArgumentException("Unsupported mime type $format") } } } diff --git a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/FetchExportData.kt b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/FetchExportData.kt index 1f070dfb..c44de7ab 100644 --- a/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/FetchExportData.kt +++ b/vitrivr-engine-server/src/main/kotlin/org/vitrivr/engine/server/api/rest/handlers/FetchExportData.kt @@ -37,7 +37,7 @@ fun fetchExportData(ctx: Context, schema: Schema) { } /* Try to resolve resolvable for retrievable ID. */ - val resolvable = schema.getExporter(exporterName)?.resolver?.resolve(retrievableId) + val resolvable = schema.getExporter(exporterName)?.resolver?.resolve(retrievableId, ".jpg") if (resolvable == null) { ctx.status(404) ctx.json(ErrorStatus("Failed to resolve data.")) From 896497feec0ff8dd1a8a3d831057498983cb25c7 Mon Sep 17 00:00:00 2001 From: Ralph Gasser Date: Fri, 8 Nov 2024 06:57:33 +0100 Subject: [PATCH 10/12] Adds a simple exporter for Wave files (for debugging purposes). --- .../engine/index/exporters/WaveExporter.kt | 60 +++++++++++++++++++ .../engine/index/util/WaveUtilities.kt | 58 ++++++++++++------ ...ine.core.operators.general.ExporterFactory | 1 + 3 files changed, 101 insertions(+), 18 deletions(-) create mode 100644 vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/WaveExporter.kt diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/WaveExporter.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/WaveExporter.kt new file mode 100644 index 00000000..f64d4404 --- /dev/null +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/exporters/WaveExporter.kt @@ -0,0 +1,60 @@ +package org.vitrivr.engine.index.exporters + +import io.github.oshai.kotlinlogging.KLogger +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.onEach +import org.vitrivr.engine.core.context.IndexContext +import org.vitrivr.engine.core.model.content.element.AudioContent +import org.vitrivr.engine.core.model.retrievable.Retrievable +import org.vitrivr.engine.core.model.retrievable.attributes.ContentAuthorAttribute +import org.vitrivr.engine.core.operators.Operator +import org.vitrivr.engine.core.operators.general.Exporter +import org.vitrivr.engine.core.operators.general.ExporterFactory +import org.vitrivr.engine.index.util.WaveUtilities +import java.io.IOException + +private val logger: KLogger = KotlinLogging.logger {} + +/** + * An [Exporter] that generates wave files from audio samples. + * + * @author Ralph Gasser + * @version 1.0.0 + */ +class WaveExporter: ExporterFactory { + /** + * Creates a new [Exporter] instance from this [ThumbnailExporter]. + * + * @param name The name of the [Exporter] + * @param input The [Operator] to acting as an input. + * @param context The [IndexContext] to use. + */ + override fun newExporter(name: String, input: Operator, context: IndexContext): Exporter { + return Instance(input, context, context[name, "contentSources"]?.split(",")?.toSet() ) + } + + /** + * The [Exporter] generated by this [WaveExporter]. + */ + private class Instance(override val input: Operator, private val context: IndexContext, private val contentSources:Set?) : Exporter { + + override fun toFlow(scope: CoroutineScope): Flow = this.input.toFlow(scope).onEach { retrievable -> + try { + val resolvable = this.context.resolver.resolve(retrievable.id, ".wav") + val contentIds = this.contentSources?.let { + retrievable.filteredAttribute(ContentAuthorAttribute::class.java)?.getContentIds(it) + } + val content = retrievable.content.filterIsInstance().filter { contentIds?.contains(it.id) ?: true } + if (resolvable != null && content.isNotEmpty()) { + resolvable.openOutputStream().use { + WaveUtilities.export(content, it) + } + } + } catch (e: IOException) { + logger.error(e){ "IO exception during wave creation." } + } + } + } +} \ No newline at end of file diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/util/WaveUtilities.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/util/WaveUtilities.kt index bd69083c..31878d53 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/util/WaveUtilities.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/util/WaveUtilities.kt @@ -1,9 +1,10 @@ package org.vitrivr.engine.index.util import org.vitrivr.engine.core.model.content.element.AudioContent +import java.io.ByteArrayOutputStream +import java.io.OutputStream import java.nio.ByteBuffer import java.nio.ByteOrder -import java.nio.channels.ByteChannel import java.nio.file.Files import java.nio.file.Path import java.nio.file.StandardOpenOption @@ -12,7 +13,7 @@ import java.nio.file.StandardOpenOption * A collection of utilities for handling WAVE files. * * @author Ralph Gasser - * @version 1.0.0 + * @version 1.1.0 */ object WaveUtilities { /** @@ -22,22 +23,36 @@ object WaveUtilities { * @param path The path to the file. * */ - fun export(content: List, path: Path) { + fun export(content: List, path: Path) = Files.newOutputStream(path, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING).use { + export(content, it) + } + + /** + * Exports a list of [AudioContent] as WAVE file (.wav). + * + * @param content List of [AudioContent] to export. + * @param stream The [OutputStream] to write to. + * + */ + fun export(content: List, stream: OutputStream) { if (content.isEmpty()) return - Files.newByteChannel(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW).use { channel -> - /* Write audio data. */ - var bytes = 0 - channel.position(44) - content.forEach { audio -> - val buffer = ByteBuffer.allocate(audio.size).order(ByteOrder.LITTLE_ENDIAN) - buffer.asShortBuffer().put(audio.content) - bytes += channel.write(buffer) - } - /* Write header. */ - channel.position(0) - writeWaveHeader(channel, content.first().channels, content.first().samplingRate, bytes) + /* Write samples. */ + val samples = ByteArrayOutputStream() + var bytes = 0 + content.forEach { audio -> + val buffer = ByteBuffer.allocate(audio.size).order(ByteOrder.LITTLE_ENDIAN) + buffer.asShortBuffer().put(audio.content) + samples.write(buffer.array()) + bytes += buffer.array().size } + + /* Write header. */ + val header = ByteBuffer.allocate(44).order(ByteOrder.LITTLE_ENDIAN) + writeWaveHeader(header, content.first().channels, content.first().samplingRate, bytes) + + stream.write(header.array()) + samples.writeTo(stream) } /** @@ -48,17 +63,25 @@ object WaveUtilities { */ fun export(content: AudioContent, path: Path) = export(listOf(content), path) + /** + * Exports a single [AudioContent] as WAVE file (.wav). + * + * @param content [AudioContent] to export. + * @param stream The [OutputStream] to export to + */ + fun export(content: AudioContent, stream: OutputStream) = export(listOf(content), stream) + /** * Writes the WAV header to the ByteBuffer. * + * @param buffer The ByteBuffer to write the header to. * @param channels The number of channels in the WAV file. * @param sampleRate Sample rate of the output file. * @param length Length in bytes of the frames data */ - private fun writeWaveHeader(channel: ByteChannel, channels: Short, sampleRate: Int, length: Int) { + private fun writeWaveHeader(buffer: ByteBuffer, channels: Short, sampleRate: Int, length: Int) { /* Length of the subChunk2. */ val subChunk2Length: Int = length * channels * Short.SIZE_BYTES /* Number of bytes for audio data: NumSamples * NumChannels * BytesPerSample. */ - val buffer = ByteBuffer.allocate(44).order(ByteOrder.LITTLE_ENDIAN) /* RIFF Chunk. */ buffer.put("RIFF".toByteArray(Charsets.US_ASCII)) @@ -78,6 +101,5 @@ object WaveUtilities { /* Data chunk */ buffer.put("data".toByteArray(Charsets.US_ASCII)) /* Begin of the data chunk. */ buffer.putInt(subChunk2Length) /* Length of the data chunk. */ - channel.write(buffer.flip()) } } \ No newline at end of file diff --git a/vitrivr-engine-index/src/main/resources/META-INF/services/org.vitrivr.engine.core.operators.general.ExporterFactory b/vitrivr-engine-index/src/main/resources/META-INF/services/org.vitrivr.engine.core.operators.general.ExporterFactory index 0420c873..857aad4b 100644 --- a/vitrivr-engine-index/src/main/resources/META-INF/services/org.vitrivr.engine.core.operators.general.ExporterFactory +++ b/vitrivr-engine-index/src/main/resources/META-INF/services/org.vitrivr.engine.core.operators.general.ExporterFactory @@ -1,2 +1,3 @@ org.vitrivr.engine.index.exporters.ThumbnailExporter +org.vitrivr.engine.index.exporters.WaveExporter org.vitrivr.engine.index.exporters.VideoPreviewExporter \ No newline at end of file From 89c3a99564d87c6005ae153f06dd1585128e9d3b Mon Sep 17 00:00:00 2001 From: Ralph Gasser Date: Fri, 8 Nov 2024 12:40:47 +0100 Subject: [PATCH 11/12] Fixes audio extraction. Thanks to @lucaro. --- .../org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt index ac3d4433..e2ea3376 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -209,7 +209,7 @@ class FFmpegVideoDecoder : DecoderFactory { } } Stream.Type.AUDIO -> { - val samples = ShortBuffer.wrap(frame.samples.map { it.toShort() }.toShortArray()) + val samples = ShortBuffer.wrap(frame.samples.map { (it shr 16).toShort() }.toShortArray()) (this@InFlowFrameConsumer.audioBuffer as LinkedList).add(samples to timestamp) if (timestamp >= this@InFlowFrameConsumer.windowEnd) { this@InFlowFrameConsumer.audioReady = true From 30ceee9a442c6ed46b50f62fe29a67578cdfaf03 Mon Sep 17 00:00:00 2001 From: Ralph Gasser Date: Fri, 8 Nov 2024 12:50:30 +0100 Subject: [PATCH 12/12] Files are now again opened by UrlInput. Signed-off-by: Ralph Gasser --- .../engine/index/decode/FFmpegVideoDecoder.kt | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt index ac3d4433..3f5dd523 100644 --- a/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt +++ b/vitrivr-engine-index/src/main/kotlin/org/vitrivr/engine/index/decode/FFmpegVideoDecoder.kt @@ -47,15 +47,12 @@ import java.util.concurrent.TimeUnit class FFmpegVideoDecoder : DecoderFactory { override fun newDecoder(name: String, input: Enumerator, context: IndexContext): Decoder { - val maxWidth = context[name, "maxWidth"]?.toIntOrNull() ?: 3840 - val maxHeight = context[name, "maxHeight"]?.toIntOrNull() ?: 2160 - val framerate = context[name, "framerate"]?.toIntOrNull() - val video = context[name, "video"]?.let { it.lowercase() == "true" } ?: true - val audio = context[name, "audio"]?.let { it.lowercase() == "true" } ?: true + val video = context[name, "video"]?.let { it.lowercase() == "true" } != false + val audio = context[name, "audio"]?.let { it.lowercase() == "true" } != false val timeWindowMs = context[name, "timeWindowMs"]?.toLongOrNull() ?: 500L val ffmpegPath = context[name, "ffmpegPath"]?.let { Path.of(it) } - return Instance(input, context, video, audio, timeWindowMs, maxWidth, maxHeight, framerate, name, ffmpegPath) + return Instance(input, context, video, audio, timeWindowMs, ffmpegPath, name) } private class Instance( @@ -64,21 +61,18 @@ class FFmpegVideoDecoder : DecoderFactory { private val video: Boolean = true, private val audio: Boolean = true, private val timeWindowMs: Long = 500L, - private val maxWidth: Int, - private val maxHeight: Int, - private val framerate: Int?, - private val name: String, - private val ffmpegPath: Path? + private val ffmpegPath: Path? = null, + private val name: String ) : Decoder { /** [KLogger] instance. */ private val logger: KLogger = KotlinLogging.logger {} private val ffprobe: FFprobe - get() = if (ffmpegPath != null) FFprobe.atPath(this.ffmpegPath) else FFprobe.atPath() + get() = if (this.ffmpegPath != null) FFprobe.atPath(this.ffmpegPath) else FFprobe.atPath() private val ffmpeg: FFmpeg - get() = if (ffmpegPath != null) FFmpeg.atPath(this.ffmpegPath) else FFmpeg.atPath() + get() = if (this.ffmpegPath != null) FFmpeg.atPath(this.ffmpegPath) else FFmpeg.atPath() override fun toFlow(scope: CoroutineScope): Flow = channelFlow { this@Instance.input.toFlow(scope).collect { sourceRetrievable -> @@ -116,20 +110,24 @@ class FFmpegVideoDecoder : DecoderFactory { /* Create consumer. */ val consumer = InFlowFrameConsumer(this, sourceRetrievable) - /* Execute. */ try { - source.newInputStream().use { - var output = FrameOutput.withConsumerAlpha(consumer).disableStream(StreamType.SUBTITLE).disableStream(StreamType.DATA) - if (!this@Instance.video) { - output = output.disableStream(StreamType.VIDEO) - } - if (!this@Instance.audio) { - output = output.disableStream(StreamType.AUDIO) + var output = FrameOutput.withConsumerAlpha(consumer).disableStream(StreamType.SUBTITLE).disableStream(StreamType.DATA) + if (!this@Instance.video) { + output = output.disableStream(StreamType.VIDEO) + } + if (!this@Instance.audio) { + output = output.disableStream(StreamType.AUDIO) + } + if (source is FileSource) { + this@Instance.ffmpeg.addInput(UrlInput.fromPath(source.path)).addOutput(output).execute() + } else { + source.newInputStream().use { + this@Instance.ffmpeg.addInput(PipeInput.pumpFrom(it)).addOutput(output).execute() } - this@Instance.ffmpeg.addInput(PipeInput.pumpFrom(it)).addOutput(output).execute() } + /* Emit final frames. */ if (!consumer.isEmpty()) { consumer.emit() @@ -200,7 +198,7 @@ class FFmpegVideoDecoder : DecoderFactory { this@InFlowFrameConsumer.videoStream?.id -> this@InFlowFrameConsumer.videoStream!! else -> return@runBlocking } - val timestamp = ((1000000 * frame.pts) / stream.timebase) + val timestamp = ((1_000_000 * frame.pts) / stream.timebase) when (stream.type) { Stream.Type.VIDEO -> { (this@InFlowFrameConsumer.imageBuffer as LinkedList).add(frame.image!! to timestamp) @@ -272,7 +270,7 @@ class FFmpegVideoDecoder : DecoderFactory { ) /* Prepare and append audio content element. */ - if (emitAudio.size > 0) { + if (emitAudio.isNotEmpty()) { val samples = ShortBuffer.allocate(audioSize) for (frame in emitAudio) { frame.clear()