Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Video Decoder using external ffmpeg process #116

Merged
merged 13 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ version_caffeine=3.1.8
version_clikt=4.2.2
version_commonsmath3=3.6.1
version_cottontaildb=0.16.7
version_jaffree=2024.08.29
version_javacv=1.5.10
version_javalin=6.3.0
version_jdbc_postgres=42.7.4
Expand Down
3 changes: 3 additions & 0 deletions vitrivr-engine-index/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ dependencies {
implementation group: 'org.bytedeco', name: 'javacv', version: version_javacv
implementation group: 'org.bytedeco', name: 'ffmpeg', version: version_ffmpeg, classifier: project.ext.javacppPlatform

/** Jaffree (used to drive an external FFmpeg process). */
implementation group: 'com.github.kokorin.jaffree', name: 'jaffree', version: version_jaffree

/** ScrImage (used for image resizing). */
implementation group: 'com.sksamuel.scrimage', name: 'scrimage-core', version: version_scrimage

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
package org.vitrivr.engine.index.decode

import com.github.kokorin.jaffree.StreamType
import com.github.kokorin.jaffree.ffmpeg.*
import com.github.kokorin.jaffree.ffprobe.FFprobe
import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.runBlocking
import org.vitrivr.engine.core.context.IndexContext
import org.vitrivr.engine.core.model.content.element.AudioContent
import org.vitrivr.engine.core.model.content.element.ImageContent
import org.vitrivr.engine.core.model.relationship.Relationship
import org.vitrivr.engine.core.model.retrievable.Ingested
import org.vitrivr.engine.core.model.retrievable.Retrievable
import org.vitrivr.engine.core.model.retrievable.attributes.ContentAuthorAttribute
import org.vitrivr.engine.core.model.retrievable.attributes.SourceAttribute
import org.vitrivr.engine.core.model.retrievable.attributes.time.TimeRangeAttribute
import org.vitrivr.engine.core.operators.ingest.Decoder
import org.vitrivr.engine.core.operators.ingest.DecoderFactory
import org.vitrivr.engine.core.operators.ingest.Enumerator
import org.vitrivr.engine.core.source.MediaType
import org.vitrivr.engine.core.source.Metadata
import org.vitrivr.engine.core.source.Source
import org.vitrivr.engine.core.source.file.FileSource
import java.awt.image.BufferedImage
import java.nio.ShortBuffer
import java.nio.file.Path
import java.util.*
import java.util.concurrent.TimeUnit

/**
* A [Decoder] that can decode [ImageContent] and [AudioContent] from a [Source] of [MediaType.VIDEO].
*
* Based on Jaffree FFmpeg wrapper, which spawns a new FFmpeg process for each [Source].
*
* @author Luca Rossetto
* @author Ralph Gasser
* @version 1.0.0
*/
class FFmpegVideoDecoder : DecoderFactory {

    /**
     * Creates a new [Decoder] that is configured from the properties stored in the [IndexContext] under [name].
     *
     * Properties read (all optional): `maxWidth` (default 3840), `maxHeight` (default 2160), `framerate` (no default),
     * `video` (default true), `audio` (default true), `timeWindowMs` (default 500) and `ffmpegPath` (no default).
     *
     * @param name The name of the operator; used as key into the [IndexContext] and as content author.
     * @param input The [Enumerator] that provides the sources to decode.
     * @param context The [IndexContext] to read the configuration from.
     * @return The new [Decoder].
     */
    override fun newDecoder(name: String, input: Enumerator, context: IndexContext): Decoder {
        val maxWidth = context[name, "maxWidth"]?.toIntOrNull() ?: 3840
        val maxHeight = context[name, "maxHeight"]?.toIntOrNull() ?: 2160
        val framerate = context[name, "framerate"]?.toIntOrNull()
        val video = context[name, "video"]?.let { it.lowercase() == "true" } ?: true
        val audio = context[name, "audio"]?.let { it.lowercase() == "true" } ?: true
        val timeWindowMs = context[name, "timeWindowMs"]?.toLongOrNull() ?: 500L
        val ffmpegPath = context[name, "ffmpegPath"]?.let { Path.of(it) }

        return Instance(input, context, video, audio, timeWindowMs, maxWidth, maxHeight, framerate, name, ffmpegPath)
    }

    /**
     * The [Decoder] returned by this [FFmpegVideoDecoder].
     *
     * NOTE(review): [maxWidth], [maxHeight] and [framerate] are accepted but not read anywhere in this class.
     */
    private class Instance(
        override val input: Enumerator,
        private val context: IndexContext,
        private val video: Boolean = true,
        private val audio: Boolean = true,
        private val timeWindowMs: Long = 500L,
        private val maxWidth: Int,
        private val maxHeight: Int,
        private val framerate: Int?,
        private val name: String,
        private val ffmpegPath: Path?
    ) : Decoder {

        /** [KLogger] instance. */
        private val logger: KLogger = KotlinLogging.logger {}

        /** A fresh [FFprobe] handle; resolved relative to [ffmpegPath] if one was configured. */
        private val ffprobe: FFprobe
            get() = if (this.ffmpegPath != null) FFprobe.atPath(this.ffmpegPath) else FFprobe.atPath()

        /** A fresh [FFmpeg] handle; resolved relative to [ffmpegPath] if one was configured. */
        private val ffmpeg: FFmpeg
            get() = if (this.ffmpegPath != null) FFmpeg.atPath(this.ffmpegPath) else FFmpeg.atPath()

        /**
         * Converts this [Decoder] into a [Flow] of [Retrievable]s.
         *
         * For every incoming source of type [MediaType.VIDEO], the source is probed for metadata, decoded into
         * time-windowed segments (emitted as they become available) and finally the source [Retrievable] itself
         * is emitted. Sources of any other type are skipped.
         *
         * @param scope The [CoroutineScope] handed to the upstream [Enumerator].
         * @return [Flow] of [Retrievable]s.
         */
        override fun toFlow(scope: CoroutineScope): Flow<Retrievable> = channelFlow {
            this@Instance.input.toFlow(scope).collect { sourceRetrievable ->
                /* Extract source. */
                val source = sourceRetrievable.filteredAttribute(SourceAttribute::class.java)?.source ?: return@collect
                if (source.type != MediaType.VIDEO) {
                    logger.debug { "In flow: Skipping source ${source.name} (${source.sourceId}) because it is not of type VIDEO." }
                    return@collect
                }

                /* Probe the source; a stream opened for probing is closed again once ffprobe has finished. */
                val probe = this@Instance.ffprobe.setShowStreams(true)
                val probeResult = if (source is FileSource) {
                    probe.setInput(source.path)
                    probe.execute()
                } else {
                    source.newInputStream().use { stream ->
                        probe.setInput(stream)
                        probe.execute()
                    }
                }

                /* Extract metadata; values the probe does not report are skipped instead of failing on null. */
                val videoStreamInfo = probeResult.streams.find { it.codecType == StreamType.VIDEO }
                if (videoStreamInfo != null) {
                    videoStreamInfo.avgFrameRate?.let { source.metadata[Metadata.METADATA_KEY_VIDEO_FPS] = it.toDouble() }
                    videoStreamInfo.duration?.let { source.metadata[Metadata.METADATA_KEY_AV_DURATION] = (it * 1000f).toLong() }
                    videoStreamInfo.width?.let { source.metadata[Metadata.METADATA_KEY_IMAGE_WIDTH] = it }
                    videoStreamInfo.height?.let { source.metadata[Metadata.METADATA_KEY_IMAGE_HEIGHT] = it }
                }

                val audioStreamInfo = probeResult.streams.find { it.codecType == StreamType.AUDIO }
                if (audioStreamInfo != null) {
                    audioStreamInfo.channels?.let { source.metadata[Metadata.METADATA_KEY_AUDIO_CHANNELS] = it }
                    audioStreamInfo.sampleRate?.let { source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLERATE] = it }
                    audioStreamInfo.sampleFmt?.let { source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLESIZE] = it }
                }

                /* Create consumer. */
                val consumer = InFlowFrameConsumer(this@channelFlow, sourceRetrievable)

                /* Execute. */
                try {
                    source.newInputStream().use { stream ->
                        var output = FrameOutput.withConsumerAlpha(consumer).disableStream(StreamType.SUBTITLE).disableStream(StreamType.DATA)
                        if (!this@Instance.video) {
                            output = output.disableStream(StreamType.VIDEO)
                        }
                        if (!this@Instance.audio) {
                            output = output.disableStream(StreamType.AUDIO)
                        }
                        this@Instance.ffmpeg.addInput(PipeInput.pumpFrom(stream)).addOutput(output).execute()
                    }

                    /* Emit everything that is still buffered. */
                    consumer.flush()

                    /* Emit source retrievable. */
                    send(sourceRetrievable)
                } catch (e: java.util.concurrent.CancellationException) {
                    /* Cancellation must propagate; it is not a decoding error. */
                    throw e
                } catch (e: Throwable) {
                    logger.error(e) { "Error while decoding source ${source.name} (${source.sourceId})." }
                }
            }
        }.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND)


        /**
         * A [FrameConsumer] that emits [Retrievable]s to the downstream [channel].
         *
         * Frames are buffered and emitted as one segment per time window of [timeWindowMs] milliseconds.
         */
        private inner class InFlowFrameConsumer(private val channel: ProducerScope<Retrievable>, val source: Retrievable) : FrameConsumer {

            /** The video [Stream] processed by this [InFlowFrameConsumer]. */
            var videoStream: Stream? = null
                private set

            /** The audio [Stream] processed by this [InFlowFrameConsumer]. */
            var audioStream: Stream? = null
                private set

            /** Length of a single time window in microseconds. */
            private val windowMicros: Long = TimeUnit.MILLISECONDS.toMicros(this@Instance.timeWindowMs)

            /** The end of the current time window in microseconds. */
            var windowEnd: Long = this.windowMicros
                private set

            /** Flag indicating, that video is ready to be emitted. */
            private var videoReady = false

            /** Flag indicating, that audio is ready to be emitted. */
            private var audioReady = false

            /** Buffer of grabbed [BufferedImage]s and their timestamp in microseconds. */
            private val imageBuffer = LinkedList<Pair<BufferedImage, Long>>()

            /** Buffer of grabbed [ShortBuffer]s and their timestamp in microseconds. */
            private val audioBuffer = LinkedList<Pair<ShortBuffer, Long>>()

            /**
             * Returns true if both the image and audio buffer are empty.
             */
            fun isEmpty(): Boolean = this.imageBuffer.isEmpty() && this.audioBuffer.isEmpty()

            /**
             * Initializes this [InFlowFrameConsumer].
             *
             * @param streams List of [Stream]s to initialize the [InFlowFrameConsumer] with.
             */
            override fun consumeStreams(streams: MutableList<Stream>) {
                this.videoStream = streams.firstOrNull { it.type == Stream.Type.VIDEO }
                this.audioStream = streams.firstOrNull { it.type == Stream.Type.AUDIO }

                /* A stream that is absent or disabled must not block the first window from being emitted. */
                resetReadyFlags()
            }

            /**
             * Consumes a single [Frame].
             *
             * @param frame [Frame] to consume.
             */
            override fun consume(frame: Frame?) {
                if (frame == null) return
                val stream = when (frame.streamId) {
                    this.videoStream?.id -> this.videoStream
                    this.audioStream?.id -> this.audioStream
                    else -> null
                } ?: return

                /* Convert the presentation timestamp from stream timebase units to microseconds. */
                val timestamp = ((1000000 * frame.pts) / stream.timebase)
                when (stream.type) {
                    Stream.Type.VIDEO -> {
                        frame.image?.let { this.imageBuffer.add(it to timestamp) }
                        if (timestamp >= this.windowEnd) {
                            this.videoReady = true
                        }
                    }
                    Stream.Type.AUDIO -> {
                        frame.samples?.let { raw ->
                            val samples = ShortBuffer.wrap(ShortArray(raw.size) { i -> raw[i].toShort() })
                            this.audioBuffer.add(samples to timestamp)
                        }
                        if (timestamp >= this.windowEnd) {
                            this.audioReady = true
                        }
                    }
                    else -> {}
                }

                /* If enough frames have been collected, emit them. */
                if (this.videoReady && this.audioReady) {
                    /* This callback is not a suspend function, hence the blocking bridge to the channel. */
                    runBlocking { emit() }

                    /* Reset flags and advance the window. */
                    resetReadyFlags()
                    this.windowEnd += this.windowMicros
                }
            }

            /**
             * Emits everything that is still buffered, one segment per time window, until both buffers are empty.
             *
             * Windows that contain no buffered frame are skipped.
             */
            suspend fun flush() {
                while (!isEmpty()) {
                    if (hasDueFrames()) {
                        emit()
                    }
                    if (this.windowMicros <= 0L) break /* A non-positive window cannot advance; do not spin. */
                    this.windowEnd += this.windowMicros
                }
            }

            /**
             * Emits a single [Retrievable] with all buffered content up to [windowEnd] to the downstream [channel].
             */
            suspend fun emit() {
                /* Drain buffers. */
                val emitImage = drain(this.imageBuffer)
                val emitAudio = drain(this.audioBuffer)
                val audioSize = emitAudio.sumOf { it.limit() }

                /* Prepare ingested with relationship to source. */
                val ingested = Ingested(UUID.randomUUID(), "SEGMENT", false)
                this.source.filteredAttribute(SourceAttribute::class.java)?.let { ingested.addAttribute(it) }
                ingested.addRelationship(Relationship.ByRef(ingested, "partOf", this.source, false))
                ingested.addAttribute(
                    TimeRangeAttribute(
                        this.windowEnd - this.windowMicros,
                        this.windowEnd,
                        TimeUnit.MICROSECONDS
                    )
                )

                /* Prepare and append audio content element. */
                if (emitAudio.isNotEmpty()) {
                    val stream = checkNotNull(this.audioStream) { "Audio samples were buffered without an audio stream." }
                    val samples = ShortBuffer.allocate(audioSize)
                    for (chunk in emitAudio) {
                        chunk.clear()
                        samples.put(chunk)
                    }
                    samples.clear()
                    val audio = this@Instance.context.contentFactory.newAudioContent(
                        stream.channels.toShort(),
                        stream.sampleRate.toInt(),
                        samples
                    )
                    ingested.addContent(audio)
                    ingested.addAttribute(ContentAuthorAttribute(audio.id, name))
                }

                /* Prepare and append image content element. */
                for (image in emitImage) {
                    val imageContent = this@Instance.context.contentFactory.newImageContent(image)
                    ingested.addContent(imageContent)
                    ingested.addAttribute(ContentAuthorAttribute(imageContent.id, name))
                }

                logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images and ${emitAudio.size} audio samples: ${ingested.id}" }

                /* Emit ingested. */
                this.channel.send(ingested)
            }

            /** Marks a stream as ready if it is absent or disabled, and as not ready otherwise. */
            private fun resetReadyFlags() {
                this.videoReady = !(this.videoStream != null && this@Instance.video)
                this.audioReady = !(this.audioStream != null && this@Instance.audio)
            }

            /** Returns true if at least one buffered entry falls into the current window. */
            private fun hasDueFrames(): Boolean =
                this.imageBuffer.any { it.second <= this.windowEnd } || this.audioBuffer.any { it.second <= this.windowEnd }

            /** Removes all entries with a timestamp up to [windowEnd] from [buffer] and returns them in order. */
            private fun <T> drain(buffer: LinkedList<Pair<T, Long>>): List<T> {
                val due = mutableListOf<T>()
                buffer.removeIf { entry ->
                    (entry.second <= this.windowEnd).also { isDue -> if (isDue) due.add(entry.first) }
                }
                return due
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,38 +58,47 @@ class ThumbnailExporter : ExporterFactory {
require(mimeType in SUPPORTED) { "ThumbnailExporter only support image formats JPEG and PNG." }
}

/** [KLogger] instance. */
private val logger: KLogger = KotlinLogging.logger {}

/**
 * Passes every [Retrievable] of the input [Flow] through unchanged and, as a side effect, writes thumbnails for its
 * [ImageContent] to the resolvable obtained from the context's resolver.
 *
 * Only content whose id is listed for the configured content sources is considered; if no content sources are
 * configured, all [ImageContent] is used. Each image is scaled along its longer side to the maximum resolution.
 * Any [Exception] raised while creating a thumbnail is logged and does not interrupt the [Flow].
 *
 * @param scope The [CoroutineScope] handed to the upstream operator.
 * @return [Flow] of the unmodified input [Retrievable]s.
 */
override fun toFlow(scope: CoroutineScope): Flow<Retrievable> = this.input.toFlow(scope).onEach { retrievable ->
    try {
        val resolvable = this.context.resolver.resolve(retrievable.id)

        val contentIds = this.contentSources?.let {
            retrievable.filteredAttribute(ContentAuthorAttribute::class.java)?.getContentIds(it)
        }

        val content =
            retrievable.content.filterIsInstance<ImageContent>().filter { contentIds?.contains(it.id) ?: true }
        if (resolvable != null && content.isNotEmpty()) {
            val writer = when (mimeType) {
                MimeType.JPEG,
                MimeType.JPG -> JpegWriter()

                MimeType.PNG -> PngWriter()
                else -> throw IllegalArgumentException("Unsupported mime type $mimeType")
            }

            logger.debug { "Generating thumbnail(s) for ${retrievable.id} with ${retrievable.type} and resolution $maxResolution. Storing it with ${resolvable::class.simpleName}." }

            content.forEach { cnt ->
                /* Scale along the longer side. */
                val imgBytes = ImmutableImage.fromAwt(cnt.content).let {
                    if (it.width > it.height) {
                        it.scaleToWidth(maxResolution)
                    } else {
                        it.scaleToHeight(maxResolution)
                    }
                }.bytes(writer)

                resolvable.openOutputStream().use {
                    it.write(imgBytes)
                }
            }
        }
    } catch (e: Exception) {
        logger.error(e){"Error during thumbnail creation"}
    }
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
org.vitrivr.engine.index.decode.VideoDecoder
org.vitrivr.engine.index.decode.FFmpegVideoDecoder
org.vitrivr.engine.index.decode.ImageDecoder
Loading