Skip to content

Commit

Permalink
OutputQueue for output
Browse files Browse the repository at this point in the history
Output side counterpart to the StreamBuffer. It is not exactly the
same solution, because input and output characteristics are quite
different (for one, on the input side stream is just a series of
bytes, whereas on the output side we are able to tell packets
apart, assign timestamps, packet types, etc).

There is simple Golang code for writing the data into the files,
so that some tests will pass with this code. Note that the big
problem with the queue output is that some muxers (for example
mp4) need to be able to seek() in output, and we don't allow that.
  • Loading branch information
Michal Adamczak committed Aug 9, 2022
1 parent 8a2f6e7 commit c74c5c6
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 9 deletions.
47 changes: 42 additions & 5 deletions ffmpeg/encoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,16 @@ static int open_video_encoder(struct input_ctx *ictx, struct output_ctx *octx,
static void free_output_no_trailer(struct output_ctx *octx, enum FreeOutputPolicy policy)
{
if (octx->oc) {
if (!(octx->oc->oformat->flags & AVFMT_NOFILE) && octx->oc->pb) {
// we check against AVFMT_FLAG_CUSTOM_IO to avoid trying to close file
// in case we are using custom i/o - this would cause crash
if (!(octx->oc->oformat->flags & AVFMT_NOFILE) &&
!(octx->oc->flags & AVFMT_FLAG_CUSTOM_IO) && octx->oc->pb) {
avio_closep(&octx->oc->pb);
}
avformat_free_context(octx->oc);
octx->oc = NULL;
}
queue_push_staging(&octx->write_context, END_OF_OUTPUT, -1);
if (octx->vc &&
((octx->hw_type == AV_HWDEVICE_TYPE_NONE) || (FORCE_CLOSE_HW_ENCODER == policy))) {
avcodec_free_context(&octx->vc);
Expand Down Expand Up @@ -295,7 +299,7 @@ void free_output(struct output_ctx *octx, enum FreeOutputPolicy policy)
free_output_no_trailer(octx, policy);
}

int open_output(struct output_ctx *octx, struct input_ctx *ictx)
int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue)
{
int ret = 0;

Expand Down Expand Up @@ -351,8 +355,27 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)

// Muxer headers can be written now once streams were added
if (!(fmt->flags & AVFMT_NOFILE)) {
ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);
if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file");
if (queue) {
// output through queue
ret = queue_setup_as_output(queue, &octx->write_context, octx->oc);
if (ret < 0) LPMS_ERR(open_output_err, "Error setting up output queue");
// make sure muxer options are compatible with queue output
// TODO: not sure if that is the best option for detecting a container
// type but it is surprisingly hard to find guidance on that
if (fmt->mime_type && !strcmp("video/mp4", fmt->mime_type)) {
// Default configuration of MP4 muxer needs seekable output, which
// the queue is not able to provide. Passing the following flags removes
// seekable requirement. This is also configuration recommended for
// streaming purposes, so it seems better suited anyway (the whole point
// with queues is to provide Low Latency/streaming support)
ret = av_dict_set(&octx->muxer->opts, "movflags", "frag_keyframe+empty_moov", 0);
if (ret < 0) LPMS_ERR(open_output_err, "Error setting movflags for fragmented output");
}
} else {
// normal file output
ret = avio_open(&octx->oc->pb, octx->fname, AVIO_FLAG_WRITE);
if (ret < 0) LPMS_ERR(open_output_err, "Error opening output file");
}
}

// IMPORTANT: notice how up to and including this point open_output_err is
Expand All @@ -364,6 +387,10 @@ int open_output(struct output_ctx *octx, struct input_ctx *ictx)
// call free_output_no_trailer() exclusively!
ret = avformat_write_header(octx->oc, &octx->muxer->opts);
if (ret < 0) LPMS_ERR(open_output_err, "Error writing header");
// flush headers
// ret = av_interleaved_write_frame(octx->oc, NULL);
if (ret < 0) LPMS_ERR(open_output_err, "Error flushing headers");
queue_push_staging(&octx->write_context, BEGIN_OF_OUTPUT, 0);

// From now on it is normal free_output(), hence after_header error label
if(octx->sfilters != NULL && needs_decoder(octx->video->name) && octx->sf.active == 0) {
Expand Down Expand Up @@ -431,6 +458,8 @@ static int encode(AVCodecContext* encoder, AVFrame *frame, struct output_ctx* oc

int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost)
{
int ret;
int64_t pts = pkt->pts;
pkt->stream_index = ost->index;
if (av_cmp_q(tb, ost->time_base)) {
av_packet_rescale_ts(pkt, tb, ost->time_base);
Expand Down Expand Up @@ -481,7 +510,15 @@ int mux(AVPacket *pkt, AVRational tb, struct output_ctx *octx, AVStream *ost)
octx->last_video_dts = pkt->dts;
}

return av_interleaved_write_frame(octx->oc, pkt);
// make sure correct timestamp will get carried through to output_queue
ret = av_interleaved_write_frame(octx->oc, pkt);
if (0 > ret) return ret;
// this means "flush output", we want to do it so that output_queue will get
// properly associated packets and timestamps
ret = av_interleaved_write_frame(octx->oc, NULL);
if (0 > ret) return ret;
queue_push_staging(&octx->write_context, PACKET_OUTPUT, pts);
return 0;
}

static int getmetadatainf(AVFrame *inf, struct output_ctx *octx)
Expand Down
3 changes: 2 additions & 1 deletion ffmpeg/encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
#include "decoder.h"
#include "transcoder.h"
#include "filter.h"
#include "output_queue.h"

enum FreeOutputPolicy {
FORCE_CLOSE_HW_ENCODER,
PRESERVE_HW_ENCODER
};

int open_output(struct output_ctx *octx, struct input_ctx *ictx);
int open_output(struct output_ctx *octx, struct input_ctx *ictx, OutputQueue *queue);
void free_output(struct output_ctx *octx, enum FreeOutputPolicy);
int process_out(struct input_ctx *ictx, struct output_ctx *octx, AVCodecContext *encoder, AVStream *ost,
struct filter_ctx *filter, AVFrame *inf);
Expand Down
51 changes: 51 additions & 0 deletions ffmpeg/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,47 @@ func loadInputBuffer(t *Transcoder, input *TranscodeOptionsIn) {
}
}

func storeOutputQueue(t *Transcoder, outputs []TranscodeOptions) (error) {
var fds = make([]*os.File, len(outputs))
for i := range fds {
file, err := os.Create(outputs[i].Oname)
if nil != err {
return err
}
fds[i] = file
}
for {
// get next output packet
packet := C.lpms_transcode_peek_packet(t.handle)
if 8 == packet.flags {
break
}
// this is a data packet, write it to file
data := C.GoBytes(unsafe.Pointer(packet.data), packet.size)
bytes, err := fds[packet.index].Write(data)
if nil != err {
return err
}
if bytes != int(packet.size) {
panic("storeOutputQueue couldn't write all bytes error")
}
// pop data packet
C.lpms_transcode_pop_packet(t.handle)
}
// if we are here, we just have terminating packet, remove it
// (terminating packet carries no data, it is added there to signify
// the end of all input)
C.lpms_transcode_pop_packet(t.handle)
// Close all the open files
for i := range fds {
if nil != fds[i] {
fds[i].Close()
}
}
// Success
return nil
}


// create C output params array and return it along with corresponding finalizer
// function that makes sure there are no C memory leaks
Expand Down Expand Up @@ -961,6 +1002,16 @@ func (t *Transcoder) Transcode(input *TranscodeOptionsIn, ps []TranscodeOptions)
}
loadInputBuffer(t, input)
ret := int(C.lpms_transcode(inp, paramsPointer, resultsPointer, C.int(len(params)), decoded))
// be careful to use storeOutputQueue to fake lpms_transcode so that the test
// reacts properly
if (ret == 0) {
err = storeOutputQueue(t, ps)
if nil != err {
// fake output error
ret = C.AVERROR_STREAM_NOT_FOUND
}
}

if ret != 0 {
if LogTranscodeErrors {
glog.Error("Transcoder Return : ", ErrorMap[ret])
Expand Down
4 changes: 4 additions & 0 deletions ffmpeg/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <libavfilter/avfilter.h>
#include "decoder.h"
#include "output_queue.h"

struct filter_ctx {
int active;
Expand Down Expand Up @@ -36,6 +37,7 @@ struct filter_ctx {
int flushing;
};

// TODO move this away, this ain't filter
struct output_ctx {
char *fname; // required output file name
char *vfilters; // required output video filters
Expand Down Expand Up @@ -74,6 +76,8 @@ struct output_ctx {

output_results *res; // data to return for this output
char *xcoderParams;

WriteContext write_context;
};

int init_video_filters(struct input_ctx *ictx, struct output_ctx *octx);
Expand Down
158 changes: 158 additions & 0 deletions ffmpeg/output_queue.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#include "output_queue.h"

void queue_create(OutputQueue *queue)
{
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->condition, NULL);
queue->front = queue->back = NULL;
}

void queue_destroy(OutputQueue *queue)
{
pthread_mutex_destroy(&queue->mutex);
pthread_cond_destroy(&queue->condition);
queue_reset(queue);
}

static int queue_write_function(void *user_data, uint8_t *buf, int buf_size)
{
WriteContext *wctx = (WriteContext *)user_data;
// Prepare packet
OutputPacket *packet = (OutputPacket *)malloc(sizeof(OutputPacket));
if (!packet) return -1;
packet->data = (uint8_t *)malloc(buf_size);
if (!packet->data) {
free(packet);
return -1;
}
memcpy(packet->data, buf, buf_size);
packet->size = buf_size;
packet->index = wctx->index;
packet->next = NULL;
// Important - we are not adding to the queue now. This is because we don't
// know which flags to assign yet - for example, we can't assign END_OF_OUTPUT
// because we don't know if we are at last packet or not. Instead, packets are
// added to the staging area and queue_push_staging will be used to add them
// to the queue after muxing operation finishes.
if (wctx->staging_back) {
// not the first packet
wctx->staging_back->next = packet;
wctx->staging_back = packet;
} else {
// first packet
wctx->staging_front = wctx->staging_back = packet;
}
return buf_size;
}

int queue_setup_as_output(OutputQueue *queue, WriteContext *wctx, AVFormatContext *ctx)
{
wctx->queue = queue;
wctx->staging_front = wctx->staging_back = NULL;
// IMPORTANT: I am not sure if ffmpeg documentation states that explicitly,
// but the memory of ctx->pb as well as its io_buffer seem to be released when
// ctx will get closed. I tried otherwise and got "double free" errors
#define BUFFER_SIZE 4096
void *io_buffer = av_malloc(BUFFER_SIZE);
if (!io_buffer) return -1;
ctx->pb = avio_alloc_context(
io_buffer, BUFFER_SIZE, // buffer and size
1, // write allowed
wctx, // pass write context as user data
NULL, // no read function supplied
queue_write_function,
NULL); // no seek function supplied
if (!ctx->pb) return -1;
ctx->flags |= AVFMT_FLAG_CUSTOM_IO | AVFMT_FLAG_FLUSH_PACKETS;
return 0;
}

void queue_reset(OutputQueue *queue)
{
while (queue->front) {
OutputPacket *tmp = queue->front;
queue->front = queue->front->next;
if (tmp->data) free(tmp->data);
free(tmp);
}
queue->back = NULL;
}

const OutputPacket *queue_peek_front(OutputQueue *queue)
{
OutputPacket *tmp;
pthread_mutex_lock(&queue->mutex);
while (!queue->front) {
// wait until there is packet in the buffer
pthread_cond_wait(&queue->condition, &queue->mutex);
}
tmp = queue->front;
pthread_mutex_unlock(&queue->mutex);
return tmp;
}

void queue_pop_front(OutputQueue *queue)
{
OutputPacket *tmp;
pthread_mutex_lock(&queue->mutex);
while (!queue->front) {
// wait until there is packet in the buffer
pthread_cond_wait(&queue->condition, &queue->mutex);
}
tmp = queue->front;
queue->front = queue->front->next;
if (!queue->front) queue->back = NULL;
pthread_mutex_unlock(&queue->mutex);
if (tmp->data) free(tmp->data);
free(tmp);
}

void queue_push_staging(WriteContext *wctx, PacketFlags flags, int64_t timestamp)
{
// iterate over staging area setting flags and timestamps
OutputPacket *packet = wctx->staging_front;
// Make sure that END_OF_OUTPUT only gets assigned to the last packet
// this is because the caller knows all packets are emitted, but it
// doesn't know how many of them
PacketFlags safe_flags = flags & ~END_OF_OUTPUT;
if (!packet) return; // nothing to do
while (packet) {
packet->flags = packet->next ? safe_flags : flags;
packet->timestamp = timestamp;
packet = packet->next;
}
// move staging area into queue
pthread_mutex_lock(&wctx->queue->mutex);
if (wctx->queue->back) {
// not empty queue
wctx->queue->back->next = wctx->staging_front;
wctx->queue->back = wctx->staging_back;
} else {
// empty queue
wctx->queue->front = wctx->staging_front;
wctx->queue->back = wctx->staging_back;
}
wctx->staging_front = wctx->staging_back = NULL;
pthread_mutex_unlock(&wctx->queue->mutex);
pthread_cond_signal(&wctx->queue->condition);
}

int queue_push_end(OutputQueue *queue)
{
OutputPacket *packet = (OutputPacket *)malloc(sizeof(OutputPacket));
if (!packet) return -1;
packet->size = 0;
packet->data = NULL;
packet->timestamp = -1;
packet->flags = END_OF_ALL_OUTPUTS;
pthread_mutex_lock(&queue->mutex);
if (queue->back) {
queue->back->next = packet;
queue->back = packet;
} else {
queue->front = queue->back = packet;
}
pthread_mutex_unlock(&queue->mutex);
pthread_cond_signal(&queue->condition);
return 0;
}
Loading

0 comments on commit c74c5c6

Please sign in to comment.