Skip to content

Commit

Permalink
Allow for sending data (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala authored Nov 28, 2023
1 parent 16043e7 commit 363500a
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 135 deletions.
65 changes: 38 additions & 27 deletions examples/example.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ defmodule Peer do

require Logger

alias ExWebRTC.{IceCandidate, PeerConnection, SessionDescription}
alias ExWebRTC.{IceCandidate, PeerConnection, SessionDescription, RTPTransceiver}

@ice_servers [
# %{urls: "stun:stun.stunprotocol.org:3478"},
%{urls: "stun:stun.l.google.com:19302"}
]

def start_link(mode \\ :passive) do
GenServer.start_link(__MODULE__, mode)
def start_link() do
GenServer.start_link(__MODULE__, nil)
end

@impl true
def init(mode) do
def init(_) do
{:ok, conn} = :gun.open({127, 0, 0, 1}, 4000)
{:ok, _protocol} = :gun.await_up(conn)
:gun.ws_upgrade(conn, "/websocket")
Expand All @@ -32,16 +32,7 @@ defmodule Peer do

{:ok, pc} = PeerConnection.start_link(ice_servers: @ice_servers)

if mode == :active do
{:ok, _transceiver} = PeerConnection.add_transceiver(pc, :audio)
{:ok, offer} = PeerConnection.create_offer(pc)
:ok = PeerConnection.set_local_description(pc, offer)
msg = %{"type" => "offer", "sdp" => offer.sdp}
:gun.ws_send(conn, stream, {:text, Jason.encode!(msg)})
Logger.info("Send SDP offer: #{offer.sdp}")
end

{:ok, %{conn: conn, stream: stream, peer_connection: pc, mode: mode}}
{:ok, %{conn: conn, stream: stream, peer_connection: pc, track_id: nil}}

other ->
Logger.error("Couldn't connect to the signalling server: #{inspect(other)}")
Expand All @@ -64,9 +55,10 @@ defmodule Peer do

@impl true
def handle_info({:gun_ws, _, _, {:text, msg}}, state) do
msg
|> Jason.decode!()
|> handle_ws_message(state)
state =
msg
|> Jason.decode!()
|> handle_ws_message(state)

{:noreply, state}
end
Expand All @@ -79,9 +71,7 @@ defmodule Peer do

@impl true
def handle_info({:ex_webrtc, _pid, msg}, state) do
Logger.info("Received ExWebRTC message: #{inspect(msg)}")
handle_webrtc_message(msg, state)

{:noreply, state}
end

Expand All @@ -91,20 +81,30 @@ defmodule Peer do
{:noreply, state}
end

defp handle_ws_message(%{"type" => "offer", "sdp" => sdp}, %{mode: :passive} = state) do
defp handle_ws_message(%{"type" => "offer", "sdp" => sdp}, %{peer_connection: pc} = state) do
Logger.info("Received SDP offer: #{inspect(sdp)}")
offer = %SessionDescription{type: :offer, sdp: sdp}
:ok = PeerConnection.set_remote_description(state.peer_connection, offer)
{:ok, answer} = PeerConnection.create_answer(state.peer_connection)
:ok = PeerConnection.set_local_description(state.peer_connection, answer)
:ok = PeerConnection.set_remote_description(pc, offer)
{:ok, answer} = PeerConnection.create_answer(pc)
:ok = PeerConnection.set_local_description(pc, answer)
msg = %{"type" => "answer", "sdp" => answer.sdp}
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})

track = ExWebRTC.MediaStreamTrack.new(:video)
{:ok, _} = PeerConnection.add_transceiver(pc, track)
{:ok, offer} = PeerConnection.create_offer(pc)
:ok = PeerConnection.set_local_description(pc, offer)
msg = %{"type" => "offer", "sdp" => offer.sdp}
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})

%{state | track_id: track.id}
end

defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, %{mode: :active} = state) do
defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, state) do
Logger.info("Received SDP answer: #{inspect(sdp)}")
answer = %SessionDescription{type: :answer, sdp: sdp}
:ok = PeerConnection.set_remote_description(state.peer_connection, answer)
state
end

defp handle_ws_message(%{"type" => "ice", "data" => data}, state) do
Expand All @@ -118,10 +118,13 @@ defmodule Peer do
}

:ok = PeerConnection.add_ice_candidate(state.peer_connection, candidate)

state
end

defp handle_ws_message(msg, _state) do
defp handle_ws_message(msg, state) do
Logger.info("Received unexpected message: #{inspect(msg)}")
state
end

defp handle_webrtc_message({:ice_candidate, candidate}, state) do
Expand All @@ -136,13 +139,21 @@ defmodule Peer do
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})
end

defp handle_webrtc_message({:rtp, _mid, _packet}, %{track_id: nil}) do
Logger.warning("Received RTP, but out transceiver has not beed created")
end

defp handle_webrtc_message({:rtp, _mid, packet}, state) do
Logger.info("Received RTP: #{inspect(packet)}")
PeerConnection.send_rtp(state.peer_connection, state.track_id, packet)
end

defp handle_webrtc_message(msg, _state) do
Logger.warning("Received unknown ex_webrtc message: #{inspect(msg)}")
end
end

mode = :active
{:ok, pid} = Peer.start_link(mode)
{:ok, pid} = Peer.start_link()
ref = Process.monitor(pid)

receive do
Expand Down
23 changes: 13 additions & 10 deletions examples/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ const start_connection = async (ws) => {
pc.oniceconnectionstatechange = _ => console.log("ICE connection state changed:", pc.iceConnectionState);
pc.onicegatheringstatechange = _ => console.log("ICE gathering state changed:", pc.iceGatheringState);
pc.onsignalingstatechange = _ => console.log("Signaling state changed:", pc.signalingState);
pc.ontrack = event => console.log("New track:", event);
pc.ontrack = event => {
const videoPlayer = document.createElement("video");
videoPlayer.srcObject = event.streams[0];
videoPlayer.onloadedmetadata = () => {
videoPlayer.play();
};
document.body.appendChild(videoPlayer);
};
pc.onicecandidate = event => {
console.log("New local ICE candidate:", event.candidate);

Expand All @@ -22,7 +29,7 @@ const start_connection = async (ws) => {
}
};

const localStream = await navigator.mediaDevices.getUserMedia({audio: true});
const localStream = await navigator.mediaDevices.getUserMedia({video: true});
for (const track of localStream.getTracks()) {
pc.addTrack(track, localStream);
}
Expand All @@ -48,16 +55,12 @@ const start_connection = async (ws) => {
}
};

if (mode === "active") {
const desc = await pc.createOffer();
console.log("Generated SDP offer:", desc);
await pc.setLocalDescription(desc);
ws.send(JSON.stringify(desc))
}
const desc = await pc.createOffer();
console.log("Generated SDP offer:", desc);
await pc.setLocalDescription(desc);
ws.send(JSON.stringify(desc))
};

const mode = "passive"

const ws = new WebSocket("ws://127.0.0.1:4000/websocket");
ws.onclose = event => console.log("WebSocket was closed", event);
ws.onopen = _ => start_connection(ws);
75 changes: 52 additions & 23 deletions lib/ex_webrtc/dtls_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ defmodule ExWebRTC.DTLSTransport do
end

@doc false
@spec send_data(dtls_transport(), binary()) :: :ok
def send_data(dtls_transport, data) do
GenServer.cast(dtls_transport, {:send_data, data})
@spec send_rtp(dtls_transport(), binary()) :: :ok
def send_rtp(dtls_transport, data) do
GenServer.cast(dtls_transport, {:send_rtp, data})
end

@doc false
@spec send_rtcp(dtls_transport(), binary()) :: :ok
def send_rtcp(dtls_transport, data) do
GenServer.cast(dtls_transport, {:send_rtcp, data})
end

@impl true
Expand All @@ -67,7 +73,6 @@ defmodule ExWebRTC.DTLSTransport do
fingerprint = ExDTLS.get_cert_fingerprint(cert)

{:ok, ice_agent} = ice_module.start_link(:controlled, ice_config)
srtp = ExLibSRTP.new()

state = %{
owner: owner,
Expand All @@ -77,9 +82,10 @@ defmodule ExWebRTC.DTLSTransport do
cert: cert,
pkey: pkey,
fingerprint: fingerprint,
in_srtp: ExLibSRTP.new(),
out_srtp: ExLibSRTP.new(),
# sha256 hex dump
peer_fingerprint: nil,
srtp: srtp,
dtls_state: :new,
dtls: nil,
mode: nil
Expand Down Expand Up @@ -125,18 +131,25 @@ defmodule ExWebRTC.DTLSTransport do
end

@impl true
def handle_cast({:send_data, _data}, %{dtls_state: :connected, ice_state: ice_state} = state)
def handle_cast({:send_rtp, data}, %{dtls_state: :connected, ice_state: ice_state} = state)
when ice_state in [:connected, :completed] do
# TODO
case ExLibSRTP.protect(state.out_srtp, data) do
{:ok, protected} -> ICEAgent.send_data(state.ice_agent, protected)
{:error, reason} -> Logger.error("Unable to protect RTP: #{inspect(reason)}")
end

{:noreply, state}
end

@impl true
def handle_cast({:send_data, _data}, state) do
Logger.error(
"Attempted to send data when DTLS handshake was not finished or ICE Transport is unavailable"
)
def handle_cast({:send_rtp, _data}, state) do
Logger.warning("Attemped to send RTP before DTLS handshake has been finished. Ignoring.")
{:noreply, state}
end

@impl true
def handle_cast({:send_rtcp, _data}, state) do
# TODO: implement
{:noreply, state}
end

Expand Down Expand Up @@ -183,6 +196,7 @@ defmodule ExWebRTC.DTLSTransport do
end

defp handle_ice({:data, <<f, _rest::binary>> = data}, state) when f in 20..64 do
# TODO: handle {:connection_closed, _}
case ExDTLS.handle_data(state.dtls, data) do
{:handshake_packets, packets, timeout} when state.ice_state in [:connected, :completed] ->
:ok = ICEAgent.send_data(state.ice_agent, packets)
Expand All @@ -199,7 +213,7 @@ defmodule ExWebRTC.DTLSTransport do
state = %{state | buffered_packets: packets}
update_dtls_state(state, :connecting)

{:handshake_finished, _, remote_keying_material, profile, packets} ->
{:handshake_finished, lkm, rkm, profile, packets} ->
Logger.debug("DTLS handshake finished")
ICEAgent.send_data(state.ice_agent, packets)

Expand All @@ -210,16 +224,16 @@ defmodule ExWebRTC.DTLSTransport do
|> Utils.hex_dump()

if peer_fingerprint == state.peer_fingerprint do
state = setup_srtp(state, remote_keying_material, profile)
:ok = setup_srtp(state, lkm, rkm, profile)
update_dtls_state(state, :connected)
else
Logger.debug("Non-matching peer cert fingerprint.")
update_dtls_state(state, :failed)
end

{:handshake_finished, _, remote_keying_material, profile} ->
{:handshake_finished, lkm, rkm, profile} ->
Logger.debug("DTLS handshake finished")
state = setup_srtp(state, remote_keying_material, profile)
:ok = setup_srtp(state, lkm, rkm, profile)
update_dtls_state(state, :connected)

:handshake_want_read ->
Expand All @@ -229,13 +243,18 @@ defmodule ExWebRTC.DTLSTransport do

defp handle_ice({:data, <<f, _rest::binary>> = data}, %{dtls_state: :connected} = state)
when f in 128..191 do
case ExLibSRTP.unprotect(state.srtp, data) do
{type, unprotect} =
case data do
<<_, s, _::binary>> when s in 192..223 -> {:rtcp, &ExLibSRTP.unprotect_rtcp/2}
_ -> {:rtp, &ExLibSRTP.unprotect/2}
end

case unprotect.(state.in_srtp, data) do
{:ok, payload} ->
# TODO: temporarily, everything goes to peer connection process
notify(state.owner, {:rtp_data, payload})
notify(state.owner, {type, payload})

{:error, reason} ->
Logger.warning("Failed to decrypt SRTP, reason: #{inspect(reason)}")
Logger.error("Failed to decrypt SRTP/SRTCP, reason: #{inspect(reason)}")
end

state
Expand Down Expand Up @@ -280,19 +299,29 @@ defmodule ExWebRTC.DTLSTransport do

defp handle_ice(_msg, state), do: state

defp setup_srtp(state, remote_keying_material, profile) do
defp setup_srtp(state, local_keying_material, remote_keying_material, profile) do
{:ok, crypto_profile} =
ExLibSRTP.Policy.crypto_profile_from_dtls_srtp_protection_profile(profile)

policy = %ExLibSRTP.Policy{
inbound_policy = %ExLibSRTP.Policy{
ssrc: :any_inbound,
key: remote_keying_material,
rtp: crypto_profile,
rtcp: crypto_profile
}

:ok = ExLibSRTP.add_stream(state.srtp, policy)
state
:ok = ExLibSRTP.add_stream(state.in_srtp, inbound_policy)

outbound_policy = %ExLibSRTP.Policy{
ssrc: :any_outbound,
key: local_keying_material,
rtp: crypto_profile,
rtcp: crypto_profile
}

:ok = ExLibSRTP.add_stream(state.out_srtp, outbound_policy)

:ok
end

defp update_dtls_state(%{dtls_state: dtls_state} = state, dtls_state), do: state
Expand Down
9 changes: 4 additions & 5 deletions lib/ex_webrtc/media_stream_track.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ defmodule ExWebRTC.MediaStreamTrack do

@type t() :: %__MODULE__{
kind: :audio | :video,
id: integer(),
mid: String.t()
id: integer()
}

@enforce_keys [:id, :kind]
defstruct @enforce_keys ++ [:mid]
defstruct @enforce_keys

def from_transceiver(tr) do
%__MODULE__{kind: tr.kind, id: generate_id(), mid: tr.mid}
def new(kind) do
%__MODULE__{kind: kind, id: generate_id()}
end

defp generate_id() do
Expand Down
Loading

0 comments on commit 363500a

Please sign in to comment.