Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Video Decoder using external ffmpeg process #116

Merged
merged 13 commits into from
Nov 8, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.vitrivr.engine.index.decode

import com.github.kokorin.jaffree.JaffreeException
import com.github.kokorin.jaffree.StreamType
import com.github.kokorin.jaffree.ffmpeg.*
import com.github.kokorin.jaffree.ffprobe.FFprobe
Expand Down Expand Up @@ -116,27 +115,30 @@ class FFmpegVideoDecoder : DecoderFactory {

/* Create consumer. */
val consumer = InFlowFrameConsumer(this, sourceRetrievable)
val ffmpegInstance = [email protected](
if (source is FileSource) {
UrlInput.fromPath(source.path)
} else {
PipeInput.pumpFrom(source.newInputStream())
}
).addOutput(FrameOutput.withConsumerAlpha(consumer))


/* Execute. */
try {
ffmpegInstance.execute()
} catch (e: JaffreeException) {
logger.warn(e) { "Error while decoding source ${source.name} (${source.sourceId})." }
} finally {
source.newInputStream().use {
var output = FrameOutput.withConsumerAlpha(consumer).disableStream(StreamType.SUBTITLE).disableStream(StreamType.DATA)
if ([email protected]) {
output = output.disableStream(StreamType.VIDEO)
}
if ([email protected]) {
output = output.disableStream(StreamType.AUDIO)
}
[email protected](PipeInput.pumpFrom(it)).addOutput(output).execute()
ppanopticon marked this conversation as resolved.
Show resolved Hide resolved
}

/* Emit final frames. */
if (!consumer.isEmpty()) {
consumer.emit()
}

/* Emit source retrievable. */
send(sourceRetrievable)
} catch (e: Throwable) {
logger.error(e) { "Error while decoding source ${source.name} (${source.sourceId})." }
}
}
}.buffer(capacity = RENDEZVOUS, onBufferOverflow = BufferOverflow.SUSPEND)
Expand Down Expand Up @@ -191,13 +193,12 @@ class FFmpegVideoDecoder : DecoderFactory {
*
* @param frame [Frame] to consume.
*/
override fun consume(frame: Frame) = runBlocking {
val stream = if (frame.streamId == [email protected]?.id) {
[email protected]!!
} else if (frame.streamId == [email protected]?.id) {
[email protected]!!
} else {
return@runBlocking
override fun consume(frame: Frame?) = runBlocking {
if (frame == null) return@runBlocking
val stream = when (frame.streamId) {
[email protected]?.id -> [email protected]!!
[email protected]?.id -> [email protected]!!
else -> return@runBlocking
}
val timestamp = ((1000000 * frame.pts) / stream.timebase)
when (stream.type) {
Expand Down