Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

627 support for streaming via stdio #50

Merged
merged 26 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
inputs: [
"{lib,test,config}/**/*.{ex,exs}",
"{lib,test,config,examples}/**/*.{ex,exs}",
".formatter.exs",
"*.exs"
],
Expand Down
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import Config

config :logger, backends: []
varsill marked this conversation as resolved.
Show resolved Hide resolved

if config_env() == :test do
config :membrane_file_plugin, :file_impl, Membrane.File.CommonMock
else
Expand Down
40 changes: 40 additions & 0 deletions examples/file_to_pipe.exs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires to be run within a mix project. Let's add Mix.install so it can be run with elixir file_to_pipe.exs

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# This script reads the file whose name is specified as the first argument, and outputs its contents to stdout, basically like `cat`.
# e.g. running `elixir examples/file_to_pipe.exs examples/file_to_pipe.exs` should output the contents of this file.
#
# if Mix pollutes the logs, consider redirecting its logs by overriding the
# [Mix.Shell](https://hexdocs.pm/mix/Mix.Shell.html) behaviour

Mix.start()
Mix.shell(Mix.Shell.Quiet)

Mix.install([{:membrane_file_plugin, path: "."}])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be path: "../"? I believe we are in the examples/ subdirectory


defmodule FileExamplePipeline do
use Membrane.Pipeline

@impl true
def handle_init(_ctx, %{target: target, input: input}) do
spec =
child(%Membrane.File.Source{location: input})
|> child(:sink, %Membrane.File.Sink{location: :stdout})

{[spec: spec], %{target: target}}
end

@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end
end

Membrane.File.Sink.redirect_logs_to_stderr()

[input] = System.argv()

{:ok, _supervisor, pid} =
Membrane.Pipeline.start_link(FileExamplePipeline, %{input: input, target: self()})

receive do
:done -> Membrane.Pipeline.terminate(pid)
end
44 changes: 44 additions & 0 deletions examples/pipe_to_file.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# This script reads from stdin until EOS, writing the contents to a file specified by the first argument
# The second argument is chunk size, specifying how many bytes to consume from stdin at once.
# The script can be run like this: `echo hello | elixir examples/pipe_to_file.exs /tmp/test 2048`, resulting in 'hello' being written to /tmp/test
#
# if Mix pollutes the logs, consider redirecting its logs by overriding the
# [Mix.Shell](https://hexdocs.pm/mix/Mix.Shell.html) behaviour

Mix.start()
Mix.shell(Mix.Shell.Quiet)

Mix.install([{:membrane_file_plugin, path: "."}])

defmodule PipeToFile do
use Membrane.Pipeline

@impl true
def handle_init(_ctx, %{target: target, output: output, chunk_size: chunk_size}) do
spec =
child(%Membrane.File.Source{location: :stdin, chunk_size: chunk_size})
|> child(:sink, %Membrane.File.Sink{location: output})

{[spec: spec], %{target: target}}
end

@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end
end

[output_file, chunk_size_str] = System.argv()
{chunk_size, ""} = Integer.parse(chunk_size_str)

{:ok, _supervisor, pid} =
Membrane.Pipeline.start_link(PipeToFile, %{
target: self(),
output: output_file,
chunk_size: chunk_size
})

receive do
:done -> Membrane.Pipeline.terminate(pid)
end
17 changes: 10 additions & 7 deletions examples/sink_and_source.exs
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
Mix.install([
{:membrane_core, "~> 0.11"},
{:membrane_core, "~> 1.0"},
{:membrane_file_plugin, path: Path.expand(__DIR__ <> "/..")}
])

defmodule FileExamplePipeline do
@doc """
Example pipeline that reads its source code file and outputs it to /tmp/test.
"""

use Membrane.Pipeline

@doc false
@impl true
def handle_init(_ctx, target) do
spec = [
spec =
child(:file_src, %Membrane.File.Source{location: __ENV__.file})
|> child(:file_sink, %Membrane.File.Sink{location: "/tmp/test"})
]

{[spec: spec, playback: :playing], %{target: target}}
{[spec: spec], %{target: target}}
end

@impl true
def handle_element_end_of_stream({:file_sink, :input}, _ctx, state) do
def handle_element_end_of_stream(:file_sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end
end

{:ok, _supervisor_pid, pid} = FileExamplePipeline.start_link(self())
{:ok, _supervisor_pid, pid} = Membrane.Pipeline.start_link(FileExamplePipeline, self())

receive do
:done -> FileExamplePipeline.terminate(pid)
:done -> Membrane.Pipeline.terminate(pid)
end
25 changes: 18 additions & 7 deletions examples/sink_multi.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
Mix.install([
{:membrane_core, "~> 0.11"},
{:membrane_core, "~> 1.0"},
{:membrane_file_plugin, path: Path.expand(__DIR__ <> "/..")}
])

# Filter responsible for generating split events
defmodule Splitter do
@moduledoc """
Receives buffer and splits it into two buffers
of size `head_size` and `buffer.size - head_size`,
sending a split event to multisink in between.
"""

use Membrane.Filter

alias Membrane.Buffer
Expand All @@ -29,7 +35,7 @@ defmodule Splitter do
buffer: {:output, %Buffer{payload: tail}}
]

{ actions, %{split?: false}}
{actions, %{split?: false}}
end

def handle_buffer(:input, buffer, _ctx, %{split?: false}) do
Expand All @@ -40,6 +46,11 @@ end
:ok = File.write!("input.bin", <<0::integer-unit(8)-size(1024)>>)

defmodule SinkMultiExamplePipeline do
@moduledoc """
Example pipeline that reads a binary file
and performs a multisink split when sending it forward.
"""

use Membrane.Pipeline

@doc false
Expand All @@ -51,22 +62,22 @@ defmodule SinkMultiExamplePipeline do
|> child(:file_sink, %Membrane.File.Sink.Multi{location: "/tmp/output", extension: ".bin"})
]

{[spec: spec, playback: :playing], %{target: target}}
{[spec: spec], %{target: target}}
end

@impl true
def handle_element_end_of_stream({:file_sink, :input}, _ctx, state) do
def handle_element_end_of_stream(:file_sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end

def handle_element_end_of_stream(_other, _ctx, state) do
def handle_element_end_of_stream(_elem, _pad, _ctx, state) do
{[], state}
end
end

{:ok, _supervisor_pid, pid} = SinkMultiExamplePipeline.start_link(self())
{:ok, _supervisor_pid, pid} = Membrane.Pipeline.start_link(SinkMultiExamplePipeline, self())

receive do
:done -> SinkMultiExamplePipeline.terminate(pid)
:done -> Membrane.Pipeline.terminate(pid)
end
52 changes: 50 additions & 2 deletions lib/membrane_file/sink.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
defmodule Membrane.File.Sink do
@moduledoc """
Element that creates a file and stores incoming buffers there (in binary format).
Can also be used as a pipe to standard output by setting location to :stdout,
though this requires additional configuration.

When `Membrane.File.SeekSinkEvent` is received, the element starts writing buffers starting
from `position`. By default, it overwrites previously stored bytes. You can set `insert?`
field of the event to `true` to start inserting new buffers without overwriting previous ones.
Please note, that inserting requires rewriting the file, what negatively impacts performance.
For more information refer to `Membrane.File.SeekSinkEvent` moduledoc.

Pipeline logs are directed to standard output by default. To separate them from the sink's output
we recommend redirecting the logger to standard error. For simple use cases using the default logger
configuration (like stand-alone scripts) this can be achieved by simply calling redirect_logs_to_stderr/0.
See examples/file_to_pipe.exs for a working example.
"""
use Membrane.Sink

Expand All @@ -15,12 +22,27 @@ defmodule Membrane.File.Sink do
@common_file Membrane.File.CommonFileBehaviour.get_impl()

def_options location: [
spec: Path.t(),
description: "Path of the output file"
spec: Path.t() | :stdout,
description: "Path of the output file or :stdout"
kidq330 marked this conversation as resolved.
Show resolved Hide resolved
]

def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any

@spec redirect_logs_to_stderr() :: :ok
def redirect_logs_to_stderr() do
varsill marked this conversation as resolved.
Show resolved Hide resolved
:ok = :logger.remove_handler(:default)
LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)
end

@impl true
def handle_init(_ctx, %__MODULE__{location: :stdout}) do
{[],
%{
location: :stdout
}}
end

@impl true
def handle_init(_ctx, %__MODULE__{location: location}) do
{[],
Expand All @@ -32,6 +54,11 @@ defmodule Membrane.File.Sink do
}}
end

@impl true
def handle_setup(_ctx, %{location: :stdout} = state) do
{[], state}
end

@impl true
def handle_setup(_ctx, %{location: location} = state) do
fd = @common_file.open!(location, [:read, :write])
Expand All @@ -45,12 +72,23 @@ defmodule Membrane.File.Sink do
{[demand: :input], state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, %{location: :stdout} = state) do
:ok = @common_file.write!(:stdio, buffer)
{[demand: :input], state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, %{fd: fd} = state) do
:ok = @common_file.write!(fd, buffer)
{[demand: :input], state}
end

@impl true
def handle_event(:input, %SeekSinkEvent{}, _ctx, %{location: :stdout} = _state) do
raise "Seek event not supported for :stdout sink"
end

@impl true
def handle_event(:input, %SeekSinkEvent{insert?: insert?, position: position}, _ctx, state) do
state =
Expand All @@ -63,11 +101,21 @@ defmodule Membrane.File.Sink do

def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

@impl true
def handle_end_of_stream(:input, _ctx, %{location: :stdout} = state) do
{[], state}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) do
{[], do_merge_and_close(state)}
end

@impl true
def handle_terminate_request(_ctx, %{location: :stdout} = state) do
{[terminate: :normal], state}
end

@impl true
def handle_terminate_request(_ctx, state) do
{[terminate: :normal], do_merge_and_close(state)}
Expand Down
Loading