Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add :force_transcoding option #65

Merged
merged 18 commits into from
Mar 19, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 102 additions & 15 deletions lib/boombox.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ defmodule Boombox do

See `run/1` for details and [examples.livemd](examples.livemd) for examples.
"""
require Logger
require Membrane.Time

alias Membrane.RTP

@type force_transcoding_value() :: boolean() | :audio | :video
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd make it full tuple

Suggested change
@type force_transcoding_value() :: boolean() | :audio | :video
@type force_transcoding() :: {:force_transcoding, boolean() | :audio | :video}


@type webrtc_signaling :: Membrane.WebRTC.Signaling.t() | String.t()
@type in_stream_opts :: [
{:audio, :binary | boolean()}
Expand Down Expand Up @@ -65,6 +68,7 @@ defmodule Boombox do
| {:address, :inet.ip_address() | String.t()}
| {:port, :inet.port_number()}
| {:target, String.t()}
| {:force_transcoding, force_transcoding_value()}
]

@type input ::
Expand All @@ -79,15 +83,22 @@ defmodule Boombox do

@type output ::
(path_or_uri :: String.t())
| {path_or_uri :: String.t(), [{:force_transcoding, force_transcoding_value()}]}
| {:mp4, location :: String.t()}
| {:mp4, location :: String.t(), [{:force_transcoding, force_transcoding_value()}]}
| {:webrtc, webrtc_signaling()}
| {:webrtc, webrtc_signaling(), [{:force_transcoding, force_transcoding_value()}]}
| {:whip, uri :: String.t(), [{:token, String.t()} | {bandit_option :: atom(), term()}]}
| {:hls, location :: String.t()}
| {:hls, location :: String.t(), [{:force_transcoding, force_transcoding_value()}]}
| {:rtp, out_rtp_opts()}
| {:stream, out_stream_opts()}

@typep procs :: %{pipeline: pid(), supervisor: pid()}
@typep opts_map :: %{input: input(), output: output()}
@typep opts_map :: %{
input: input(),
output: output()
}

@doc """
Runs boombox with given input and output.
Expand All @@ -103,13 +114,35 @@ defmodule Boombox do

If the input is `{:stream, opts}`, a `Stream` or other `Enumerable` is expected
as the first argument.

If `:enforce_audio_transcoding?` or `:enforce_video_transcoding?` option is set to `true`,
boombox will perform audio and/or video transcoding, even if it is not necessary. By default
both options are set to `false`.

```
Boombox.run(
input: "path/to/file.mp4",
output: {:webrtc, "ws://0.0.0.0:1234"},
enforce_video_transcoding?: true,
enforce_audio_transcoding?: true
)
```
"""
@spec run(Enumerable.t() | nil, input: input(), output: output()) :: :ok | Enumerable.t()
@spec run(Enumerable.t() | nil,
input: input(),
output: output()
) :: :ok | Enumerable.t()
@endpoint_opts [:input, :output]
def run(stream \\ nil, opts) do
opts_keys = [:input, :output]
opts = Keyword.validate!(opts, opts_keys) |> Map.new(fn {k, v} -> {k, parse_opt!(k, v)} end)
opts =
opts
|> Keyword.validate!(@endpoint_opts)
|> Map.new(fn {key, value} -> {key, parse_endpoint_opt!(key, value)} end)
|> resolve_force_transcoding()

:ok = maybe_log_transcoding_related_warning(opts)

if key = Enum.find(opts_keys, fn k -> not is_map_key(opts, k) end) do
if key = Enum.find(@endpoint_opts, fn k -> not is_map_key(opts, k) end) do
raise "#{key} is not provided"
end

Expand Down Expand Up @@ -157,29 +190,33 @@ defmodule Boombox do
end
end

@spec parse_opt!(:input, input()) :: input()
@spec parse_opt!(:output, output()) :: output()
defp parse_opt!(direction, value) when is_binary(value) do
@spec parse_endpoint_opt!(:input, input()) :: input()
@spec parse_endpoint_opt!(:output, output()) :: output()
defp parse_endpoint_opt!(direction, value) when is_binary(value) do
parse_endpoint_opt!(direction, {value, []})
end

defp parse_endpoint_opt!(direction, {value, opts}) when is_binary(value) do
uri = URI.parse(value)
scheme = uri.scheme
extension = if uri.path, do: Path.extname(uri.path)

case {scheme, extension, direction} do
{scheme, ".mp4", :input} when scheme in [nil, "http", "https"] -> {:mp4, value}
{nil, ".mp4", :output} -> {:mp4, value}
{scheme, ".mp4", :input} when scheme in [nil, "http", "https"] -> {:mp4, value, opts}
{nil, ".mp4", :output} -> {:mp4, value, opts}
{scheme, _ext, :input} when scheme in ["rtmp", "rtmps"] -> {:rtmp, value}
{"rtsp", _ext, :input} -> {:rtsp, value}
{nil, ".m3u8", :output} -> {:hls, value}
{nil, ".m3u8", :output} -> {:hls, value, opts}
_other -> raise ArgumentError, "Unsupported URI: #{value} for direction: #{direction}"
end
|> then(&parse_opt!(direction, &1))
|> then(&parse_endpoint_opt!(direction, &1))
end

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
defp parse_opt!(direction, value) when is_tuple(value) do
defp parse_endpoint_opt!(direction, value) when is_tuple(value) do
case value do
{:mp4, location} when is_binary(location) and direction == :input ->
parse_opt!(:input, {:mp4, location, []})
parse_endpoint_opt!(:input, {:mp4, location, []})

{:mp4, location, opts} when is_binary(location) and direction == :input ->
if Keyword.keyword?(opts),
Expand All @@ -188,14 +225,23 @@ defmodule Boombox do
{:mp4, location} when is_binary(location) and direction == :output ->
{:mp4, location}

{:mp4, location, _opts} when is_binary(location) and direction == :output ->
value

{:webrtc, %Membrane.WebRTC.Signaling{}} ->
value

{:webrtc, %Membrane.WebRTC.Signaling{}, _opts} when direction == :output ->
value

{:webrtc, uri} when is_binary(uri) ->
value

{:webrtc, uri, _opts} when is_binary(uri) and direction == :output ->
value

{:whip, uri} when is_binary(uri) ->
parse_opt!(direction, {:whip, uri, []})
parse_endpoint_opt!(direction, {:whip, uri, []})

{:whip, uri, opts} when is_binary(uri) and is_list(opts) ->
if Keyword.keyword?(opts) do
Expand All @@ -208,6 +254,10 @@ defmodule Boombox do
{:hls, location} when direction == :output and is_binary(location) ->
value

{:hls, location, opts}
when direction == :output and is_binary(location) and is_list(opts) ->
value

{:rtsp, location} when direction == :input and is_binary(location) ->
value

Expand All @@ -226,6 +276,24 @@ defmodule Boombox do
end
end

defguardp is_webrtc_endpoint(endpoint)
when is_tuple(endpoint) and elem(endpoint, 0) in [:webrtc, :whip]

@spec maybe_log_transcoding_related_warning(opts_map()) :: :ok
def maybe_log_transcoding_related_warning(opts) do
if is_webrtc_endpoint(opts.output) and not is_webrtc_endpoint(opts.input) and
opts.force_transcoding not in [true, :video] do
Logger.warning("""
Boombox output protocol is WebRTC, while Boombox input doesn't support keyframe requests. This \
might lead to issues with the output video if the output stream isn't sent only by localhost. You \
can solve this by setting `:force_transcoding` output option to `true` or `:video`, but be aware \
that it will increase Boombox CPU usage.
""")
end

:ok
end

@spec consume_stream(Enumerable.t(), opts_map()) :: term()
defp consume_stream(stream, opts) do
procs = start_pipeline(opts)
Expand Down Expand Up @@ -359,4 +427,23 @@ defmodule Boombox do
raise ArgumentError, "Invalid transport: #{inspect(transport)}"
end
end

defp resolve_force_transcoding(opts) do
maybe_keyword =
opts.output
|> Tuple.to_list()
|> List.last()

force_transcoding =
Keyword.keyword?(maybe_keyword) && Keyword.get(maybe_keyword, :force_transcoding, false)

opts
|> Map.put(:force_transcoding, force_transcoding)
|> Map.update!(:output, fn
{:webrtc, signaling, _opts} -> {:webrtc, signaling}
{:hls, location, _opts} -> {:hls, location}
{:mp4, location, _opts} -> {:mp4, location}
other -> other
end)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks bad. Why not keep enforce_transcoding like all other options?

end
end
10 changes: 6 additions & 4 deletions lib/boombox/elixir_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Boombox.ElixirStream do
require Membrane.Pad, as: Pad

alias __MODULE__.{Sink, Source}
alias Boombox.Pipeline.Ready
alias Boombox.Pipeline.{Ready, State}
alias Membrane.FFmpeg.SWScale

@options_audio_keys [:audio_format, :audio_rate, :audio_channels]
Expand Down Expand Up @@ -41,9 +41,10 @@ defmodule Boombox.ElixirStream do
consumer :: pid,
options :: Boombox.out_stream_opts(),
Boombox.Pipeline.track_builders(),
Membrane.ChildrenSpec.t()
Membrane.ChildrenSpec.t(),
State.t()
) :: Ready.t()
def link_output(consumer, options, track_builders, spec_builder) do
def link_output(consumer, options, track_builders, spec_builder, state) do
options = parse_options(options, :output)

{track_builders, to_ignore} =
Expand All @@ -66,7 +67,8 @@ defmodule Boombox.ElixirStream do
{:video, builder} ->
builder
|> child(:elixir_stream_video_transcoder, %Membrane.Transcoder{
output_stream_format: Membrane.RawVideo
output_stream_format: Membrane.RawVideo,
force_transcoding?: state.force_transcoding in [true, :video]
})
|> child(:elixir_stream_rgb_converter, %SWScale.Converter{
format: :RGB,
Expand Down
13 changes: 8 additions & 5 deletions lib/boombox/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ defmodule Boombox.HLS do
import Membrane.ChildrenSpec

require Membrane.Pad, as: Pad
alias Boombox.Pipeline.Ready
alias Boombox.Pipeline.{Ready, State}
alias Membrane.H264
alias Membrane.Time

@spec link_output(
Path.t(),
Boombox.Pipeline.track_builders(),
Membrane.ChildrenSpec.t()
Membrane.ChildrenSpec.t(),
State.t()
) :: Ready.t()
def link_output(location, track_builders, spec_builder) do
def link_output(location, track_builders, spec_builder, state) do
{directory, manifest_name} =
if Path.extname(location) == ".m3u8" do
{Path.dirname(location), Path.basename(location, ".m3u8")}
Expand Down Expand Up @@ -44,7 +45,8 @@ defmodule Boombox.HLS do
{:audio, builder} ->
builder
|> child(:hls_audio_transcoder, %Membrane.Transcoder{
output_stream_format: Membrane.AAC
output_stream_format: Membrane.AAC,
force_transcoding?: state.force_transcoding in [true, :audio]
})
|> via_in(Pad.ref(:input, :audio),
options: [encoding: :AAC, segment_duration: Time.milliseconds(2000)]
Expand All @@ -54,7 +56,8 @@ defmodule Boombox.HLS do
{:video, builder} ->
builder
|> child(:hls_video_transcoder, %Membrane.Transcoder{
output_stream_format: %H264{alignment: :au, stream_structure: :avc3}
output_stream_format: %H264{alignment: :au, stream_structure: :avc3},
force_transcoding?: state.force_transcoding in [true, :video]
})
|> via_in(Pad.ref(:input, :video),
options: [encoding: :H264, segment_duration: Time.milliseconds(2000)]
Expand Down
13 changes: 8 additions & 5 deletions lib/boombox/mp4.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Boombox.MP4 do

import Membrane.ChildrenSpec
require Membrane.Pad, as: Pad
alias Boombox.Pipeline.{Ready, Wait}
alias Boombox.Pipeline.{Ready, State, Wait}
alias Membrane.H264
alias Membrane.H265

Expand Down Expand Up @@ -54,9 +54,10 @@ defmodule Boombox.MP4 do
@spec link_output(
String.t(),
Boombox.Pipeline.track_builders(),
Membrane.ChildrenSpec.t()
Membrane.ChildrenSpec.t(),
State.t()
) :: Ready.t()
def link_output(location, track_builders, spec_builder) do
def link_output(location, track_builders, spec_builder, state) do
spec =
[
spec_builder,
Expand All @@ -66,7 +67,8 @@ defmodule Boombox.MP4 do
{:audio, builder} ->
builder
|> child(:mp4_audio_transcoder, %Membrane.Transcoder{
output_stream_format: Membrane.AAC
output_stream_format: Membrane.AAC,
force_transcoding?: state.force_transcoding in [true, :audio]
})
|> child(:mp4_out_aac_parser, %Membrane.AAC.Parser{
out_encapsulation: :none,
Expand All @@ -90,7 +92,8 @@ defmodule Boombox.MP4 do

_not_h26x ->
%H264{stream_structure: :avc3, alignment: :au}
end
end,
force_transcoding?: state.force_transcoding in [true, :video]
})
|> via_in(Pad.ref(:input, :video))
|> get_child(:mp4_muxer)
Expand Down
26 changes: 17 additions & 9 deletions lib/boombox/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ defmodule Boombox.Pipeline do
defmodule State do
@moduledoc false

@enforce_keys [:status, :input, :output, :parent]
@enforce_keys [
:status,
:input,
:output,
:parent,
:force_transcoding
]

defstruct @enforce_keys ++
[
Expand Down Expand Up @@ -95,7 +101,8 @@ defmodule Boombox.Pipeline do
eos_info: term(),
rtsp_state: Boombox.RTSP.state() | nil,
parent: pid(),
output_webrtc_state: Boombox.WebRTC.output_webrtc_state() | nil
output_webrtc_state: Boombox.WebRTC.output_webrtc_state() | nil,
force_transcoding: Boombox.force_transcoding_value()
}
end

Expand All @@ -105,6 +112,7 @@ defmodule Boombox.Pipeline do
input: opts.input,
output: opts.output,
parent: opts.parent,
force_transcoding: opts.force_transcoding,
status: :init
}

Expand Down Expand Up @@ -356,20 +364,20 @@ defmodule Boombox.Pipeline do
Boombox.WebRTC.link_output(track_builders, spec_builder, tracks, state)
end

defp link_output({:mp4, location}, track_builders, spec_builder, _ctx, _state) do
Boombox.MP4.link_output(location, track_builders, spec_builder)
defp link_output({:mp4, location}, track_builders, spec_builder, _ctx, state) do
Boombox.MP4.link_output(location, track_builders, spec_builder, state)
end

defp link_output({:hls, location}, track_builders, spec_builder, _ctx, _state) do
Boombox.HLS.link_output(location, track_builders, spec_builder)
defp link_output({:hls, location}, track_builders, spec_builder, _ctx, state) do
Boombox.HLS.link_output(location, track_builders, spec_builder, state)
end

defp link_output({:rtp, opts}, track_builders, spec_builder, _ctx, _state) do
Boombox.RTP.link_output(opts, track_builders, spec_builder)
defp link_output({:rtp, opts}, track_builders, spec_builder, _ctx, state) do
Boombox.RTP.link_output(opts, track_builders, spec_builder, state)
end

defp link_output({:stream, opts}, track_builders, spec_builder, _ctx, state) do
Boombox.ElixirStream.link_output(state.parent, opts, track_builders, spec_builder)
Boombox.ElixirStream.link_output(state.parent, opts, track_builders, spec_builder, state)
end

# Wait between sending the last packet
Expand Down
Loading