Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WHIP support #5

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added examples/whip_recording.mkv
Empty file.
60 changes: 60 additions & 0 deletions examples/whip_to_file.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# This example receives audio and video from obs via WHIP
# and saves it to a `recording.mkv` file.

require Logger
Logger.configure(level: :info)

Mix.install([
{:membrane_webrtc_plugin, path: "#{__DIR__}/.."},
:membrane_file_plugin,
:membrane_realtimer_plugin,
:membrane_matroska_plugin,
:membrane_opus_plugin,
:membrane_h264_plugin,
:corsica
])

defmodule Example.Pipeline do
use Membrane.Pipeline

alias Membrane.WebRTC

@impl true
def handle_init(_ctx, opts) do
spec =
[
child(:webrtc, %WebRTC.Source{
whip: "http://127.0.0.1:8829/whip"
}),
child(:matroska, Membrane.Matroska.Muxer),
get_child(:webrtc)
|> via_out(:output, options: [kind: :audio])
|> child(Membrane.Opus.Parser)
|> get_child(:matroska),
get_child(:webrtc)
|> via_out(:output, options: [kind: :video])
|> get_child(:matroska),
get_child(:matroska)
|> child(:sink, %Membrane.File.Sink{location: "whip_recording.mkv"})
]

{[spec: spec], %{}}
end

@impl true
def handle_element_end_of_stream(:sink, :input, _ctx, state) do
{[terminate: :normal], state}
end

@impl true
def handle_element_end_of_stream(_element, _pad, _ctx, state) do
{[], state}
end
end

{:ok, supervisor, _pipeline} = Membrane.Pipeline.start_link(Example.Pipeline, port: 8829)
Process.monitor(supervisor)

receive do
{:DOWN, _ref, :process, ^supervisor, _reason} -> :ok
end
90 changes: 90 additions & 0 deletions lib/membrane_webrtc/ex_webrtc/source_whip.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
defmodule Membrane.WebRTC.ExWebRTCSourceWHIP do
@moduledoc false

use Membrane.Source

require Membrane.Logger

alias Membrane.WebRTC.WhipWhep.{PeerSupervisor, Forwarder, Router}
alias Membrane.WebRTC.ExWebRTCUtils
def_options whip: [], video_codec: [], port: [], ip: [], parent: []

def_output_pad :output,
accepted_format: Membrane.RTP,
availability: :on_request,
flow_control: :push,
options: [kind: [default: nil]]

@impl true
def handle_init(_ctx, opts) do
self = self()

children = [
{Bandit,
plug: Router, scheme: :http, ip: ExWebRTCUtils.parse_ip_to_tuple(opts.ip), port: opts.port},
PeerSupervisor,
{Forwarder, source_pid: self},
{Registry, name: __MODULE__.PeerRegistry, keys: :unique}
]

Supervisor.start_link(children, strategy: :one_for_all, name: __MODULE__.Supervisor)

{[],
%{
output_tracks: %{},
parent: opts.parent
}}
end

@impl true
def handle_playing(_ctx, state) do
IO.inspect("source whip PLAYING")
{[], state}
end

@impl true
def handle_setup(_ctx, state) do
{[setup: :incomplete], state}
end

@impl true
def handle_info(:peer_connected, _ctx, state) do
IO.inspect("source whip handle_info peer_connected")
{[setup: :complete, notify_parent: :setup_complete], state}
end

@impl true
def handle_info({_x, _y, _z}, %{playback: :stopped} = _ctx, state) do
# IO.inspect("source whip handle_info playback stopped")
{[], state}
end

@impl true
def handle_info({:video_packet, id, packet}, _ctx, state) do
buffer = %Membrane.Buffer{
payload: packet.payload,
metadata: %{rtp: packet |> Map.from_struct() |> Map.delete(:payload)}
}

{[buffer: {state.output_tracks[id], buffer}], state}
end

@impl true
def handle_info({:audio_packet, id, packet}, _ctx, state) do
buffer = %Membrane.Buffer{
payload: packet.payload,
metadata: %{rtp: packet |> Map.from_struct() |> Map.delete(:payload)}
}

{[buffer: {state.output_tracks[id], buffer}], state}
end

@impl true
def handle_pad_added(Pad.ref(:output, pad_id) = pad, _ctx, state) do
%{output_tracks: output_tracks} = state
output_tracks = Map.put(output_tracks, pad_id, pad)
state = %{state | output_tracks: output_tracks}
IO.inspect(state, label: "source whip pad added")
{[stream_format: {pad, %Membrane.RTP{}}], state}
end
end
9 changes: 9 additions & 0 deletions lib/membrane_webrtc/ex_webrtc/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ defmodule Membrane.WebRTC.ExWebRTCUtils do
]
end

def parse_ip_to_tuple(ip) do
String.split(ip, ".")
|> Enum.map(fn x ->
{number, _rest} = Integer.parse(x)
number
end)
|> List.to_tuple()
end

@spec codec_clock_rate(:opus | :h264 | :vp8) :: pos_integer()
def codec_clock_rate(:opus), do: 48_000
def codec_clock_rate(:vp8), do: 90_000
Expand Down
107 changes: 107 additions & 0 deletions lib/membrane_webrtc/ex_webrtc/whip_whep/forwarder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# this module forwards rtp packets to source_whip

defmodule Membrane.WebRTC.WhipWhep.Forwarder do
use GenServer

require Logger

alias Membrane.WebRTC.WhipWhep.PeerSupervisor
alias ExWebRTC.PeerConnection

@spec start_link(any()) :: GenServer.on_start()
def start_link(source_pid: source_pid) do
GenServer.start_link(__MODULE__, %{source_pid: source_pid}, name: __MODULE__)
end

@spec connect_input(pid()) :: :ok
def connect_input(pc) do
GenServer.call(__MODULE__, {:connect_input, pc})
end

@impl true
def init(opts) do
state = %{
input_pc: nil,
audio_input: nil,
video_input: nil,
source_pid: opts.source_pid
}

IO.inspect(state)
{:ok, state}
end

@impl true
def handle_call({:connect_input, pc}, _from, state) do
if state.input_pc != nil do
PeerSupervisor.terminate_pc(state.input_pc)
end

PeerConnection.controlling_process(pc, self())
{audio_track_id, video_track_id} = get_tracks(pc, :receiver)

Logger.info("Successfully added input #{inspect(pc)}")

state = %{state | input_pc: pc, audio_input: audio_track_id, video_input: video_track_id}
{:reply, :ok, state}
end

@impl true
def handle_info(
{:ex_webrtc, pc, {:connection_state_change, :connected}},
%{input_pc: pc} = state
) do
Logger.info("exWebRTC Input #{inspect(pc)} has successfully connected")
send(state.source_pid, :peer_connected)
{:noreply, state}
end

@impl true
def handle_info(
{:ex_webrtc, input_pc, {:rtp, id, nil, packet}},
%{input_pc: input_pc, audio_input: id} = state
) do
send(state.source_pid, {:audio_packet, id, packet})
{:noreply, state}
end

@impl true
def handle_info(
{:ex_webrtc, input_pc, {:rtp, id, nil, packet}},
%{input_pc: input_pc, video_input: id} = state
) do
send(state.source_pid, {:video_packet, id, packet})
{:noreply, state}
end

@impl true
def handle_info({:ex_webrtc, _pc, {:rtcp, packets}}, state) do
for packet <- packets do
case packet do
{_track_id, %ExRTCP.Packet.PayloadFeedback.PLI{}} when state.input_pc != nil ->
:ok = PeerConnection.send_pli(state.input_pc, state.video_input)

_other ->
:ok
end
end

{:noreply, state}
end

@impl true
def handle_info(_msg, state) do
{:noreply, state}
end

defp get_tracks(pc, type) do
transceivers = PeerConnection.get_transceivers(pc)
audio_transceiver = Enum.find(transceivers, fn tr -> tr.kind == :audio end)
video_transceiver = Enum.find(transceivers, fn tr -> tr.kind == :video end)

audio_track_id = Map.fetch!(audio_transceiver, type).track.id
video_track_id = Map.fetch!(video_transceiver, type).track.id

{audio_track_id, video_track_id}
end
end
110 changes: 110 additions & 0 deletions lib/membrane_webrtc/ex_webrtc/whip_whep/peer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# this module does all the negotiation with peers
defmodule Membrane.WebRTC.WhipWhep.PeerSupervisor do
use DynamicSupervisor

require Logger

alias ExWebRTC.{PeerConnection, SessionDescription, RTPCodecParameters}

@audio_codecs [
%RTPCodecParameters{
payload_type: 111,
mime_type: "audio/opus",
clock_rate: 48_000,
channels: 2
}
]

@video_codecs [
%RTPCodecParameters{
payload_type: 96,
mime_type: "video/H264",
clock_rate: 90_000
}
]

@opts [
ice_servers: [%{urls: "stun:stun.l.google.com:19302"}],
audio_codecs: @audio_codecs,
video_codecs: @video_codecs
]

@spec start_link(any()) :: Supervisor.on_start()
def start_link(arg) do
DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
end

@spec start_whip(String.t()) :: {:ok, pid(), String.t()} | {:error, term()}
def start_whip(offer_sdp), do: start_pc(offer_sdp, :recvonly)

@spec pc_name(String.t()) :: term()
def pc_name(id), do: {:via, Registry, {Membrane.WebRTC.ExWebRTCSourceWHIP.PeerRegistry, id}}

@spec terminate_pc(pid()) :: :ok | {:error, :not_found}
def terminate_pc(pc) do
DynamicSupervisor.terminate_child(__MODULE__, pc)
end

@impl true
def init(_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end

defp start_pc(offer_sdp, direction) do
offer = %SessionDescription{type: :offer, sdp: offer_sdp}
pc_id = generate_pc_id()
{:ok, pc} = spawn_peer_connection(pc_id)

Logger.info("Received offer for #{inspect(pc)}, SDP:\n#{offer.sdp}")

with :ok <- PeerConnection.set_remote_description(pc, offer),
:ok <- setup_transceivers(pc, direction),
{:ok, answer} <- PeerConnection.create_answer(pc),
:ok <- PeerConnection.set_local_description(pc, answer),
:ok <- gather_candidates(pc),
answer <- PeerConnection.get_local_description(pc) do
Logger.info("Sent answer for #{inspect(pc)}, SDP:\n#{answer.sdp}")

{:ok, pc, pc_id, answer.sdp}
else
{:error, _res} = err ->
Logger.info("Failed to complete negotiation for #{inspect(pc)}")
terminate_pc(pc)
err
end
end

defp setup_transceivers(pc, direction) do
transceivers = PeerConnection.get_transceivers(pc)

for %{id: id} <- transceivers do
PeerConnection.set_transceiver_direction(pc, id, direction)
end

:ok
end

defp spawn_peer_connection(id) do
name = pc_name(id)
gen_server_opts = [name: name]
pc_opts = Keyword.put(@opts, :controlling_process, self())

child_spec = %{
id: PeerConnection,
start: {PeerConnection, :start_link, [pc_opts, gen_server_opts]},
restart: :temporary
}

DynamicSupervisor.start_child(__MODULE__, child_spec)
end

defp gather_candidates(pc) do
receive do
{:ex_webrtc, ^pc, {:ice_gathering_state_change, :complete}} -> :ok
after
1000 -> {:error, :timeout}
end
end

defp generate_pc_id(), do: for(_ <- 1..10, into: "", do: <<Enum.random(~c"0123456789abcdef")>>)
end
Loading