diff --git a/test/gstreamer.tcl b/test/gstreamer.tcl new file mode 100644 index 00000000..7f22a0d5 --- /dev/null +++ b/test/gstreamer.tcl @@ -0,0 +1,24 @@ +loadVirtualPrograms [list "virtual-programs/gstreamer.folk" "virtual-programs/images.folk"] +Step + +# namespace eval Pipeline $::makePipeline +# set pl [Pipeline::create "videotestsrc"] +# Pipeline::play $pl +# set img [Pipeline::frame $pl] +# Pipeline::freeImage $img +# Pipeline::destroy $pl + +When the gstreamer pipeline "videotestsrc" frame is /frame/ at /ts/ { + Wish the web server handles route "/gst-image/$" with handler [list apply {{im} { + set filename "/tmp/web-image-frame.png" + image saveAsPng $im $filename + set fsize [file size $filename] + set fd [open $filename r] + fconfigure $fd -encoding binary -translation binary + set body [read $fd $fsize] + close $fd + dict create statusAndHeaders "HTTP/1.1 200 OK\nConnection: close\nContent-Type: image/png\nContent-Length: $fsize\n\n" body $body + }} $frame] +} + +forever { Step } diff --git a/virtual-programs/gstreamer.folk b/virtual-programs/gstreamer.folk new file mode 100644 index 00000000..7f04cc97 --- /dev/null +++ b/virtual-programs/gstreamer.folk @@ -0,0 +1,210 @@ +set makePipeline { + rename [c create] cc + + cc cflags {*}[exec pkg-config --cflags --libs gstreamer-1.0] + cc include + cc include + + proc defineGObjectType {cc type cast} { + set cc [uplevel {namespace current}]::$cc + $cc argtype $type* [format { + %s* $argname; + GObject* _$argname; + sscanf(Tcl_GetString($obj), "(%s) 0x%%p", &_$argname); + $argname = %s(_$argname); + } $type $type $cast] + + # Tcl_ObjPrintf doesn't work with %lld/%llx for some reason, + # so we do it by hand. + $cc rtype $type* [format { + $robj = Tcl_ObjPrintf("(%s) 0x%%" PRIxPTR, (uintptr_t) G_OBJECT($rvalue)); + } $type] + } + + defineImageType cc + defineGObjectType cc GstElement GST_ELEMENT + defineGObjectType cc GstBus GST_BUS + + cc struct pipeline_t { + GstElement* pipeline; + GstElement* sink; + GstBus* bus; + } + + cc struct frame_t { + bool valid; + uint64_t timestamp; + image_t image; + } + + cc code { + void log_messages(GstBus* bus) { + GstMessage* msg; + GError *err = NULL; + gchar *dbg_info = NULL; + while ((msg = gst_bus_pop_filtered(bus, GST_MESSAGE_ERROR | GST_MESSAGE_WARNING))) { + switch (GST_MESSAGE_TYPE (msg)) { + case GST_MESSAGE_ERROR: { + gst_message_parse_error(msg, &err, &dbg_info); + g_printerr("ERROR from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); + g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none"); + g_error_free(err); + g_free(dbg_info); + break; + } + case GST_MESSAGE_WARNING: { + gst_message_parse_warning(msg, &err, &dbg_info); + g_printerr("WARNING from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message); + g_printerr("Debugging info: %s\n", (dbg_info) ? dbg_info : "none"); + g_error_free(err); + g_free(dbg_info); + break; + } + default: + break; + } + } + } + } + + cc proc destroy {pipeline_t p} void { + gst_object_unref(p.bus); + gst_object_unref(p.sink); + gst_element_set_state(p.pipeline, GST_STATE_NULL); + gst_object_unref(p.pipeline); + } + + cc proc create {char* srcdec} pipeline_t { + GError* err = NULL; + gst_init(NULL, NULL); + + char buf[512]; + snprintf(buf, sizeof(buf), "%s ! videoconvert ! appsink caps=video/x-raw,format=RGBA name=output drop=true max-buffers=1", srcdec); + GstElement* pipeline = gst_parse_launch(buf, &err); + if (err) { + g_printerr("ERROR launching gst pipeline: %s\n", err->message); + FOLK_ERROR("Error launching pipeline"); + } + + pipeline_t p; + p.pipeline = pipeline; + p.bus = gst_element_get_bus(p.pipeline); + p.sink = gst_bin_get_by_name(GST_BIN(p.pipeline), "output"); + log_messages(p.bus); + + return p; + } + + cc proc play {pipeline_t p} void { + GstState state; + gst_element_set_state(p.pipeline, GST_STATE_PLAYING); + gst_element_get_state(p.pipeline, &state, NULL, GST_CLOCK_TIME_NONE); + log_messages(p.bus); + + if (state != GST_STATE_PLAYING) { + g_printerr("ERROR launching gst pipeline: pipeline failed to start\n"); + destroy(p); + FOLK_ERROR("Error starting pipeline playback"); + } + } + + if {[namespace exists ::Heap]} { + cc import ::Heap::cc folkHeapAlloc as folkHeapAlloc + cc import ::Heap::cc folkHeapFree as folkHeapFree + } else { + cc code { + #define folkHeapAlloc malloc + #define folkHeapFree free + } + } + cc proc frame {pipeline_t p} frame_t { + frame_t frame; + + GstSample* sample; + g_signal_emit_by_name(p.sink, "pull-sample", &sample); + FOLK_CHECK(sample, "pipeline playback stopped"); + + GstCaps* caps = gst_sample_get_caps(sample); + // gst_println("caps are %" GST_PTR_FORMAT, caps); + + GstStructure* s = gst_caps_get_structure(caps, 0); + FOLK_ENSURE(gst_structure_get_int(s, "width", (gint*)&frame.image.width)); + FOLK_ENSURE(gst_structure_get_int(s, "height", (gint*)&frame.image.height)); + const gchar* format = gst_structure_get_string(s, "format"); + if (g_str_equal(format, "RGB")) { + frame.image.components = 3; + } else if (g_str_equal(format, "RGBA")) { + frame.image.components = 4; + } else { + g_printerr("frame: invalid cap format '%s'\n", format); + FOLK_ERROR("invalid cap format"); + } + frame.image.bytesPerRow = frame.image.width * frame.image.components; + + GstMapInfo map; + GstBuffer* buffer = gst_sample_get_buffer(sample); + gst_buffer_map(buffer, &map, GST_MAP_READ); + + frame.image.data = folkHeapAlloc(map.size); + memmove(frame.image.data, map.data, map.size); + frame.timestamp = (uint64_t) GST_BUFFER_DTS(buffer); + + gst_buffer_unmap(buffer, &map); + gst_sample_unref(sample); + + return frame; + } + + cc proc freeImage {image_t image} void { + folkHeapFree(image.data); + } + + cc compile +} + +set ::pipelineIndex 0 +When when the gstreamer pipeline /pl/ frame is /frame/ at /ts/ /lambda/ with environment /e/ { + Start process "gstreamer-[incr ::pipelineIndex]" { + Wish $::thisProcess shares statements like \ + [list /someone/ claims the gstreamer pipeline /...anything/] + + namespace eval Pipeline $makePipeline + + try { + set pipe [Pipeline::create $pl] + Commit { Claim the gstreamer pipeline $pl is starting } + Pipeline::play $pipe + Commit { Claim the gstreamer pipeline $pl is playing with time 0 } + } on error e { + Commit { + Claim the gstreamer pipeline $pl is stopped + Claim the gstreamer pipeline $pl has error $e + } + } + + set ::oldFrames [list] + When the gstreamer pipeline $pl is playing with time /t/ &\ + $::thisProcess has step count /c/ { + try { + set frame [Pipeline::frame $pipe] + dict with frame { + Commit { + Claim the gstreamer pipeline $pl is playing with time $timestamp + Claim the gstreamer pipeline $pl frame is $image at [clock milliseconds] + } + + lappend ::oldFrames $image + if {[llength $::oldFrames] >= 10} { + set ::oldFrames [lassign $::oldFrames oldestFrame] + Pipeline::freeImage $oldestFrame + } + } + } on error e { + Commit { + Claim the gstreamer pipeline $pl is stopped + Claim the gstreamer pipeline $pl has error $e + } + } + } + } +}