diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 505e65e..2cd1a20 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -12,8 +12,6 @@ jobs: strategy: matrix: include: - - elixir: 1.10.x - otp: 22.x - elixir: 1.12.x otp: 23.x - elixir: 1.14.x diff --git a/README.md b/README.md index 9fc135b..2862892 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,45 @@ [![Hex.pm](https://img.shields.io/hexpm/v/exile.svg)](https://hex.pm/packages/exile) [![docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/exile/) -Exile is an alternative to [ports](https://hexdocs.pm/elixir/Port.html) for running external programs. It provides back-pressure, non-blocking io, and tries to fix ports issues. - -Exile is built around the idea of having demand-driven, asynchronous -interaction with external process. Think of streaming a video through -`ffmpeg` to serve a web request. Exile internally uses NIF. See -[Rationale](#rationale) for details. It also provides stream -abstraction for interacting with an external program. For example, -getting audio out of a stream is as simple as +Exile is an alternative to +[ports](https://hexdocs.pm/elixir/Port.html) for running external +programs. It let you stream input and output to external program with +back-pressure, non-blocking IO. Also, it fixes other port related +issues such as selectively closing stdin. + +### Port IO Issue + +With [Port](https://hexdocs.pm/elixir/Port.html) if you run external +program which generates lot of output to stdout. Something like +streaming video using `ffmpeg` to serve a web request. If you try +this with port then quickly going to run out of memory. Because port +IO is not not demand driven. It consumes output from stdout as soon as +it is available and `send` it to process mailbox. And as you know beam +process mailbox is unbounded, so output sits there waiting to be `receive`d. + +#### Lets take an example. + +Memory consumption with Port + +```elixir +Port.open({:spawn_executable, "/bin/cat"}, [{:args, ["/dev/random"]}, {:line, 10}, :binary, :use_stdio]) +``` + +![Port memory consumption](./images/port.png) + +#### Memory consumption with Exile + +```elixir +Exile.stream!(~w(cat /dev/random)) +|> Enum.each(fn data -> + IO.puts(IO.iodata_length(data)) +end) +``` + +![Exile memory consumption](./images/exile.png) + +Exile achieves this by implementing demand-driven, asynchronous IO mechanism with external process using NIF. +See [Rationale](#rationale) for details. For example, getting audio out of a stream is as simple as ``` elixir Exile.stream!(~w(ffmpeg -i pipe:0 -f mp3 pipe:1), input: File.stream!("music_video.mkv", [], 65_535)) diff --git a/c_src/exile.c b/c_src/exile.c index 5e1645c..fd24956 100644 --- a/c_src/exile.c +++ b/c_src/exile.c @@ -196,7 +196,7 @@ static ERL_NIF_TERM nif_create_fd(ErlNifEnv *env, int argc, return make_ok(env, term); - error_exit: +error_exit: enif_release_resource(fd); return ATOM_ERROR; } diff --git a/c_src/spawner.c b/c_src/spawner.c index 0c3935d..6bb8e3e 100644 --- a/c_src/spawner.c +++ b/c_src/spawner.c @@ -147,7 +147,6 @@ static int exec_process(char const *bin, char *const *args, int socket, } if (strcmp(stderr_str, "consume") == 0) { - debug("== %d", strcmp(stderr_str, "consume")); close(STDERR_FILENO); close(r_cmderr); if (dup2(w_cmderr, STDERR_FILENO) < 0) { diff --git a/images/exile.png b/images/exile.png new file mode 100644 index 0000000..4d0cc4a Binary files /dev/null and b/images/exile.png differ diff --git a/images/port.png b/images/port.png new file mode 100644 index 0000000..dd2e852 Binary files /dev/null and b/images/port.png differ diff --git a/lib/exile/process/exec.ex b/lib/exile/process/exec.ex index d5f0ceb..42ec470 100644 --- a/lib/exile/process/exec.ex +++ b/lib/exile/process/exec.ex @@ -3,6 +3,7 @@ defmodule Exile.Process.Exec do alias Exile.Process.Nif alias Exile.Process.Pipe + alias Exile.Process.State @type args :: %{ cmd_with_args: [String.t()], @@ -10,20 +11,14 @@ defmodule Exile.Process.Exec do env: [{String.t(), String.t()}] } - @spec start(args, boolean()) :: %{ + @spec start(args, State.stderr_mode()) :: %{ port: port, stdin: non_neg_integer(), stdout: non_neg_integer(), stderr: non_neg_integer() } - def start( - %{ - cmd_with_args: cmd_with_args, - cd: cd, - env: env - }, - stderr - ) do + def start(args, stderr) do + %{cmd_with_args: cmd_with_args, cd: cd, env: env} = args socket_path = socket_path() {:ok, sock} = :socket.open(:local, :stream, :default) @@ -78,8 +73,8 @@ defmodule Exile.Process.Exec do @socket_timeout 2000 - @spec receive_fds(:socket.socket(), boolean) :: {Pipe.fd(), Pipe.fd(), Pipe.fd()} - defp receive_fds(lsock, stderr) do + @spec receive_fds(:socket.socket(), State.stderr_mode()) :: {Pipe.fd(), Pipe.fd(), Pipe.fd()} + defp receive_fds(lsock, stderr_mode) do {:ok, sock} = :socket.accept(lsock, @socket_timeout) try do @@ -91,12 +86,16 @@ defmodule Exile.Process.Exec do # FDs are managed by the NIF resource life-cycle {:ok, stdout} = Nif.nif_create_fd(stdout_fd) {:ok, stdin} = Nif.nif_create_fd(stdin_fd) + {:ok, stderr} = Nif.nif_create_fd(stderr_fd) - {:ok, stderr} = - if stderr == :consume do - Nif.nif_create_fd(stderr_fd) + stderr = + if stderr_mode == :consume do + stderr else - {:ok, nil} + # we have to explicitly close FD passed over socket. + # Since it will be tracked by the OS and kept open until we close. + Nif.nif_close(stderr) + nil end {stdin, stdout, stderr} diff --git a/lib/exile/process/state.ex b/lib/exile/process/state.ex index 678c07f..60f637f 100644 --- a/lib/exile/process/state.ex +++ b/lib/exile/process/state.ex @@ -9,6 +9,8 @@ defmodule Exile.Process.State do @type read_mode :: :stdout | :stderr | :stdout_or_stderr + @type stderr_mode :: :console | :disable | :consume + @type pipes :: %{ stdin: Pipe.t(), stdout: Pipe.t(), @@ -27,7 +29,7 @@ defmodule Exile.Process.State do port: port(), pipes: pipes, status: status, - stderr: :console | :disable | :consume, + stderr: stderr_mode, operations: Operations.t(), exit_ref: reference(), monitor_ref: reference() diff --git a/test/exile/sync_process_test.exs b/test/exile/sync_process_test.exs index 164a69c..de7d441 100644 --- a/test/exile/sync_process_test.exs +++ b/test/exile/sync_process_test.exs @@ -45,6 +45,33 @@ defmodule Exile.SyncProcessTest do assert %{active: 0, workers: 0} = DynamicSupervisor.count_children(Exile.WatcherSupervisor) end + test "FDs are not leaked" do + before_count = opened_pipes() + + for _ <- 1..100 do + {:ok, s} = Process.start_link(~w(date)) + :ok = Process.close_stdin(s) + assert {:ok, _} = Process.read_any(s, 100) + assert :eof = Process.read_any(s, 100) + assert {:ok, 0} = Process.await_exit(s, 100) + end + + # let the dust settle + :timer.sleep(2000) + + after_count = opened_pipes() + + assert before_count == after_count + end + + defp opened_pipes do + {pipe_count, 0} = System.shell(~s(lsof -a -p #{:os.getpid()} | grep " PIPE " | wc -l)) + + pipe_count + |> String.trim() + |> String.to_integer() + end + defp stop_all_children(sup) do DynamicSupervisor.which_children(sup) |> Enum.each(fn {_, pid, _, _} ->