Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for sending data #21

Merged
merged 9 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 38 additions & 27 deletions examples/example.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ defmodule Peer do

require Logger

alias ExWebRTC.{IceCandidate, PeerConnection, SessionDescription}
alias ExWebRTC.{IceCandidate, PeerConnection, SessionDescription, RTPTransceiver}

@ice_servers [
# %{urls: "stun:stun.stunprotocol.org:3478"},
%{urls: "stun:stun.l.google.com:19302"}
]

def start_link(mode \\ :passive) do
GenServer.start_link(__MODULE__, mode)
def start_link() do
GenServer.start_link(__MODULE__, nil)
end

@impl true
def init(mode) do
def init(_) do
{:ok, conn} = :gun.open({127, 0, 0, 1}, 4000)
{:ok, _protocol} = :gun.await_up(conn)
:gun.ws_upgrade(conn, "/websocket")
Expand All @@ -32,16 +32,7 @@ defmodule Peer do

{:ok, pc} = PeerConnection.start_link(ice_servers: @ice_servers)

if mode == :active do
{:ok, _transceiver} = PeerConnection.add_transceiver(pc, :audio)
{:ok, offer} = PeerConnection.create_offer(pc)
:ok = PeerConnection.set_local_description(pc, offer)
msg = %{"type" => "offer", "sdp" => offer.sdp}
:gun.ws_send(conn, stream, {:text, Jason.encode!(msg)})
Logger.info("Send SDP offer: #{offer.sdp}")
end

{:ok, %{conn: conn, stream: stream, peer_connection: pc, mode: mode}}
{:ok, %{conn: conn, stream: stream, peer_connection: pc, track_id: nil}}

other ->
Logger.error("Couldn't connect to the signalling server: #{inspect(other)}")
Expand All @@ -64,9 +55,10 @@ defmodule Peer do

@impl true
def handle_info({:gun_ws, _, _, {:text, msg}}, state) do
msg
|> Jason.decode!()
|> handle_ws_message(state)
state =
msg
|> Jason.decode!()
|> handle_ws_message(state)

{:noreply, state}
end
Expand All @@ -79,9 +71,7 @@ defmodule Peer do

@impl true
def handle_info({:ex_webrtc, _pid, msg}, state) do
Logger.info("Received ExWebRTC message: #{inspect(msg)}")
handle_webrtc_message(msg, state)

{:noreply, state}
end

Expand All @@ -91,20 +81,30 @@ defmodule Peer do
{:noreply, state}
end

defp handle_ws_message(%{"type" => "offer", "sdp" => sdp}, %{mode: :passive} = state) do
defp handle_ws_message(%{"type" => "offer", "sdp" => sdp}, %{peer_connection: pc} = state) do
Logger.info("Received SDP offer: #{inspect(sdp)}")
offer = %SessionDescription{type: :offer, sdp: sdp}
:ok = PeerConnection.set_remote_description(state.peer_connection, offer)
{:ok, answer} = PeerConnection.create_answer(state.peer_connection)
:ok = PeerConnection.set_local_description(state.peer_connection, answer)
:ok = PeerConnection.set_remote_description(pc, offer)
{:ok, answer} = PeerConnection.create_answer(pc)
:ok = PeerConnection.set_local_description(pc, answer)
msg = %{"type" => "answer", "sdp" => answer.sdp}
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})

track = ExWebRTC.MediaStreamTrack.new(:video)
{:ok, _} = PeerConnection.add_transceiver(pc, track)
{:ok, offer} = PeerConnection.create_offer(pc)
:ok = PeerConnection.set_local_description(pc, offer)
msg = %{"type" => "offer", "sdp" => offer.sdp}
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})

%{state | track_id: track.id}
end

defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, %{mode: :active} = state) do
defp handle_ws_message(%{"type" => "answer", "sdp" => sdp}, state) do
Logger.info("Received SDP answer: #{inspect(sdp)}")
answer = %SessionDescription{type: :answer, sdp: sdp}
:ok = PeerConnection.set_remote_description(state.peer_connection, answer)
state
end

defp handle_ws_message(%{"type" => "ice", "data" => data}, state) do
Expand All @@ -118,10 +118,13 @@ defmodule Peer do
}

:ok = PeerConnection.add_ice_candidate(state.peer_connection, candidate)

state
end

defp handle_ws_message(msg, _state) do
defp handle_ws_message(msg, state) do
Logger.info("Received unexpected message: #{inspect(msg)}")
state
end

defp handle_webrtc_message({:ice_candidate, candidate}, state) do
Expand All @@ -136,13 +139,21 @@ defmodule Peer do
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})
end

defp handle_webrtc_message({:rtp, _mid, _packet}, %{track_id: nil}) do
Logger.warning("Received RTP, but out transceiver has not beed created")
end

defp handle_webrtc_message({:rtp, _mid, packet}, state) do
Logger.info("Received RTP: #{inspect(packet)}")
PeerConnection.send_rtp(state.peer_connection, state.track_id, packet)
end

defp handle_webrtc_message(msg, _state) do
Logger.warning("Received unknown ex_webrtc message: #{inspect(msg)}")
end
end

mode = :active
{:ok, pid} = Peer.start_link(mode)
{:ok, pid} = Peer.start_link()
ref = Process.monitor(pid)

receive do
Expand Down
23 changes: 13 additions & 10 deletions examples/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ const start_connection = async (ws) => {
pc.oniceconnectionstatechange = _ => console.log("ICE connection state changed:", pc.iceConnectionState);
pc.onicegatheringstatechange = _ => console.log("ICE gathering state changed:", pc.iceGatheringState);
pc.onsignalingstatechange = _ => console.log("Signaling state changed:", pc.signalingState);
pc.ontrack = event => console.log("New track:", event);
pc.ontrack = event => {
const videoPlayer = document.createElement("video");
videoPlayer.srcObject = event.streams[0];
videoPlayer.onloadedmetadata = () => {
videoPlayer.play();
};
document.body.appendChild(videoPlayer);
};
pc.onicecandidate = event => {
console.log("New local ICE candidate:", event.candidate);

Expand All @@ -22,7 +29,7 @@ const start_connection = async (ws) => {
}
};

const localStream = await navigator.mediaDevices.getUserMedia({audio: true});
const localStream = await navigator.mediaDevices.getUserMedia({video: true});
for (const track of localStream.getTracks()) {
pc.addTrack(track, localStream);
}
Expand All @@ -48,16 +55,12 @@ const start_connection = async (ws) => {
}
};

if (mode === "active") {
const desc = await pc.createOffer();
console.log("Generated SDP offer:", desc);
await pc.setLocalDescription(desc);
ws.send(JSON.stringify(desc))
}
const desc = await pc.createOffer();
console.log("Generated SDP offer:", desc);
await pc.setLocalDescription(desc);
ws.send(JSON.stringify(desc))
};

const mode = "passive"

const ws = new WebSocket("ws://127.0.0.1:4000/websocket");
ws.onclose = event => console.log("WebSocket was closed", event);
ws.onopen = _ => start_connection(ws);
73 changes: 51 additions & 22 deletions lib/ex_webrtc/dtls_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,15 @@ defmodule ExWebRTC.DTLSTransport do
end

@doc false
@spec send_data(dtls_transport(), binary()) :: :ok
def send_data(dtls_transport, data) do
GenServer.cast(dtls_transport, {:send_data, data})
@spec send_rtp(dtls_transport(), binary()) :: :ok
def send_rtp(dtls_transport, data) do
GenServer.cast(dtls_transport, {:send_rtp, data})
end

@doc false
@spec send_rtcp(dtls_transport(), binary()) :: :ok
def send_rtcp(dtls_transport, data) do
GenServer.cast(dtls_transport, {:send_rtcp, data})
end

@impl true
Expand All @@ -68,7 +74,6 @@ defmodule ExWebRTC.DTLSTransport do
fingerprint = ExDTLS.get_cert_fingerprint(dtls)

{:ok, ice_agent} = ice_module.start_link(:controlled, ice_config)
srtp = ExLibSRTP.new()

state = %{
owner: owner,
Expand All @@ -78,7 +83,8 @@ defmodule ExWebRTC.DTLSTransport do
cert: cert,
pkey: pkey,
fingerprint: fingerprint,
srtp: srtp,
in_srtp: ExLibSRTP.new(),
out_srtp: ExLibSRTP.new(),
dtls_state: :new,
dtls: nil,
mode: nil
Expand Down Expand Up @@ -121,18 +127,25 @@ defmodule ExWebRTC.DTLSTransport do
end

@impl true
def handle_cast({:send_data, _data}, %{dtls_state: :connected, ice_state: ice_state} = state)
def handle_cast({:send_rtp, data}, %{dtls_state: :connected, ice_state: ice_state} = state)
when ice_state in [:connected, :completed] do
# TODO
case ExLibSRTP.protect(state.out_srtp, data) do
{:ok, protected} -> ICEAgent.send_data(state.ice_agent, protected)
{:error, reason} -> Logger.error("Unable to protect RTP: #{inspect(reason)}")
end

{:noreply, state}
end

@impl true
def handle_cast({:send_data, _data}, state) do
Logger.error(
"Attempted to send data when DTLS handshake was not finished or ICE Transport is unavailable"
)
def handle_cast({:send_rtp, _data}, state) do
Logger.warning("Attemped to send RTP before DTLS handshake has been finished")
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
{:noreply, state}
end

@impl true
def handle_cast({:send_rtcp, _data}, state) do
# TODO: implement
{:noreply, state}
end

Expand Down Expand Up @@ -179,6 +192,7 @@ defmodule ExWebRTC.DTLSTransport do
end

defp handle_ice({:data, <<f, _rest::binary>> = data}, state) when f in 20..64 do
# TODO: handle {:connection_closed, _}
case ExDTLS.handle_data(state.dtls, data) do
{:handshake_packets, packets, timeout} when state.ice_state in [:connected, :completed] ->
:ok = ICEAgent.send_data(state.ice_agent, packets)
Expand All @@ -195,16 +209,16 @@ defmodule ExWebRTC.DTLSTransport do
state = %{state | buffered_packets: packets}
update_dtls_state(state, :connecting)

{:handshake_finished, _, remote_keying_material, profile, packets} ->
{:handshake_finished, lkm, rkm, profile, packets} ->
Logger.debug("DTLS handshake finished")
ICEAgent.send_data(state.ice_agent, packets)
# TODO: validate fingerprint
state = setup_srtp(state, remote_keying_material, profile)
state = setup_srtp(state, lkm, rkm, profile)
mickel8 marked this conversation as resolved.
Show resolved Hide resolved
update_dtls_state(state, :connected)

{:handshake_finished, _, remote_keying_material, profile} ->
{:handshake_finished, lkm, rkm, profile} ->
Logger.debug("DTLS handshake finished")
state = setup_srtp(state, remote_keying_material, profile)
state = setup_srtp(state, lkm, rkm, profile)
update_dtls_state(state, :connected)

:handshake_want_read ->
Expand All @@ -214,13 +228,18 @@ defmodule ExWebRTC.DTLSTransport do

defp handle_ice({:data, <<f, _rest::binary>> = data}, %{dtls_state: :connected} = state)
when f in 128..191 do
case ExLibSRTP.unprotect(state.srtp, data) do
{type, unprotect} =
case data do
<<_, s, _::binary>> when s in 192..223 -> {:rtcp, &ExLibSRTP.unprotect_rtcp/2}
_ -> {:rtp, &ExLibSRTP.unprotect/2}
end

case unprotect.(state.in_srtp, data) do
{:ok, payload} ->
# TODO: temporarily, everything goes to peer connection process
notify(state.owner, {:rtp_data, payload})
notify(state.owner, {type, payload})

{:error, reason} ->
Logger.warning("Failed to decrypt SRTP, reason: #{inspect(reason)}")
Logger.error("Failed to decrypt SRTP/SRTCP, reason: #{inspect(reason)}")
end

state
Expand Down Expand Up @@ -265,18 +284,28 @@ defmodule ExWebRTC.DTLSTransport do

defp handle_ice(_msg, state), do: state

defp setup_srtp(state, remote_keying_material, profile) do
defp setup_srtp(state, local_keying_material, remote_keying_material, profile) do
{:ok, crypto_profile} =
ExLibSRTP.Policy.crypto_profile_from_dtls_srtp_protection_profile(profile)

policy = %ExLibSRTP.Policy{
inbound_policy = %ExLibSRTP.Policy{
ssrc: :any_inbound,
key: remote_keying_material,
rtp: crypto_profile,
rtcp: crypto_profile
}

:ok = ExLibSRTP.add_stream(state.srtp, policy)
:ok = ExLibSRTP.add_stream(state.in_srtp, inbound_policy)

outbound_policy = %ExLibSRTP.Policy{
ssrc: :any_outbound,
key: local_keying_material,
rtp: crypto_profile,
rtcp: crypto_profile
}

:ok = ExLibSRTP.add_stream(state.out_srtp, outbound_policy)

state
end

Expand Down
9 changes: 4 additions & 5 deletions lib/ex_webrtc/media_stream_track.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ defmodule ExWebRTC.MediaStreamTrack do

@type t() :: %__MODULE__{
kind: :audio | :video,
id: integer(),
mid: String.t()
id: integer()
}

@enforce_keys [:id, :kind]
defstruct @enforce_keys ++ [:mid]
defstruct @enforce_keys

def from_transceiver(tr) do
%__MODULE__{kind: tr.kind, id: generate_id(), mid: tr.mid}
def new(kind) do
%__MODULE__{kind: kind, id: generate_id()}
end

defp generate_id() do
Expand Down
Loading
Loading