Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Video Decoder using external ffmpeg process #116

Merged
merged 13 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ version_caffeine=3.1.8
version_clikt=4.2.2
version_commonsmath3=3.6.1
version_cottontaildb=0.16.7
version_jaffree=2024.08.29
version_javacv=1.5.10
version_javalin=6.3.0
version_jdbc_postgres=42.7.4
Expand Down
3 changes: 3 additions & 0 deletions vitrivr-engine-index/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ dependencies {
implementation group: 'org.bytedeco', name: 'javacv', version: version_javacv
implementation group: 'org.bytedeco', name: 'ffmpeg', version: version_ffmpeg, classifier: project.ext.javacppPlatform

/** Jaffree for external ffmpeg*/
implementation group: 'com.github.kokorin.jaffree', name: 'jaffree', version: version_jaffree

/** ScrImage (used for image resizing). */
implementation group: 'com.sksamuel.scrimage', name: 'scrimage-core', version: version_scrimage

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package org.vitrivr.engine.index.decode

import com.github.kokorin.jaffree.StreamType
import com.github.kokorin.jaffree.ffmpeg.*
import com.github.kokorin.jaffree.ffprobe.FFprobe
import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import org.bytedeco.javacv.FrameGrabber
import org.vitrivr.engine.core.context.IndexContext
import org.vitrivr.engine.core.model.relationship.Relationship
import org.vitrivr.engine.core.model.retrievable.Ingested
import org.vitrivr.engine.core.model.retrievable.Retrievable
import org.vitrivr.engine.core.model.retrievable.attributes.ContentAuthorAttribute
import org.vitrivr.engine.core.model.retrievable.attributes.SourceAttribute
import org.vitrivr.engine.core.model.retrievable.attributes.time.TimeRangeAttribute
import org.vitrivr.engine.core.operators.ingest.Decoder
import org.vitrivr.engine.core.operators.ingest.DecoderFactory
import org.vitrivr.engine.core.operators.ingest.Enumerator
import org.vitrivr.engine.core.source.MediaType
import org.vitrivr.engine.core.source.file.FileSource
import java.awt.image.BufferedImage
import java.nio.file.Path
import java.util.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import org.vitrivr.engine.core.source.Metadata

class FFmpegVideoDecoder : DecoderFactory {

override fun newDecoder(name: String, input: Enumerator, context: IndexContext): Decoder {
val maxWidth = context[name, "maxWidth"]?.toIntOrNull() ?: 3840
val maxHeight = context[name, "maxHeight"]?.toIntOrNull() ?: 2160
val framerate = context[name, "framerate"]?.toIntOrNull()
ppanopticon marked this conversation as resolved.
Show resolved Hide resolved
val timeWindowMs = context[name, "timeWindowMs"]?.toLongOrNull() ?: 500L
val ffmpegPath = context[name, "ffmpegPath"]?.let { Path.of(it) }

return Instance(input, context, timeWindowMs, maxWidth, maxHeight, framerate, name, ffmpegPath)
}

private class Instance(
override val input: Enumerator,
private val context: IndexContext,
private val timeWindowMs: Long = 500L,
private val maxWidth: Int,
private val maxHeight: Int,
private val framerate: Int?,
private val name: String,
private val ffmpegPath: Path?
) : Decoder {

/** [KLogger] instance. */
private val logger: KLogger = KotlinLogging.logger {}

private val ffprobe: FFprobe
get() = if (ffmpegPath != null) FFprobe.atPath(this.ffmpegPath) else FFprobe.atPath()

private val ffmpeg: FFmpeg
get() = if (ffmpegPath != null) FFmpeg.atPath(this.ffmpegPath) else FFmpeg.atPath()

override fun toFlow(scope: CoroutineScope): Flow<Retrievable> = channelFlow {
[email protected](scope).collect { sourceRetrievable ->
/* Extract source. */
val source = sourceRetrievable.filteredAttribute(SourceAttribute::class.java)?.source ?: return@collect
if (source.type != MediaType.VIDEO) {
logger.debug { "In flow: Skipping source ${source.name} (${source.sourceId}) because it is not of type VIDEO." }
return@collect
}

val probeResult = ffprobe.setShowStreams(true).also {
if (source is FileSource) {
it.setInput(source.path)
} else {
it.setInput(source.newInputStream())
}
}.execute()

val videoStreamInfo = probeResult.streams.find { it.codecType == StreamType.VIDEO }

if (videoStreamInfo != null) {
source.metadata[Metadata.METADATA_KEY_VIDEO_FPS] = videoStreamInfo.avgFrameRate.toDouble()
source.metadata[Metadata.METADATA_KEY_AV_DURATION] = (videoStreamInfo.duration * 1000f).toLong()
source.metadata[Metadata.METADATA_KEY_IMAGE_WIDTH] = videoStreamInfo.width
source.metadata[Metadata.METADATA_KEY_IMAGE_HEIGHT] = videoStreamInfo.height
}

val audioStreamInfo = probeResult.streams.find { it.codecType == StreamType.AUDIO }

if (audioStreamInfo != null) {
source.metadata[Metadata.METADATA_KEY_AUDIO_CHANNELS] = audioStreamInfo.channels
source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLERATE] = audioStreamInfo.sampleRate
source.metadata[Metadata.METADATA_KEY_AUDIO_SAMPLESIZE] = audioStreamInfo.sampleFmt
}

var windowEnd = TimeUnit.MILLISECONDS.toMicros([email protected])

val imageTransferBuffer = LinkedBlockingQueue<Pair<BufferedImage, Long>>(10)

val ffmpegInstance = ffmpeg.addInput(
if (source is FileSource) {
UrlInput.fromPath(source.path)
} else {
PipeInput.pumpFrom(source.newInputStream())
}
).addOutput(
FrameOutput.withConsumerAlpha(
object : FrameConsumer {

val streamMap = mutableMapOf<Int, Stream>()

override fun consumeStreams(streams: MutableList<Stream>) {
streams.forEach { stream -> streamMap[stream.id] = stream }
}

override fun consume(frame: Frame) {

val stream = streamMap[frame.streamId] ?: return

when (stream.type) {
Stream.Type.VIDEO -> {
imageTransferBuffer.put(frame.image!! to (1000000 * frame.pts) / stream.timebase)
}

Stream.Type.AUDIO -> {
//TODO
ppanopticon marked this conversation as resolved.
Show resolved Hide resolved
}

null -> {
/* ignore */
}
}
}

}
)
).setFilter(
StreamType.VIDEO,
"scale=w='min($maxWidth,iw)':h='min($maxHeight,ih)':force_original_aspect_ratio=decrease${if (framerate != null && framerate > 0) ",fps=$framerate" else ""}'"
)

//TODO audio settings

val future = ffmpegInstance.executeAsync()
ppanopticon marked this conversation as resolved.
Show resolved Hide resolved

val localImageBuffer = LinkedList<Pair<BufferedImage, Long>>()

while (!(future.isDone || future.isCancelled) || imageTransferBuffer.isNotEmpty()) {

val next = imageTransferBuffer.poll(1, TimeUnit.SECONDS) ?: continue
localImageBuffer.add(next)

if (localImageBuffer.last().second >= windowEnd) {
emit(localImageBuffer, windowEnd, sourceRetrievable, this@channelFlow)
windowEnd += TimeUnit.MILLISECONDS.toMicros([email protected])
}

}

while (localImageBuffer.isNotEmpty()) {
emit(localImageBuffer, windowEnd, sourceRetrievable, this@channelFlow)
windowEnd += TimeUnit.MILLISECONDS.toMicros([email protected])
}

send(sourceRetrievable)

}
}.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND)


/**
* Emits a single [Retrievable] to the downstream [channel].
*
* @param imageBuffer A [LinkedList] containing [BufferedImage] elements to emit (frames).
* @param grabber The [FrameGrabber] instance.
* @param timestampEnd The end timestamp.
* @param source The source [Retrievable] the emitted [Retrievable] is part of.
*/
private suspend fun emit(
imageBuffer: MutableList<Pair<BufferedImage, Long>>,
timestampEnd: Long,
source: Retrievable,
channel: ProducerScope<Retrievable>
) {

val emitImage = mutableListOf<BufferedImage>()

/* Drain buffer. */
imageBuffer.removeIf {
if (it.second <= timestampEnd) {
emitImage.add(it.first)
true
} else {
false
}
}


/* Prepare ingested with relationship to source. */
val ingested = Ingested(UUID.randomUUID(), "SEGMENT", false)
source.filteredAttribute(SourceAttribute::class.java)?.let { ingested.addAttribute(it) }
ingested.addRelationship(Relationship.ByRef(ingested, "partOf", source, false))
ingested.addAttribute(
TimeRangeAttribute(
timestampEnd - TimeUnit.MILLISECONDS.toMicros([email protected]),
timestampEnd,
TimeUnit.MICROSECONDS
)
)

/* Prepare and append image content element. */
for (image in emitImage) {
val imageContent = this.context.contentFactory.newImageContent(image)
ingested.addContent(imageContent)
ingested.addAttribute(ContentAuthorAttribute(imageContent.id, name))
}

logger.debug { "Emitting ingested ${ingested.id} with ${emitImage.size} images: ${ingested.id}" }

/* Emit ingested. */
channel.send(ingested)
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,38 +58,47 @@ class ThumbnailExporter : ExporterFactory {
require(mimeType in SUPPORTED) { "ThumbnailExporter only support image formats JPEG and PNG." }
}

/** [KLogger] instance. */
private val logger: KLogger = KotlinLogging.logger {}

override fun toFlow(scope: CoroutineScope): Flow<Retrievable> = this.input.toFlow(scope).onEach { retrievable ->
val resolvable = this.context.resolver.resolve(retrievable.id)
try {

val contentIds = this.contentSources?.let {
retrievable.filteredAttribute(ContentAuthorAttribute::class.java)?.getContentIds(it)
}
val resolvable = this.context.resolver.resolve(retrievable.id)

val content = retrievable.content.filterIsInstance<ImageContent>().filter { contentIds?.contains(it.id) ?: true }
if (resolvable != null && content.isNotEmpty()) {
val writer = when (mimeType) {
MimeType.JPEG,
MimeType.JPG -> JpegWriter()
MimeType.PNG -> PngWriter()
else -> throw IllegalArgumentException("Unsupported mime type $mimeType")
val contentIds = this.contentSources?.let {
retrievable.filteredAttribute(ContentAuthorAttribute::class.java)?.getContentIds(it)
}

logger.debug { "Generating thumbnail(s) for ${retrievable.id} with ${retrievable.type} and resolution $maxResolution. Storing it with ${resolvable::class.simpleName}." }
val content =
retrievable.content.filterIsInstance<ImageContent>().filter { contentIds?.contains(it.id) ?: true }
if (resolvable != null && content.isNotEmpty()) {
val writer = when (mimeType) {
MimeType.JPEG,
MimeType.JPG -> JpegWriter()

content.forEach { cnt ->
val imgBytes = ImmutableImage.fromAwt(cnt.content).let {
if (it.width > it.height) {
it.scaleToWidth(maxResolution)
} else {
it.scaleToHeight(maxResolution)
}
}.bytes(writer)
MimeType.PNG -> PngWriter()
else -> throw IllegalArgumentException("Unsupported mime type $mimeType")
}

resolvable.openOutputStream().use {
it.write(imgBytes)
logger.debug { "Generating thumbnail(s) for ${retrievable.id} with ${retrievable.type} and resolution $maxResolution. Storing it with ${resolvable::class.simpleName}." }

content.forEach { cnt ->
val imgBytes = ImmutableImage.fromAwt(cnt.content).let {
if (it.width > it.height) {
it.scaleToWidth(maxResolution)
} else {
it.scaleToHeight(maxResolution)
}
}.bytes(writer)

resolvable.openOutputStream().use {
it.write(imgBytes)
}
}
}
} catch (e: Exception) {
logger.error(e){"Error during thumbnail creation"}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
org.vitrivr.engine.index.decode.VideoDecoder
org.vitrivr.engine.index.decode.FFmpegVideoDecoder
org.vitrivr.engine.index.decode.ImageDecoder
1 change: 1 addition & 0 deletions vitrivr-engine-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
api project(':vitrivr-engine-module-features') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */
api project(':vitrivr-engine-module-cottontaildb') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */
api project(':vitrivr-engine-module-pgvector') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */
api project(':vitrivr-engine-module-jsonl') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */
api project(':vitrivr-engine-module-fes') /* TODO: This dependency is not necessary and only here to facilitate easy testing. */

/** Clikt & JLine */
Expand Down