Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add :force_transcoding option #65

Merged
merged 18 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 101 additions & 25 deletions lib/boombox.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ defmodule Boombox do

See `run/1` for details and [examples.livemd](examples.livemd) for examples.
"""
require Logger
require Membrane.Time

alias Membrane.RTP

@type force_transcoding() :: {:force_transcoding, boolean() | :audio | :video}

@type webrtc_signaling :: Membrane.WebRTC.Signaling.t() | String.t()
@type in_stream_opts :: [
{:audio, :binary | boolean()}
Expand Down Expand Up @@ -65,6 +68,7 @@ defmodule Boombox do
| {:address, :inet.ip_address() | String.t()}
| {:port, :inet.port_number()}
| {:target, String.t()}
| force_transcoding()
]

@type input ::
Expand All @@ -79,15 +83,22 @@ defmodule Boombox do

@type output ::
(path_or_uri :: String.t())
| {path_or_uri :: String.t(), [force_transcoding()]}
| {:mp4, location :: String.t()}
| {:mp4, location :: String.t(), [force_transcoding()]}
| {:webrtc, webrtc_signaling()}
| {:webrtc, webrtc_signaling(), [force_transcoding()]}
| {:whip, uri :: String.t(), [{:token, String.t()} | {bandit_option :: atom(), term()}]}
| {:hls, location :: String.t()}
| {:hls, location :: String.t(), [force_transcoding()]}
| {:rtp, out_rtp_opts()}
| {:stream, out_stream_opts()}

@typep procs :: %{pipeline: pid(), supervisor: pid()}
@typep opts_map :: %{input: input(), output: output()}
@typep opts_map :: %{
input: input(),
output: output()
}

@doc """
Runs boombox with given input and output.
Expand All @@ -103,13 +114,27 @@ defmodule Boombox do

If the input is `{:stream, opts}`, a `Stream` or other `Enumerable` is expected
as the first argument.
```
Boombox.run(
input: "path/to/file.mp4",
output: {:webrtc, "ws://0.0.0.0:1234"}
)
```
"""
@spec run(Enumerable.t() | nil, input: input(), output: output()) :: :ok | Enumerable.t()
@spec run(Enumerable.t() | nil,
input: input(),
output: output()
) :: :ok | Enumerable.t()
@endpoint_opts [:input, :output]
def run(stream \\ nil, opts) do
opts_keys = [:input, :output]
opts = Keyword.validate!(opts, opts_keys) |> Map.new(fn {k, v} -> {k, parse_opt!(k, v)} end)
opts =
opts
|> Keyword.validate!(@endpoint_opts)
|> Map.new(fn {key, value} -> {key, parse_endpoint_opt!(key, value)} end)

:ok = maybe_log_transcoding_related_warning(opts)

if key = Enum.find(opts_keys, fn k -> not is_map_key(opts, k) end) do
if key = Enum.find(@endpoint_opts, fn k -> not is_map_key(opts, k) end) do
raise "#{key} is not provided"
end

Expand Down Expand Up @@ -157,55 +182,80 @@ defmodule Boombox do
end
end

@spec parse_opt!(:input, input()) :: input()
@spec parse_opt!(:output, output()) :: output()
defp parse_opt!(direction, value) when is_binary(value) do
@spec parse_endpoint_opt!(:input, input()) :: input()
@spec parse_endpoint_opt!(:output, output()) :: output()
defp parse_endpoint_opt!(direction, value) when is_binary(value) do
parse_endpoint_opt!(direction, {value, []})
end

defp parse_endpoint_opt!(direction, {value, opts}) when is_binary(value) do
uri = URI.parse(value)
scheme = uri.scheme
extension = if uri.path, do: Path.extname(uri.path)

case {scheme, extension, direction} do
{scheme, ".mp4", :input} when scheme in [nil, "http", "https"] -> {:mp4, value}
{nil, ".mp4", :output} -> {:mp4, value}
{scheme, ".mp4", :input} when scheme in [nil, "http", "https"] -> {:mp4, value, opts}
{nil, ".mp4", :output} -> {:mp4, value, opts}
{scheme, _ext, :input} when scheme in ["rtmp", "rtmps"] -> {:rtmp, value}
{"rtsp", _ext, :input} -> {:rtsp, value}
{nil, ".m3u8", :output} -> {:hls, value}
{nil, ".m3u8", :output} -> {:hls, value, opts}
_other -> raise ArgumentError, "Unsupported URI: #{value} for direction: #{direction}"
end
|> then(&parse_opt!(direction, &1))
|> then(&parse_endpoint_opt!(direction, &1))
end

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
defp parse_opt!(direction, value) when is_tuple(value) do
defp parse_endpoint_opt!(direction, value) when is_tuple(value) do
case value do
{:mp4, location} when is_binary(location) and direction == :input ->
parse_opt!(:input, {:mp4, location, []})
parse_endpoint_opt!(:input, {:mp4, location, []})

{:mp4, location, opts} when is_binary(location) and direction == :input ->
if Keyword.keyword?(opts),
do: {:mp4, location, transport: resolve_transport(location, opts)}
opts = opts |> Keyword.put(:transport, resolve_transport(location, opts))
{:mp4, location, opts}

{:mp4, location} when is_binary(location) and direction == :output ->
{:mp4, location}
{:mp4, location, []}

{:webrtc, %Membrane.WebRTC.Signaling{}} ->
{:mp4, location, _opts} when is_binary(location) and direction == :output ->
value

{:webrtc, uri} when is_binary(uri) ->
{:webrtc, %Membrane.WebRTC.Signaling{}} when direction == :input ->
value

{:webrtc, %Membrane.WebRTC.Signaling{} = signaling} ->
{:webrtc, signaling, []}

{:webrtc, %Membrane.WebRTC.Signaling{}, _opts} when direction == :output ->
value

{:webrtc, uri} when is_binary(uri) and direction == :input ->
value

{:webrtc, uri} when is_binary(uri) and direction == :output ->
{:webrtc, uri, []}

{:webrtc, uri, _opts} when is_binary(uri) and direction == :output ->
value

{:whip, uri} when is_binary(uri) ->
parse_opt!(direction, {:whip, uri, []})
parse_endpoint_opt!(direction, {:whip, uri, []})

{:whip, uri, opts} when is_binary(uri) and is_list(opts) ->
if Keyword.keyword?(opts) do
{:webrtc, {:whip, uri, opts}}
end
{:whip, uri, opts} when is_binary(uri) and is_list(opts) and direction == :input ->
if Keyword.keyword?(opts), do: {:webrtc, value}

{:whip, uri, opts} when is_binary(uri) and is_list(opts) and direction == :output ->
{webrtc_opts, whip_opts} = split_webrtc_and_whip_opts(opts)
if Keyword.keyword?(opts), do: {:webrtc, {:whip, uri, whip_opts}, webrtc_opts}

{:rtmp, arg} when direction == :input and (is_binary(arg) or is_pid(arg)) ->
value

{:hls, location} when direction == :output and is_binary(location) ->
{:hls, location, []}

{:hls, location, opts}
when direction == :output and is_binary(location) and is_list(opts) ->
value

{:rtsp, location} when direction == :input and is_binary(location) ->
Expand All @@ -226,6 +276,27 @@ defmodule Boombox do
end
end

# Holds for endpoint tuples whose first element is `:webrtc` or `:whip`;
# non-tuple values yield `false`.
defguardp is_webrtc_endpoint(endpoint)
          when is_tuple(endpoint) and
                 (elem(endpoint, 0) === :webrtc or elem(endpoint, 0) === :whip)

# Logs a warning when the output endpoint is WebRTC, the input endpoint is not, and the
# output's `:force_transcoding` option is neither `true` nor `:video`. Always returns `:ok`.
#
# The `:input`/`:output` keys are matched in the clause head instead of being read with
# `opts.input`/`opts.output`: `run/2` calls this function before it verifies that both
# endpoints were provided, so a missing key has to fall through to the catch-all clause
# rather than raise `KeyError` and mask the "... is not provided" error.
@spec maybe_log_transcoding_related_warning(opts_map()) :: :ok
def maybe_log_transcoding_related_warning(%{input: input, output: output} = opts)
    when is_webrtc_endpoint(output) and not is_webrtc_endpoint(input) do
  if webrtc_output_force_transcoding(opts) not in [true, :video] do
    Logger.warning("""
    Boombox output protocol is WebRTC, while Boombox input doesn't support keyframe requests. This \
    might lead to issues with the output video if the output stream isn't sent only by localhost. You \
    can solve this by setting `:force_transcoding` output option to `true` or `:video`, but be aware \
    that it will increase Boombox CPU usage.
    """)
  end

  :ok
end

# Any other combination of endpoints (including a missing one) needs no warning.
def maybe_log_transcoding_related_warning(_opts), do: :ok

# Reads the `:force_transcoding` option from a three-element `:webrtc` output tuple;
# evaluates to `nil` when the option is absent.
defp webrtc_output_force_transcoding(%{output: {:webrtc, _signaling, webrtc_opts}}) do
  Keyword.get(webrtc_opts, :force_transcoding)
end

@spec consume_stream(Enumerable.t(), opts_map()) :: term()
defp consume_stream(stream, opts) do
procs = start_pipeline(opts)
Expand Down Expand Up @@ -341,7 +412,7 @@ defmodule Boombox do

@spec resolve_transport(String.t(), [{:transport, :file | :http}]) :: :file | :http
defp resolve_transport(location, opts) do
case Keyword.validate!(opts, transport: nil)[:transport] do
case Keyword.validate!(opts, transport: nil, force_transcoding: false)[:transport] do
nil ->
uri = URI.parse(location)

Expand All @@ -359,4 +430,9 @@ defmodule Boombox do
raise ArgumentError, "Invalid transport: #{inspect(transport)}"
end
end

# Splits output options into `{webrtc_opts, whip_opts}`: `:force_transcoding` entries go to
# the first list, everything else to the second, with relative order preserved.
#
# `match?/2` is used instead of destructuring `{key, _value}` so that a malformed element
# (anything that is not a `{:force_transcoding, _}` pair) lands in the second list rather than
# raising `FunctionClauseError`; the caller runs this split before its own
# `Keyword.keyword?/1` check on the options.
defp split_webrtc_and_whip_opts(opts) do
  Enum.split_with(opts, &match?({:force_transcoding, _value}, &1))
end
end
11 changes: 8 additions & 3 deletions lib/boombox/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ defmodule Boombox.HLS do

@spec link_output(
Path.t(),
[Boombox.force_transcoding()],
Boombox.Pipeline.track_builders(),
Membrane.ChildrenSpec.t()
) :: Ready.t()
def link_output(location, track_builders, spec_builder) do
def link_output(location, opts, track_builders, spec_builder) do
force_transcoding = opts |> Keyword.get(:force_transcoding, false)

{directory, manifest_name} =
if Path.extname(location) == ".m3u8" do
{Path.dirname(location), Path.basename(location, ".m3u8")}
Expand Down Expand Up @@ -44,7 +47,8 @@ defmodule Boombox.HLS do
{:audio, builder} ->
builder
|> child(:hls_audio_transcoder, %Membrane.Transcoder{
output_stream_format: Membrane.AAC
output_stream_format: Membrane.AAC,
force_transcoding?: force_transcoding in [true, :audio]
})
|> via_in(Pad.ref(:input, :audio),
options: [encoding: :AAC, segment_duration: Time.milliseconds(2000)]
Expand All @@ -54,7 +58,8 @@ defmodule Boombox.HLS do
{:video, builder} ->
builder
|> child(:hls_video_transcoder, %Membrane.Transcoder{
output_stream_format: %H264{alignment: :au, stream_structure: :avc3}
output_stream_format: %H264{alignment: :au, stream_structure: :avc3},
force_transcoding?: force_transcoding in [true, :video]
})
|> via_in(Pad.ref(:input, :video),
options: [encoding: :H264, segment_duration: Time.milliseconds(2000)]
Expand Down
14 changes: 10 additions & 4 deletions lib/boombox/mp4.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ defmodule Boombox.MP4 do

defguardp is_h26x(format) when is_struct(format) and format.__struct__ in [H264, H265]

@spec create_input(String.t(), transport: :file | :http) :: Wait.t()
@spec create_input(String.t(), [Boombox.force_transcoding() | {:transport, :file | :http}]) ::
Wait.t()
def create_input(location, opts) do
spec =
case opts[:transport] do
Expand Down Expand Up @@ -53,10 +54,13 @@ defmodule Boombox.MP4 do

@spec link_output(
String.t(),
[Boombox.force_transcoding()],
Boombox.Pipeline.track_builders(),
Membrane.ChildrenSpec.t()
) :: Ready.t()
def link_output(location, track_builders, spec_builder) do
def link_output(location, opts, track_builders, spec_builder) do
force_transcoding = opts |> Keyword.get(:force_transcoding, false)

spec =
[
spec_builder,
Expand All @@ -66,7 +70,8 @@ defmodule Boombox.MP4 do
{:audio, builder} ->
builder
|> child(:mp4_audio_transcoder, %Membrane.Transcoder{
output_stream_format: Membrane.AAC
output_stream_format: Membrane.AAC,
force_transcoding?: force_transcoding in [true, :audio]
})
|> child(:mp4_out_aac_parser, %Membrane.AAC.Parser{
out_encapsulation: :none,
Expand All @@ -90,7 +95,8 @@ defmodule Boombox.MP4 do

_not_h26x ->
%H264{stream_structure: :avc3, alignment: :au}
end
end,
force_transcoding?: force_transcoding in [true, :video]
})
|> via_in(Pad.ref(:input, :video))
|> get_child(:mp4_muxer)
Expand Down
25 changes: 17 additions & 8 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ defmodule Boombox.Pipeline do
defmodule State do
@moduledoc false

@enforce_keys [:status, :input, :output, :parent]
@enforce_keys [
:status,
:input,
:output,
:parent
# :force_transcoding
]

defstruct @enforce_keys ++
[
Expand Down Expand Up @@ -150,7 +156,10 @@ defmodule Boombox.Pipeline do
"""
end

{:webrtc, _signaling, webrtc_opts} = state.output

Boombox.WebRTC.handle_output_tracks_negotiated(
webrtc_opts,
state.track_builders,
state.spec_builder,
tracks,
Expand Down Expand Up @@ -331,7 +340,7 @@ defmodule Boombox.Pipeline do

@spec create_output(Boombox.output(), Membrane.Pipeline.CallbackContext.t(), State.t()) ::
{Ready.t() | Wait.t(), State.t()}
defp create_output({:webrtc, signaling}, ctx, state) do
defp create_output({:webrtc, signaling, _opts}, ctx, state) do
Boombox.WebRTC.create_output(signaling, ctx, state)
end

Expand All @@ -347,21 +356,21 @@ defmodule Boombox.Pipeline do
State.t()
) ::
Ready.t() | Wait.t()
defp link_output({:webrtc, _signaling}, track_builders, spec_builder, _ctx, state) do
defp link_output({:webrtc, _signaling, opts}, track_builders, spec_builder, _ctx, state) do
tracks = [
%{kind: :audio, id: :audio_track},
%{kind: :video, id: :video_tracks}
]

Boombox.WebRTC.link_output(track_builders, spec_builder, tracks, state)
Boombox.WebRTC.link_output(opts, track_builders, spec_builder, tracks, state)
end

defp link_output({:mp4, location}, track_builders, spec_builder, _ctx, _state) do
Boombox.MP4.link_output(location, track_builders, spec_builder)
defp link_output({:mp4, location, opts}, track_builders, spec_builder, _ctx, _state) do
Boombox.MP4.link_output(location, opts, track_builders, spec_builder)
end

defp link_output({:hls, location}, track_builders, spec_builder, _ctx, _state) do
Boombox.HLS.link_output(location, track_builders, spec_builder)
defp link_output({:hls, location, opts}, track_builders, spec_builder, _ctx, _state) do
Boombox.HLS.link_output(location, opts, track_builders, spec_builder)
end

defp link_output({:rtp, opts}, track_builders, spec_builder, _ctx, _state) do
Expand Down
Loading