Skip to content

Commit

Permalink
Add hls playable field and notification (#58)
Browse files Browse the repository at this point in the history
* Add hls playable field and notification

* Code refactor

* Change requests

* Update protos
  • Loading branch information
Karolk99 authored Jul 24, 2023
1 parent f316fae commit ed2ea51
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 55 deletions.
15 changes: 12 additions & 3 deletions lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,32 @@ defmodule Jellyfish.Component do

alias Jellyfish.Component.{HLS, RTSP}

@callback metadata() :: map()

@enforce_keys [
:id,
:type,
:engine_endpoint
:engine_endpoint,
:metadata
]
defstruct @enforce_keys

@type id :: String.t()
@type component :: HLS | RTSP
@type metadata :: HLS.metadata() | RTSP.metadata()

@typedoc """
This module contains:
* `id` - component id
* `type` - type of this component
* `engine_endpoint` - engine endpoint for this component
* `metadata` - metadata of this component
"""
@type t :: %__MODULE__{
id: id(),
type: component(),
engine_endpoint: Membrane.ChildrenSpec.child_definition()
engine_endpoint: Membrane.ChildrenSpec.child_definition(),
metadata: metadata()
}

@spec parse_type(String.t()) :: {:ok, component()} | {:error, :invalid_type}
Expand All @@ -44,11 +50,14 @@ defmodule Jellyfish.Component do
@spec new(component(), map()) :: {:ok, t()} | {:error, term()}
def new(type, options) do
with {:ok, endpoint} <- type.config(options) do
metadata = type.metadata()

{:ok,
%__MODULE__{
id: UUID.uuid4(),
type: type,
engine_endpoint: endpoint
engine_endpoint: endpoint,
metadata: metadata
}}
else
{:error, _reason} = error -> error
Expand Down
6 changes: 6 additions & 0 deletions lib/jellyfish/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Jellyfish.Component.HLS do
"""

@behaviour Jellyfish.Endpoint.Config
@behaviour Jellyfish.Component

alias Membrane.RTC.Engine.Endpoint.HLS
alias Membrane.RTC.Engine.Endpoint.HLS.{CompositorConfig, HLSConfig, MixerConfig}
Expand All @@ -12,6 +13,8 @@ defmodule Jellyfish.Component.HLS do
@segment_duration Time.seconds(4)
@partial_segment_duration Time.milliseconds(400)

@type metadata :: %{playable: boolean()}

@impl true
def config(options) do
base_path = Application.fetch_env!(:jellyfish, :output_base_path)
Expand Down Expand Up @@ -42,4 +45,7 @@ defmodule Jellyfish.Component.HLS do
}
}}
end

@impl true
def metadata(), do: %{playable: false}
end
6 changes: 6 additions & 0 deletions lib/jellyfish/component/rtsp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ defmodule Jellyfish.Component.RTSP do
"""

@behaviour Jellyfish.Endpoint.Config
@behaviour Jellyfish.Component

alias Membrane.RTC.Engine.Endpoint.RTSP

alias JellyfishWeb.ApiSpec

@type metadata :: %{}

@impl true
def config(%{engine_pid: engine} = options) do
options = Map.drop(options, [:engine_pid, :room_id])
Expand All @@ -29,4 +32,7 @@ defmodule Jellyfish.Component.RTSP do
{:error, _reason} = error -> error
end
end

@impl true
def metadata(), do: %{}
end
21 changes: 21 additions & 0 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ defmodule Jellyfish.Room do
{:reply, {:ok, component}, state}
else
{:error, :incompatible_codec} ->
Logger.warn("Unable to add component: incompatible codec, HLS needs 'h264' video codec.")
{:reply, {:error, :incompatible_codec}, state}

{:error, reason} ->
Expand Down Expand Up @@ -321,6 +322,26 @@ defmodule Jellyfish.Room do
{:noreply, state}
end

@impl true
def handle_info({:playlist_playable, :audio, _playlist_idl}, state), do: {:noreply, state}

@impl true
def handle_info({:playlist_playable, :video, _playlist_idl}, state) do
endpoint_id =
Enum.find_value(state.components, fn {id, %{type: type}} ->
if type == Component.HLS, do: id
end)

Phoenix.PubSub.broadcast(
Jellyfish.PubSub,
"server_notification",
{:hls_playable, state.id, endpoint_id}
)

state = update_in(state, [:components, endpoint_id], &Map.put(&1, :playable, true))
{:noreply, state}
end

@impl true
def handle_info(info, state) do
Logger.warn("Received unexpected info: #{inspect(info)}")
Expand Down
20 changes: 18 additions & 2 deletions lib/jellyfish_web/api_spec/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,30 @@ defmodule JellyfishWeb.ApiSpec.Component do
})
end

defmodule Metadata do
@moduledoc false

require OpenApiSpex

OpenApiSpex.schema(%{
title: "ComponentMetadata",
description: "Component-specific metadata",
type: :object,
properties: %{
playable: %Schema{type: :boolean}
}
})
end

OpenApiSpex.schema(%{
title: "Component",
description: "Describes component",
type: :object,
properties: %{
id: %Schema{type: :string, description: "Assigned component id", example: "component-1"},
type: Type
type: Type,
metadata: Metadata
},
required: [:id, :type]
required: [:id, :type, :metadata]
})
end
3 changes: 2 additions & 1 deletion lib/jellyfish_web/controllers/component_json.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule JellyfishWeb.ComponentJSON do

%{
id: component.id,
type: type
type: type,
metadata: component.metadata
}
end
end
67 changes: 36 additions & 31 deletions lib/jellyfish_web/server_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule JellyfishWeb.ServerSocket do
Authenticated,
AuthRequest,
ComponentCrashed,
HlsPlayable,
MetricsReport,
PeerConnected,
PeerCrashed,
Expand Down Expand Up @@ -160,35 +161,8 @@ defmodule JellyfishWeb.ServerSocket do

@impl true
def handle_info(msg, state) do
content =
case msg do
{:room_created, room_id} ->
{:room_created, %RoomCreated{room_id: room_id}}

{:room_deleted, room_id} ->
{:room_deleted, %RoomDeleted{room_id: room_id}}

{:room_crashed, room_id} ->
{:room_crashed, %RoomCrashed{room_id: room_id}}

{:peer_connected, room_id, peer_id} ->
{:peer_connected, %PeerConnected{room_id: room_id, peer_id: peer_id}}

{:peer_disconnected, room_id, peer_id} ->
{:peer_disconnected, %PeerDisconnected{room_id: room_id, peer_id: peer_id}}

{:peer_crashed, room_id, peer_id} ->
{:peer_crashed, %PeerCrashed{room_id: room_id, peer_id: peer_id}}

{:component_crashed, room_id, component_id} ->
{:component_crashed, %ComponentCrashed{room_id: room_id, component_id: component_id}}

{:metrics, report} ->
{:metrics_report, %MetricsReport{metrics: report}}
end

content = to_proto_notification(msg)
encoded_msg = %ServerMessage{content: content} |> ServerMessage.encode()

{:push, {:binary, encoded_msg}, state}
end

Expand Down Expand Up @@ -224,7 +198,7 @@ defmodule JellyfishWeb.ServerSocket do
|> Enum.map(
&%RoomState.Component{
id: &1.id,
type: to_proto_type(&1.type)
component: to_proto_component(&1)
}
)

Expand All @@ -247,8 +221,33 @@ defmodule JellyfishWeb.ServerSocket do
%RoomState{id: room.id, config: config, peers: peers, components: components}
end

defp to_proto_type(Jellyfish.Component.HLS), do: :TYPE_HLS
defp to_proto_type(Jellyfish.Component.RTSP), do: :TYPE_RTSP
defp to_proto_notification({:room_created, room_id}),
do: {:room_created, %RoomCreated{room_id: room_id}}

defp to_proto_notification({:room_deleted, room_id}),
do: {:room_deleted, %RoomDeleted{room_id: room_id}}

defp to_proto_notification({:room_crashed, room_id}),
do: {:room_crashed, %RoomCrashed{room_id: room_id}}

defp to_proto_notification({:peer_connected, room_id, peer_id}),
do: {:peer_connected, %PeerConnected{room_id: room_id, peer_id: peer_id}}

defp to_proto_notification({:peer_disconnected, room_id, peer_id}),
do: {:peer_disconnected, %PeerDisconnected{room_id: room_id, peer_id: peer_id}}

defp to_proto_notification({:peer_crashed, room_id, peer_id}),
do: {:peer_crashed, %PeerCrashed{room_id: room_id, peer_id: peer_id}}

defp to_proto_notification({:component_crashed, room_id, component_id}),
do: {:component_crashed, %ComponentCrashed{room_id: room_id, component_id: component_id}}

defp to_proto_notification({:metrics, report}),
do: {:metrics_report, %MetricsReport{metrics: report}}

defp to_proto_notification({:hls_playable, room_id, component_id}),
do: {:hls_playable, %HlsPlayable{room_id: room_id, component_id: component_id}}

defp to_proto_type(Jellyfish.Peer.WebRTC), do: :TYPE_WEBRTC

defp to_proto_codec(:h264), do: :CODEC_H264
Expand All @@ -257,4 +256,10 @@ defmodule JellyfishWeb.ServerSocket do

defp to_proto_status(:disconnected), do: :STATUS_DISCONNECTED
defp to_proto_status(:connected), do: :STATUS_CONNECTED

defp to_proto_component(%{type: Jellyfish.Component.HLS, metadata: %{playable: playable}}),
do: {:hls, %RoomState.Component.Hls{playable: playable}}

defp to_proto_component(%{type: Jellyfish.Component.RTSP}),
do: {:rtsp, %RoomState.Component.Rtsp{}}
end
47 changes: 34 additions & 13 deletions lib/protos/jellyfish/server_notifications.pb.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer.Status do
field :STATUS_DISCONNECTED, 2
end

defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Type do
@moduledoc false

use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :TYPE_UNSPECIFIED, 0
field :TYPE_HLS, 1
field :TYPE_RTSP, 2
end

defmodule Jellyfish.ServerMessage.RoomCrashed do
@moduledoc false

Expand Down Expand Up @@ -168,16 +158,33 @@ defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer do
enum: true
end

defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Hls do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :playable, 1, type: :bool
end

defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Rtsp do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3
end

defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

oneof :component, 0

field :id, 1, type: :string
field :hls, 2, type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Hls, oneof: 0

field :type, 2,
type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Type,
enum: true
field :rtsp, 3,
type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Rtsp,
oneof: 0
end

defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState do
Expand Down Expand Up @@ -259,6 +266,15 @@ defmodule Jellyfish.ServerMessage.MetricsReport do
field :metrics, 1, type: :string
end

defmodule Jellyfish.ServerMessage.HlsPlayable do
@moduledoc false

use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3

field :room_id, 1, type: :string, json_name: "roomId"
field :component_id, 2, type: :string, json_name: "componentId"
end

defmodule Jellyfish.ServerMessage do
@moduledoc false

Expand Down Expand Up @@ -322,4 +338,9 @@ defmodule Jellyfish.ServerMessage do
type: Jellyfish.ServerMessage.MetricsReport,
json_name: "metricsReport",
oneof: 0

field :hls_playable, 13,
type: Jellyfish.ServerMessage.HlsPlayable,
json_name: "hlsPlayable",
oneof: 0
end
11 changes: 11 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ components:
description: Assigned component id
example: component-1
type: string
metadata:
$ref: '#/components/schemas/ComponentMetadata'
type:
$ref: '#/components/schemas/ComponentType'
required:
- id
- type
- metadata
title: Component
type: object
x-struct: Elixir.JellyfishWeb.ApiSpec.Component
Expand All @@ -33,6 +36,14 @@ components:
title: ComponentDetailsResponse
type: object
x-struct: Elixir.JellyfishWeb.ApiSpec.ComponentDetailsResponse
ComponentMetadata:
description: Component-specific metadata
properties:
playable:
type: boolean
title: ComponentMetadata
type: object
x-struct: Elixir.JellyfishWeb.ApiSpec.Component.Metadata
ComponentOptions:
description: Component-specific options
oneOf:
Expand Down
2 changes: 1 addition & 1 deletion protos
Loading

0 comments on commit ed2ea51

Please sign in to comment.