diff --git a/examples/assets/browser_to_file/browser_to_file.js b/examples/assets/browser_to_file/browser_to_file.js deleted file mode 100644 index d5a1a7a..0000000 --- a/examples/assets/browser_to_file/browser_to_file.js +++ /dev/null @@ -1,59 +0,0 @@ -const pcConfig = { 'iceServers': [{ 'urls': 'stun:stun.l.google.com:19302' },] }; -const mediaConstraints = { video: true, audio: true } - -const proto = window.location.protocol === "https:" ? "wss:" : "ws:" -const ws = new WebSocket(`${proto}//${window.location.hostname}:8829`); -const connStatus = document.getElementById("status"); -ws.onopen = _ => start_connection(ws); -ws.onclose = event => { - connStatus.innerHTML = "Disconnected" - console.log("WebSocket connection was terminated:", event); -} - -const start_connection = async (ws) => { - const localStream = await navigator.mediaDevices.getUserMedia(mediaConstraints); - const pc = new RTCPeerConnection(pcConfig); - - pc.onicecandidate = event => { - if (event.candidate === null) return; - console.log("Sent ICE candidate:", event.candidate); - ws.send(JSON.stringify({ type: "ice_candidate", data: event.candidate })); - }; - - pc.onconnectionstatechange = () => { - if (pc.connectionState == "connected") { - const button = document.createElement('button'); - button.innerHTML = "Disconnect"; - button.onclick = () => { - ws.close(); - localStream.getTracks().forEach(track => track.stop()) - } - connStatus.innerHTML = "Connected "; - connStatus.appendChild(button); - } - } - - for (const track of localStream.getTracks()) { - pc.addTrack(track, localStream); - } - - ws.onmessage = async event => { - const { type, data } = JSON.parse(event.data); - - switch (type) { - case "sdp_answer": - console.log("Received SDP answer:", data); - await pc.setRemoteDescription(data); - break; - case "ice_candidate": - console.log("Recieved ICE candidate:", data); - await pc.addIceCandidate(data); - break; - } - }; - - const offer = await pc.createOffer(); - await pc.setLocalDescription(offer); - console.log("Sent SDP offer:", offer) - ws.send(JSON.stringify({ type: "sdp_offer", data: offer })); -}; diff --git a/examples/assets/browser_to_file/index.html b/examples/assets/browser_to_file/index.html index 1b90fa4..562c647 100644 --- a/examples/assets/browser_to_file/index.html +++ b/examples/assets/browser_to_file/index.html @@ -2,19 +2,34 @@ - - - - Membrane WebRTC browser to file example + + + + Membrane WebRTC WHIP/WHEP Example -
-

Membrane WebRTC browser to file example

-
Connecting
-
- + style="background-color: black; color: white; font-family: Arial, Helvetica, sans-serif; min-height: 100vh; margin: 0px; padding: 5px 0px 5px 0px"> +

Membrane WebRTC WHIP/WHEP Example

+
Connecting...
+ \ No newline at end of file diff --git a/examples/assets/whip.html b/examples/assets/whip.html deleted file mode 100644 index 823d76e..0000000 --- a/examples/assets/whip.html +++ /dev/null @@ -1,44 +0,0 @@ - - - - - - - - Elixir WebRTC WHIP/WHEP Example - - - -

Elixir WebRTC WHIP/WHEP Example

- - - - - - \ No newline at end of file diff --git a/examples/browser_to_file.exs b/examples/browser_to_file.exs index 789f8ed..64cfbbf 100644 --- a/examples/browser_to_file.exs +++ b/examples/browser_to_file.exs @@ -23,23 +23,16 @@ defmodule Example.Pipeline do @impl true def handle_init(_ctx, opts) do - # p = self() - - # Membrane.WebRTC.WhipServer.start_link( - # port: 8888, - # handle_new_client: fn signaling, _token -> - # send(p, {:ready, signaling}) - # :ok - # end - # ) - - # signaling = receive do: ({:ready, signaling} -> signaling) - spec = [ child(:webrtc, %WebRTC.Source{ - # signaling: {:whip, port: opts[:port], ip: :any, serve_static: "examples/assets"} - signaling: opts[:port] + signaling: { + :whip, + token: "whip_it!", + port: opts[:port], + ip: :any, + serve_static: "#{__DIR__}/assets/browser_to_file" + } }), child(:matroska, Membrane.Matroska.Muxer), get_child(:webrtc) @@ -67,43 +60,15 @@ defmodule Example.Pipeline do end end -defmodule Router do - use Plug.Router - - plug(Plug.Logger) - plug(Plug.Static, at: "/", from: "examples/assets") - plug(:match) - plug(:dispatch) - - forward( - "/", - to: Membrane.WebRTC.WhipServer.Router, - handle_new_client: &__MODULE__.handle_new_client/1 - ) - - def handle_new_client(_token) do - signaling = Membrane.WebRTC.SignalingChannel.new() +port = 8829 +{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(Example.Pipeline, port: port) +Process.monitor(supervisor) - {:ok, _supervisor, _pipeline} = - Membrane.Pipeline.start_link(Example.Pipeline, port: signaling) +Logger.info(""" +Visit http://localhost:#{port}/static/index.html to start the stream. To finish the recording properly, +don't terminate this script - instead click 'disconnect' in the website or close the browser tab. +""") - {:ok, signaling} - end +receive do + {:DOWN, _ref, :process, ^supervisor, _reason} -> :ok end - -Bandit.start_link(plug: Router, ip: :any, port: 8829) - -Process.sleep(:infinity) - -# port = 8829 -# {:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(Example.Pipeline, port: port) -# Process.monitor(supervisor) - -# Logger.info(""" -# Visit http://localhost:#{port}/index.html to start the stream. To finish the recording properly, -# don't terminate this script - instead click 'disconnect' in the website or close the browser tab. -# """) - -# receive do -# {:DOWN, _ref, :process, ^supervisor, _reason} -> :ok -# end diff --git a/lib/membrane_webrtc/ex_webrtc/source.ex b/lib/membrane_webrtc/ex_webrtc/source.ex index 16884fe..d29904c 100644 --- a/lib/membrane_webrtc/ex_webrtc/source.ex +++ b/lib/membrane_webrtc/ex_webrtc/source.ex @@ -366,10 +366,15 @@ defmodule Membrane.WebRTC.ExWebRTCSource do defp setup_whip(ctx, opts) do signaling = SignalingChannel.new() clients_cnt = :atomics.new(1, []) - - handle_new_client = fn _token -> - clients_cnt = :atomics.add_get(clients_cnt, 1, 1) - if clients_cnt == 1, do: {:ok, signaling}, else: {:error, :already_connected} + {token, opts} = Keyword.pop(opts, :token, fn _token -> true end) + validate_token = if is_function(token), do: token, else: &(&1 == token) + + handle_new_client = fn token -> + cond do + !validate_token.(token) -> {:error, :invalid_token} + :atomics.add_get(clients_cnt, 1, 1) > 1 -> {:error, :already_connected} + true -> {:ok, signaling} + end end Membrane.UtilitySupervisor.start_child(ctx.utility_supervisor, { diff --git a/lib/membrane_webrtc/sink.ex b/lib/membrane_webrtc/sink.ex index 84c535e..8acb331 100644 --- a/lib/membrane_webrtc/sink.ex +++ b/lib/membrane_webrtc/sink.ex @@ -33,12 +33,24 @@ defmodule Membrane.WebRTC.Sink do """ @type new_tracks :: {:new_tracks, [%{id: term, kind: :audio | :video}]} + @typedoc """ + WHIP client options + + - `uri` - Address of the WHIP server (HTTP/HTTPS) + - `token` - WHIP token, defaults to an empty string + """ + @type whip_options :: {:uri, String.t()} | {:token, String.t()} + def_options signaling: [ - spec: SignalingChannel.t() | {:websocket, SimpleWebSocketServer.options()}, + spec: + SignalingChannel.t() + | {:whip, whip_options} + | {:websocket, SimpleWebSocketServer.options()}, description: """ Channel for passing WebRTC signaling messages (SDP and ICE). Either: - `#{inspect(SignalingChannel)}` - See its docs for details. + - `{:whip, options}` - Acts as a WHIP client, see `t:whip_options/0` for details. - `{:websocket, options}` - Spawns #{inspect(SimpleWebSocketServer)}, see there for details. """ diff --git a/lib/membrane_webrtc/source.ex b/lib/membrane_webrtc/source.ex index 1ceeab3..2d531ee 100644 --- a/lib/membrane_webrtc/source.ex +++ b/lib/membrane_webrtc/source.ex @@ -26,12 +26,32 @@ defmodule Membrane.WebRTC.Source do """ @type new_tracks :: {:new_tracks, [%{id: term, kind: :audio | :video}]} + @typedoc """ + Options for WHIP server input. + + The server accepts a single connection and the stream is received by this source. The options are: + + - `token` - either expected WHIP token or a function returning true if the token is valid, otherwise false + - `serve_static` - make WHIP server also serve static content, such as an HTML page under `/static` endpoint + - Any of `t:Bandit.options/0` - in particular `ip` and `port` + + To handle multiple connections and have more control over the server, see `Membrane.WebRTC.WhipServer`. + """ + @type whip_options :: + {:token, String.t() | (String.t() -> boolean())} + | {:serve_static, String.t()} + | {atom, term()} + def_options signaling: [ - spec: SignalingChannel.t() | {:websocket, SimpleWebSocketServer.options()}, + spec: + SignalingChannel.t() + | {:whip, whip_options()} + | {:websocket, SimpleWebSocketServer.options()}, description: """ Channel for passing WebRTC signaling messages (SDP and ICE). Either: - `#{inspect(SignalingChannel)}` - See its docs for details. + - `{:whip, options}` - Starts a WHIP server, see `t:whip_options/0` for details. - `{:websocket, options}` - Spawns #{inspect(SimpleWebSocketServer)}, see there for details. """ diff --git a/lib/membrane_webrtc/whip_server.ex b/lib/membrane_webrtc/whip_server.ex index cd99cb1..3f0c38b 100644 --- a/lib/membrane_webrtc/whip_server.ex +++ b/lib/membrane_webrtc/whip_server.ex @@ -8,8 +8,8 @@ defmodule Membrane.WebRTC.WhipServer do the signaling channel to negotiate the connection or error to reject it. The signaling channel can be passed to `Membrane.WebRTC.Source`. - `serve_static` - path to static assets that should be served along with WHIP, - useful to serve HTML assets. If set to `false` (default), no static assets are - served + under a `/static` endpoint. Useful to serve HTML assets. If set to `false` (default), + no static assets are served - Any of `t:Bandit.options/0` - Bandit configuration """ @@ -72,7 +72,7 @@ defmodule Membrane.WebRTC.WhipServer do """ use Plug.Router - plug(Plug.Logger, log: :info) + plug(Plug.Logger, log: :debug) plug(Corsica, origins: "*") plug(:match) plug(:dispatch)