Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

627 support for streaming via stdio #50

Merged
merged 26 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions examples/file_to_pipe.exs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires to be run within a mix project. Let's add Mix.install so it can be run with elixir file_to_pipe.exs

Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
import Membrane.ChildrenSpec
import Membrane.Testing.Assertions
alias Membrane.Testing.Pipeline
Mix.start()
Mix.shell(Mix.Shell.Quiet)
# if Mix pollutes the logs, consider redirecting its logs by overriding the
# [Mix.Shell](https://hexdocs.pm/mix/Mix.Shell.html) behaviour

# redirect membrane logs to stderr
# currently also requires 'configure: :logger, backends: []' line in config.exs
LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)
Mix.install([{:membrane_file_plugin, path: "."}])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it be path: "../"? I believe we are in the examples/ subdirectory


[input | _] = System.argv()
defmodule FileExamplePipeline do
use Membrane.Pipeline

spec =
child(%Membrane.File.Source{location: input})
|> child(:sink, %Membrane.File.Sink{location: :stdout})
@impl true
def handle_init(_ctx, %{target: target, input: input}) do
spec =
child(%Membrane.File.Source{location: input})
|> child(:sink, %Membrane.File.Sink{location: :stdout})

{:ok, _supervisor, pipeline} = Pipeline.start(spec: spec)
{[spec: spec], %{target: target}}
end

assert_end_of_stream(pipeline, :sink, :input)
Pipeline.terminate(pipeline)
@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end
end

# # redirect membrane logs to stderr
Membrane.File.Sink.redirect_logs()

[input] = System.argv()

{:ok, _supervisor, pid} =
Membrane.Pipeline.start_link(FileExamplePipeline, %{input: input, target: self()})

receive do
:done -> Membrane.Pipeline.terminate(pid)
end
41 changes: 30 additions & 11 deletions examples/pipe_to_file.exs
Original file line number Diff line number Diff line change
@@ -1,18 +1,37 @@
import Membrane.ChildrenSpec
import Membrane.Testing.Assertions
alias Membrane.Testing.Pipeline
Mix.start()
Mix.shell(Mix.Shell.Quiet)

LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)
Mix.install([{:membrane_file_plugin, path: "."}])

[output, chunk_size_str | _] = System.argv()
{chunk_size, ""} = Integer.parse(chunk_size_str)

spec =
child(%Membrane.File.Source{location: :stdin, chunk_size: chunk_size})
|> child(:sink, %Membrane.File.Sink{location: output})
defmodule PipeToFile do
use Membrane.Pipeline

{:ok, _supervisor, pipeline} = Pipeline.start(spec: spec)
@impl true
def handle_init(_ctx, %{target: target, output: output, chunk_size: chunk_size}) do
spec =
child(%Membrane.File.Source{location: :stdin, chunk_size: chunk_size})
|> child(:sink, %Membrane.File.Sink{location: output})

assert_end_of_stream(pipeline, :sink, :input)
Pipeline.terminate(pipeline)
{[spec: spec], %{target: target}}
end

@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end
end

{:ok, _supervisor, pid} =
Membrane.Pipeline.start_link(PipeToFile, %{
target: self(),
output: output,
chunk_size: chunk_size
})

receive do
:done -> Membrane.Pipeline.terminate(pid)
end
21 changes: 18 additions & 3 deletions lib/membrane_file/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ defmodule Membrane.File.Sink do
Can also be used as a pipe to standard output by setting location to :stdout,
though this requires additional configuration.

Pipeline logs are directed to standard output by default. To separate them from the sink's output
we reccomend redirecting the logger to standard error. See 'examples/file_to_pipe.exs'

When `Membrane.File.SeekSinkEvent` is received, the element starts writing buffers starting
from `position`. By default, it overwrites previously stored bytes. You can set `insert?`
field of the event to `true` to start inserting new buffers without overwriting previous ones.
Please note, that inserting requires rewriting the file, what negatively impacts performance.
For more information refer to `Membrane.File.SeekSinkEvent` moduledoc.

Pipeline logs are directed to standard output by default. To separate them from the sink's output
we recommend redirecting the logger to standard error. For simple use cases using the default logger
configuration (like stand-alone scripts) this can be achieved by simply calling redirect_logs/1.
See examples/file_to_pipe.exs for a working example.
"""
use Membrane.Sink

Expand All @@ -26,6 +28,19 @@ defmodule Membrane.File.Sink do

def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any

@spec redirect_logs() :: :ok
def redirect_logs() do
kidq330 marked this conversation as resolved.
Show resolved Hide resolved
{:ok, config} = :logger.get_handler_config(:default)
:ok = :logger.remove_handler(:default)

:ok =
:logger.add_handler(
:default,
:logger_std_h,
put_in(config, [:config, :type], :standard_error)
)
end

@impl true
def handle_init(_ctx, %__MODULE__{location: :stdout}) do
{[],
Expand Down
7 changes: 0 additions & 7 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,12 @@ defmodule Membrane.File.Plugin.Mixfile do
]
end

def application do
[
extra_applications: []
]
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_env), do: ["lib"]

defp deps do
[
{:membrane_core, "~> 1.0"},
{:logger_backends, "~> 1.0"},
# Testing
{:mox, "~> 1.0", only: :test},
# Development
Expand Down
21 changes: 10 additions & 11 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
%{
"bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"earmark_parser": {:hex, :earmark_parser, "1.4.37", "2ad73550e27c8946648b06905a57e4d454e4d7229c2dafa72a0348c99d8be5f7", [:mix], [], "hexpm", "6b19783f2802f039806f375610faa22da130b8edc21209d0bff47918bb48360e"},
"credo": {:hex, :credo, "1.7.3", "05bb11eaf2f2b8db370ecaa6a6bda2ec49b2acd5e0418bc106b73b07128c0436", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "35ea675a094c934c22fb1dca3696f3c31f2728ae6ef5a53b5d648c11180a4535"},
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"},
"membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
"ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"},
Expand Down
44 changes: 23 additions & 21 deletions test/integration/stdio_test.exs
kidq330 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ defmodule Membrane.File.Integration.StdioTest do
%{cmd_out: cmd_out, cmd_err: cmd_err} = _context do
{output, rc} =
System.shell(
"bash -c ' \
set -o pipefail; \
cat #{@input_text_file} | mix run #{@pipe_to_file} #{cmd_out} 2048' \
2> #{cmd_err}",
# MIX_ENV set explicitly to dev so that file_behaviour is not mocked
env: [{"MIX_QUIET", "true"}, {"MIX_ENV", "dev"}]
"""
bash -c '
set -o pipefail;
cat #{@input_text_file} | elixir #{@pipe_to_file} #{cmd_out} 2048'
2> #{cmd_err}
"""
|> String.replace("\n", "")
)

Logger.debug("output when running script:")
Expand All @@ -48,11 +49,13 @@ defmodule Membrane.File.Integration.StdioTest do
%{cmd_out: cmd_out, cmd_err: cmd_err} = _context do
{output, rc} =
System.shell(
"bash -c ' \
set -o pipefail; \
cat #{@input_text_file} | mix run #{@pipe_to_file} #{cmd_out} 5' \
2> #{cmd_err}",
env: [{"MIX_QUIET", "true"}, {"MIX_ENV", "dev"}]
"""
bash -c '
set -o pipefail;
cat #{@input_text_file} | elixir #{@pipe_to_file} #{cmd_out} 5'
2> #{cmd_err}
"""
|> String.replace("\n", "")
)

Logger.debug("output when running script:")
Expand All @@ -67,23 +70,22 @@ defmodule Membrane.File.Integration.StdioTest do
test "pipeline from file to :stdout works",
%{cmd_err: cmd_err} = _context do
assert {"0123456789", _rc = 0} ==
System.shell("mix run #{@file_to_pipe} #{@input_text_file} \
2> #{cmd_err}",
env: [{"MIX_QUIET", "true"}, {"MIX_ENV", "dev"}]
),
System.shell("elixir #{@file_to_pipe} #{@input_text_file} 2> #{cmd_err}"),
File.read!(cmd_err)
end

test "file to stdout to stdin to file works",
%{cmd_out: cmd_out, cmd_err: cmd_err} = _context do
{output, rc} =
System.shell(
"bash -c ' \
set -o pipefail; \
mix run #{@file_to_pipe} #{@input_text_file} \
| mix run #{@pipe_to_file} #{cmd_out} 2048' \
2> #{cmd_err}",
env: [{"MIX_QUIET", "true"}, {"MIX_ENV", "dev"}]
"""
bash -c '
set -o pipefail;
elixir #{@file_to_pipe} #{@input_text_file}
| elixir #{@pipe_to_file} #{cmd_out} 2048'
2> #{cmd_err}
"""
|> String.replace("\n", "")
)

Logger.debug("output when running script:")
Expand Down