From 69fcdddbca4207c624185df999a182d274c24af1 Mon Sep 17 00:00:00 2001 From: Daniel Morandini Date: Mon, 30 Oct 2023 15:31:40 +0100 Subject: [PATCH] Remove in memory demuxer --- c_src/decoder.h | 5 +- c_src/demuxer.c | 282 +----------------------------- c_src/demuxer.h | 12 +- c_src/libav.c | 69 +------- lib/avx/demuxer.ex | 158 +++-------------- lib/avx/demuxer/mailbox_reader.ex | 75 -------- lib/avx/nif.ex | 16 +- mix.exs | 3 +- mix.lock | 2 + test/avx/decoder_test.exs | 4 +- test/avx/demuxer_test.exs | 53 +----- 11 files changed, 56 insertions(+), 623 deletions(-) delete mode 100644 lib/avx/demuxer/mailbox_reader.ex diff --git a/c_src/decoder.h b/c_src/decoder.h index 9de256b..153a947 100644 --- a/c_src/decoder.h +++ b/c_src/decoder.h @@ -1,12 +1,11 @@ #include "libswresample/swresample.h" #include -typedef struct Decoder Decoder; -struct Decoder { +typedef struct { AVCodecContext *codec_ctx; SwrContext *resampler_ctx; enum AVSampleFormat output_sample_format; -}; +} Decoder; int decoder_alloc(Decoder **ctx, enum AVCodecID codec_id, AVCodecParameters *params, AVRational timebase); diff --git a/c_src/demuxer.c b/c_src/demuxer.c index 6894fed..fb3bdfd 100644 --- a/c_src/demuxer.c +++ b/c_src/demuxer.c @@ -1,16 +1,10 @@ -#include "libavformat/avformat.h" #include -#include // File-based demuxer. -typedef struct { +int demuxer_alloc_from_file(Demuxer **ctx, char *path) { AVFormatContext *fmt_ctx; -} DemuxerFile; - -int demuxer_file_alloc(DemuxerFile **ctx, char *path) { - AVFormatContext *fmt_ctx; - DemuxerFile *demuxer; + Demuxer *demuxer; int errn; fmt_ctx = avformat_alloc_context(); @@ -20,7 +14,7 @@ int demuxer_file_alloc(DemuxerFile **ctx, char *path) { if ((errn = avformat_find_stream_info(fmt_ctx, NULL)) < 0) goto fail; - demuxer = (DemuxerFile *)malloc(sizeof(DemuxerFile)); + demuxer = (Demuxer *)malloc(sizeof(Demuxer)); demuxer->fmt_ctx = fmt_ctx; *ctx = demuxer; return 0; @@ -31,278 +25,12 @@ int demuxer_file_alloc(DemuxerFile **ctx, char *path) { return errn; } -int demuxer_file_read_packet(void *ctx, AVPacket *packet) { - return av_read_frame(((DemuxerFile *)ctx)->fmt_ctx, packet); -} - -void demuxer_file_fmt_ctx(void *opaque, AVFormatContext **fmt_ctx) { - DemuxerFile *ctx = (DemuxerFile *)opaque; - *fmt_ctx = ctx->fmt_ctx; -} - -int default_add(void *ctx, void *data, int size) { return -1; } -int default_demand(void *ctx) { return -1; } -int default_is_ready(void *ctx) { return 1; } - -void demuxer_file_free(void **ctx) { - DemuxerFile **demuxer = (DemuxerFile **)ctx; - avformat_close_input(&(*demuxer)->fmt_ctx); - free(*ctx); - *ctx = NULL; -} - -// @warning In memory demuxer, experimental. - -typedef enum { CTX_MODE_DRAIN, CTX_MODE_BUF } CTX_MODE; - -typedef struct { - // Stores incoming bytes. Allows rewinding. - Ioq *queue; - // The context responsible for reading data from the queue. It is - // configured to use the read_packet function as source. - AVIOContext *io_ctx; - // The actual libAV demuxer. - AVFormatContext *fmt_ctx; - - CTX_MODE mode; - - int has_header; -} DemuxerMem; - -int demuxer_mem_alloc(DemuxerMem **demuxer, int probe_size) { - DemuxerMem *ctx; - - Ioq *queue = (Ioq *)malloc(sizeof(Ioq)); - queue->ptr = malloc(probe_size); - queue->mode = QUEUE_MODE_GROW; - queue->input_eos = 0; - queue->size = probe_size; - queue->pos = 0; - queue->buf_end = 0; - - ctx = (DemuxerMem *)malloc(sizeof(DemuxerMem)); - ctx->queue = queue; - ctx->mode = CTX_MODE_BUF; - ctx->has_header = 0; - *demuxer = ctx; - - return 0; -} - -int read_ioq(void *opaque, uint8_t *buf, int buf_size) { - int size; - Ioq *queue; - - queue = (Ioq *)opaque; - - if ((size = queue_read(queue, buf, buf_size)) < 0) { - if (queue->input_eos) - return AVERROR_EOF; - // Avoid telling avio that the input is finished if - // we're the input is not. - return 0; - } - - return size; -} - -void demuxer_mem_fmt_ctx(void *opaque, AVFormatContext **fmt_ctx) { - DemuxerMem *ctx = (DemuxerMem *)opaque; - *fmt_ctx = ctx->fmt_ctx; -} - -int demuxer_mem_read_header(DemuxerMem *ctx) { - AVIOContext *io_ctx; - AVFormatContext *fmt_ctx; - void *io_buffer; - int errnum; - int ret; - - // TODO can we avoid allocating this buffer each time we do a read attempt? - io_buffer = av_malloc(ctx->queue->size); - // Context that reads from queue and uses io_buffer as scratch space. - io_ctx = avio_alloc_context(io_buffer, ctx->queue->size, 0, ctx->queue, - &read_ioq, NULL, NULL); - io_ctx->seekable = 0; - - fmt_ctx = avformat_alloc_context(); - fmt_ctx->pb = io_ctx; - fmt_ctx->probesize = ctx->queue->size; - - if ((errnum = avformat_open_input(&fmt_ctx, NULL, NULL, NULL))) - goto open_error; - - if ((errnum = avformat_find_stream_info(fmt_ctx, NULL)) < 0) - goto open_error; - - ctx->has_header = 1; - ctx->io_ctx = io_ctx; - ctx->fmt_ctx = fmt_ctx; - - // From now on the queue will stop growing, it will rather - // delete data once it is read as we do not need to read - // it again from the beginning, we found the header! - queue_deq(ctx->queue); - ctx->queue->mode = QUEUE_MODE_SHIFT; - - return errnum; - -open_error: - avio_context_free(&io_ctx); - avformat_close_input(&fmt_ctx); - - queue_grow(ctx->queue, 2); - ctx->queue->pos = 0; - - return errnum; -} - -int demuxer_mem_add_data(void *ctx, void *data, int size) { - DemuxerMem *demuxer = (DemuxerMem *)ctx; - - // Indicates EOS. - if (!data) { - demuxer->mode = CTX_MODE_DRAIN; - demuxer->queue->input_eos = 1; - return 0; - } - - // Copy the data in the buffer. - queue_copy(demuxer->queue, data, size); - - // Make an attemp reading the header only when the ioq buffer is filled. - if (!demuxer->has_header && queue_is_filled(demuxer->queue)) - demuxer_mem_read_header(ctx); - - return 0; -} - -int demuxer_mem_is_ready(void *opaque) { - return ((DemuxerMem *)opaque)->has_header; -} - -int demuxer_mem_read_streams(void *opaque, AVStream **streams, - unsigned int *size) { - int errn; - DemuxerMem *ctx = (DemuxerMem *)opaque; - - // Called on EOS: we're not ready, meaning that we did not get - // the amount of data we wanted, but we may still be able to - // obtain the streams. - if (!ctx->has_header) { - if ((errn = demuxer_mem_read_header(ctx)) != 0) { - return errn; - } - } - - *size = (int)ctx->fmt_ctx->nb_streams; - *streams = *ctx->fmt_ctx->streams; - return 0; -} - -int demuxer_mem_read_packet(void *opaque, AVPacket *packet) { - int errn, freespace; - DemuxerMem *ctx; - - ctx = (DemuxerMem *)opaque; - freespace = queue_freespace(ctx->queue); - - if (freespace > 0 && ctx->mode == CTX_MODE_BUF) - // When we're not draining the demuxer, try to - // keep the ioq filled with data. - return freespace; - - return av_read_frame(ctx->fmt_ctx, packet); -} - -int demuxer_mem_demand(void *opaque) { - return queue_freespace(((DemuxerMem *)opaque)->queue); -} - -void demuxer_mem_free(void **opaque) { - DemuxerMem **ctx = (DemuxerMem **)opaque; - - free((*ctx)->queue->ptr); - avio_context_free(&(*ctx)->io_ctx); - avformat_close_input(&(*ctx)->fmt_ctx); - free(*ctx); - *opaque = NULL; -} - -// Public API. - -struct Demuxer { - void *opaque; - - int (*read_packet)(void *, AVPacket *); - int (*add_data)(void *, void *, int); - int (*read_streams)(void *, AVStream ***, unsigned int *); - int (*is_ready)(void *); - int (*demand)(void *); - void (*fmt_ctx)(void *, AVFormatContext **); - void (*free)(void **); -}; - -int demuxer_alloc_from_file(Demuxer **demuxer, char *path) { - DemuxerFile *ctx; - Demuxer *idemuxer; - int errn; - - if ((errn = demuxer_file_alloc(&ctx, path)) < 0) - return errn; - - idemuxer = (Demuxer *)malloc(sizeof(Demuxer)); - idemuxer->opaque = ctx; - idemuxer->read_packet = &demuxer_file_read_packet; - idemuxer->add_data = &default_add; - idemuxer->fmt_ctx = &demuxer_file_fmt_ctx; - idemuxer->is_ready = &default_is_ready; - idemuxer->free = &demuxer_file_free; - idemuxer->demand = &default_demand; - - *demuxer = idemuxer; - return 0; -} - -int demuxer_alloc_in_mem(Demuxer **demuxer, int probe_size) { - DemuxerMem *ctx; - Demuxer *idemuxer; - int errn; - - if ((errn = demuxer_mem_alloc(&ctx, probe_size)) < 0) - return errn; - - idemuxer = (Demuxer *)malloc(sizeof(Demuxer)); - idemuxer->opaque = (void *)ctx; - idemuxer->read_packet = &demuxer_mem_read_packet; - idemuxer->add_data = &demuxer_mem_add_data; - idemuxer->fmt_ctx = &demuxer_mem_fmt_ctx; - idemuxer->is_ready = &demuxer_mem_is_ready; - idemuxer->free = &demuxer_mem_free; - idemuxer->demand = &demuxer_mem_demand; - - *demuxer = idemuxer; - return 0; -} - int demuxer_read_packet(Demuxer *ctx, AVPacket *packet) { - return ctx->read_packet(ctx->opaque, packet); -} - -int demuxer_add_data(Demuxer *ctx, void *data, int size) { - return ctx->add_data(ctx->opaque, data, size); -} - -int demuxer_demand(Demuxer *ctx) { return ctx->demand(ctx->opaque); } - -int demuxer_is_ready(Demuxer *ctx) { return ctx->is_ready(ctx->opaque); } - -void demuxer_fmt_ctx(Demuxer *ctx, AVFormatContext **fmt_ctx) { - ctx->fmt_ctx(ctx->opaque, fmt_ctx); + return av_read_frame(ctx->fmt_ctx, packet); } void demuxer_free(Demuxer **ctx) { - (*ctx)->free(&(*ctx)->opaque); + avformat_close_input(&(*ctx)->fmt_ctx); free(*ctx); *ctx = NULL; } diff --git a/c_src/demuxer.h b/c_src/demuxer.h index c0e4061..2f35c22 100644 --- a/c_src/demuxer.h +++ b/c_src/demuxer.h @@ -1,16 +1,12 @@ #include -typedef struct Demuxer Demuxer; +typedef struct { + AVFormatContext *fmt_ctx; +} Demuxer; -int demuxer_alloc_from_file(Demuxer **demuxer, char *path); -int demuxer_alloc_in_mem(Demuxer **demuxer, int probe_size); +int demuxer_alloc_from_file(Demuxer **ctx, char *path); // @doc Returns < 0 in case of error, 0 on success, > 1 to // indicate the amount of data should be supplied (i.e. demand) int demuxer_read_packet(Demuxer *ctx, AVPacket *packet); -int demuxer_add_data(Demuxer *ctx, void *data, int size); -int demuxer_demand(Demuxer *ctx); -int demuxer_is_ready(Demuxer *ctx); - -void demuxer_fmt_ctx(Demuxer *ctx, AVFormatContext **fmt_ctx); void demuxer_free(Demuxer **ctx); diff --git a/c_src/libav.c b/c_src/libav.c index 658cf64..6c504ab 100644 --- a/c_src/libav.c +++ b/c_src/libav.c @@ -78,45 +78,9 @@ ERL_NIF_TERM enif_demuxer_alloc_from_file(ErlNifEnv *env, int argc, if (errn < 0) return enif_make_av_error(env, errn); - return enif_make_demuxer_res(env, ctx); -} - -ERL_NIF_TERM enif_demuxer_alloc_in_mem(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { - int probe_size, errn; - Demuxer *ctx; - - enif_get_int(env, argv[0], &probe_size); - if (probe_size <= 0) - probe_size = DEFAULT_PROBE_SIZE; - if ((errn = demuxer_alloc_in_mem(&ctx, probe_size)) < 0) - return enif_make_av_error(env, errn); - - return enif_make_demuxer_res(env, ctx); -} - -ERL_NIF_TERM enif_demuxer_add_data(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { - Demuxer *ctx; - ErlNifBinary binary; - int errn; - - enif_get_demuxer(env, argv[0], &ctx); - enif_inspect_binary(env, argv[1], &binary); - - if ((errn = demuxer_add_data(ctx, binary.data, binary.size)) < 0) - return enif_make_error(env, "add data not allowed"); - - return enif_make_atom(env, "ok"); -} - -ERL_NIF_TERM enif_demuxer_is_ready(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { - Demuxer *ctx; - enif_get_demuxer(env, argv[0], &ctx); - - return enif_make_int(env, demuxer_is_ready(ctx)); + return enif_make_tuple2(env, enif_make_atom(env, "ok"), + enif_make_demuxer_res(env, ctx)); } ERL_NIF_TERM enif_demuxer_read_packet(ErlNifEnv *env, int argc, @@ -132,11 +96,8 @@ ERL_NIF_TERM enif_demuxer_read_packet(ErlNifEnv *env, int argc, if ((ret = demuxer_read_packet(ctx, packet)) != 0) { if (ret == AVERROR_EOF) return enif_make_atom(env, "eof"); - if (ret < 0) + else return enif_make_av_error(env, ret); - if (ret > 0) - return enif_make_tuple2(env, enif_make_atom(env, "demand"), - enif_make_long(env, ret)); } // Make the resource @@ -217,23 +178,21 @@ ERL_NIF_TERM enif_make_stream_map(ErlNifEnv *env, AVStream *stream) { return map; } -ERL_NIF_TERM enif_demuxer_read_streams(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { +ERL_NIF_TERM enif_demuxer_streams(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { ERL_NIF_TERM *codecs; Demuxer *ctx; AVFormatContext *fmt_ctx; enif_get_demuxer(env, argv[0], &ctx); - demuxer_fmt_ctx(ctx, &fmt_ctx); + fmt_ctx = ctx->fmt_ctx; codecs = calloc(fmt_ctx->nb_streams, sizeof(ERL_NIF_TERM)); for (int i = 0; i < fmt_ctx->nb_streams; i++) { codecs[i] = enif_make_stream_map(env, fmt_ctx->streams[i]); } - return enif_make_tuple2( - env, enif_make_atom(env, "ok"), - enif_make_list_from_array(env, codecs, fmt_ctx->nb_streams)); + return enif_make_list_from_array(env, codecs, fmt_ctx->nb_streams); } int enif_get_packet(ErlNifEnv *env, ERL_NIF_TERM term, AVPacket **packet) { @@ -266,14 +225,6 @@ ERL_NIF_TERM enif_packet_unpack(ErlNifEnv *env, int argc, return map; } -ERL_NIF_TERM enif_demuxer_demand(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { - Demuxer *ctx; - enif_get_demuxer(env, argv[0], &ctx); - - return enif_make_int(env, demuxer_demand(ctx)); -} - void enif_free_decoder(ErlNifEnv *env, void *res) { Decoder **ctx = (Decoder **)res; decoder_free(ctx); @@ -500,12 +451,8 @@ static ErlNifFunc nif_funcs[] = { // {erl_function_name, erl_function_arity, c_function} // Demuxer {"demuxer_alloc_from_file", 1, enif_demuxer_alloc_from_file}, - {"demuxer_alloc", 1, enif_demuxer_alloc_in_mem}, - {"demuxer_streams", 1, enif_demuxer_read_streams}, + {"demuxer_streams", 1, enif_demuxer_streams}, {"demuxer_read_packet", 1, enif_demuxer_read_packet}, - {"demuxer_add_data", 2, enif_demuxer_add_data}, - {"demuxer_is_ready", 1, enif_demuxer_is_ready}, - {"demuxer_demand", 1, enif_demuxer_demand}, // Decoder {"decoder_alloc", 1, enif_decoder_alloc}, {"decoder_stream_format", 1, enif_decoder_stream_format}, diff --git a/lib/avx/demuxer.ex b/lib/avx/demuxer.ex index 880bac7..38273d2 100644 --- a/lib/avx/demuxer.ex +++ b/lib/avx/demuxer.ex @@ -2,19 +2,6 @@ defmodule AVx.Demuxer do alias AVx.{NIF, Packet} require Logger - @default_probe_size 2048 - - @type read :: (any(), size :: pos_integer -> {:eof | iodata(), any()}) - @type close :: (any() -> :ok) - @type input :: any() - - @type reader :: %{ - read: read(), - close: close(), - opaque: any(), - probe_size: non_neg_integer() - } - @type codec_type :: :audio | :video @type stream :: %{ @@ -29,60 +16,39 @@ defmodule AVx.Demuxer do @type t :: %__MODULE__{ demuxer: reference(), - reader: reader() | nil, - file_path: Path.t() | nil, + uri: URI.t(), eof: map() } - defstruct [:demuxer, :reader, :file_path, eof: %{input: false, demuxer: false}] - - @spec new_in_memory(reader()) :: t() - @doc """ - Allocates a demuxer that reads that from the provided reader instead - of a file. For a reader implementation, check the MailboxReader. - - WARNING: this is an experimental feature - """ - def new_in_memory(reader) do - probe_size = Map.get(reader, :probe_size, @default_probe_size) - %__MODULE__{demuxer: NIF.demuxer_alloc(probe_size), reader: reader} - end + defstruct [:demuxer, :uri, eof: %{input: false, demuxer: false}] - def new_in_memory_from_file(path) do - new_in_memory(%{ - opaque: File.open!(path, [:raw, :read]), - read: fn input, size -> - resp = IO.binread(input, size) - {resp, input} - end, - close: fn input -> File.close(input) end - }) - end - - @spec new_from_file(Path.t()) :: t() - def new_from_file(path) do - %__MODULE__{demuxer: NIF.demuxer_alloc_from_file(path), file_path: path} + @spec new_from_file(String.t()) :: {:ok, t()} | {:error, any()} + def new_from_file(uri) do + case NIF.demuxer_alloc_from_file(uri) do + {:ok, demuxer} -> {:ok, %__MODULE__{demuxer: demuxer, uri: uri}} + {:error, reason} -> {:error, to_string(reason)} + end end @doc """ - Consumes the input until the header is parsed. Then, the available streams are returned. + Opens and starts reading the input file finding the streams + contained in it. """ - @spec streams(t()) :: {[stream()], t()} - def streams(state = %{reader: reader}) when reader != nil do - if is_ready(state) or state.eof.input do - read_streams(state) - else - state - |> read_input() - |> streams() - end - end - - def streams(state) do - read_streams(state) - end - - defp is_ready(state) do - NIF.demuxer_is_ready(state.demuxer) != 0 + @spec read_streams(t()) :: [stream()] + def read_streams(state) do + state.demuxer + |> NIF.demuxer_streams() + |> Enum.map(fn stream -> + # Coming from erlang, strings are charlists. + stream + |> Enum.map(fn {key, value} -> + if is_list(value) do + {key, to_string(value)} + else + {key, value} + end + end) + |> Map.new() + end) end @doc """ @@ -91,7 +57,7 @@ defmodule AVx.Demuxer do Returns an enumerable of AVx.Packet. """ - @spec consume_packets(t(), [pos_integer()]) :: Enumerable.t() + @spec consume_packets(t(), [pos_integer()]) :: Stream.t() def consume_packets(state, stream_indexes) when is_list(stream_indexes) do # -1 is used as an indicator for the last packet, which must # be delivered to the decoder to put it into drain mode. @@ -100,38 +66,6 @@ defmodule AVx.Demuxer do do_consume_packets(state, accepted_streams) end - defp do_consume_packets(state = %{reader: reader}, stream_indexes) - when reader != nil and is_list(stream_indexes) do - # TODO error handling? - Stream.resource( - fn -> state end, - fn - state = %__MODULE__{eof: %{demuxer: true}} -> - {:halt, state} - - state -> - case NIF.demuxer_read_packet(state.demuxer) do - {:error, reason} -> - Logger.error("Demuxer read packet failed: #{inspect(to_string(reason))}") - {[AVx.Packet.new(nil)], %{state | eof: %{state.eof | demuxer: true}}} - - :eof -> - {[AVx.Packet.new(nil)], %{state | eof: %{state.eof | demuxer: true}}} - - {:demand, demand} -> - {[], read_input(state, demand)} - - {:ok, ref} -> - {[AVx.Packet.new(ref)], state} - end - end, - fn - state -> state.reader.close.(state.reader.opaque) - end - ) - |> filter_packets(stream_indexes) - end - defp do_consume_packets(state, stream_indexes) do Stream.resource( fn -> state end, @@ -140,8 +74,6 @@ defmodule AVx.Demuxer do {:halt, state} state -> - # There is no demand when the demuxer is taking care of - # the input internally. case NIF.demuxer_read_packet(state.demuxer) do {:error, reason} -> Logger.error("Demuxer read packet failed: #{inspect(to_string(reason))}") @@ -171,40 +103,4 @@ defmodule AVx.Demuxer do |> Stream.map(&Packet.unpack/1) |> Stream.map(fn packet -> packet.data end) end - - defp read_input(state, demand \\ nil) do - demand = if demand != nil, do: demand, else: NIF.demuxer_demand(state.demuxer) - - case state.reader.read.(state.reader.opaque, demand) do - {:eof, opaque} -> - NIF.demuxer_add_data(state.demuxer, nil) - %{state | reader: %{state.reader | opaque: opaque}, eof: %{state.eof | input: true}} - - {data, opaque} -> - NIF.demuxer_add_data(state.demuxer, data) - %{state | reader: %{state.reader | opaque: opaque}} - end - end - - defp read_streams(state) do - streams = - state.demuxer - |> NIF.demuxer_streams() - # TODO error handling? - |> elem(1) - |> Enum.map(fn stream -> - # Coming from erlang, strings are charlists. - stream - |> Enum.map(fn {key, value} -> - if is_list(value) do - {key, to_string(value)} - else - {key, value} - end - end) - |> Map.new() - end) - - {streams, state} - end end diff --git a/lib/avx/demuxer/mailbox_reader.ex b/lib/avx/demuxer/mailbox_reader.ex deleted file mode 100644 index eb8c241..0000000 --- a/lib/avx/demuxer/mailbox_reader.ex +++ /dev/null @@ -1,75 +0,0 @@ -defmodule AVx.Demuxer.MailboxReader do - use GenServer - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, [], opts) - end - - def read(server, size) do - # Blocks util a response is received. - case GenServer.call(server, {:read, size}, :infinity) do - {:ok, data} -> - {data, server} - - :eof -> - {:eof, server} - end - end - - @doc "Add data to the reader" - @spec add_data(pid(), binary() | nil) :: :ok - def add_data(pid, data) do - send(pid, {:data, data}) - end - - def close(_pid) do - :ok - end - - @impl GenServer - def init(_opts) do - {:ok, %{pending: nil, buffer: <<>>, recv: 0, sent: 0, eof: false}} - end - - @impl true - def handle_call({:read, size}, from, state) do - cond do - state.eof and byte_size(state.buffer) == 0 -> - {:stop, :normal, :eof, state} - - byte_size(state.buffer) == 0 -> - {:noreply, %{state | pending: {from, size}}} - - true -> - {buffer, state} = read_buffer(state, size) - {:reply, {:ok, buffer}, state} - end - end - - @impl GenServer - def handle_info({:data, nil}, state) do - {:noreply, %{state | eof: true}} - end - - def handle_info({:data, data}, state = %{pending: nil}) do - {:noreply, %{state | buffer: state.buffer <> data, recv: state.recv + byte_size(data)}} - end - - def handle_info({:data, data}, state = %{pending: {from, size}}) do - state = %{state | buffer: state.buffer <> data} - {buffer, state} = read_buffer(state, size) - GenServer.reply(from, {:ok, buffer}) - - {:noreply, state} - end - - defp read_buffer(state, size) do - if byte_size(state.buffer) >= size do - <> = state.buffer - {buf, %{state | pending: nil, buffer: rest, sent: state.sent + byte_size(buf)}} - else - {state.buffer, - %{state | pending: nil, buffer: <<>>, sent: state.sent + byte_size(state.buffer)}} - end - end -end diff --git a/lib/avx/nif.ex b/lib/avx/nif.ex index 5e405e6..964b956 100644 --- a/lib/avx/nif.ex +++ b/lib/avx/nif.ex @@ -6,30 +6,18 @@ defmodule AVx.NIF do :erlang.load_nif(path, 0) end - def demuxer_alloc(_probe_size) do - raise "NIF demuxer_alloc/1 not implemented" - end - def demuxer_alloc_from_file(_path) do raise "NIF demuxer_alloc_from_file/1 not implemented" end - def demuxer_add_data(_ctx, _data) do - raise "NIF demuxer_add_data/2 not implemented" + def demuxer_read_header(_ctx) do + raise "NIF demuxer_read_header/1 not implemented" end def demuxer_streams(_ctx) do raise "NIF demuxer_streams/1 not implemented" end - def demuxer_is_ready(_ctx) do - raise "NIF demuxer_is_ready/1 not implemented" - end - - def demuxer_demand(_ctx) do - raise "NIF demuxer_demand/1 not implemented" - end - def demuxer_read_packet(_ctx) do raise "NIF demuxer_read_packet/1 not implemented" end diff --git a/mix.exs b/mix.exs index 7e48815..b6799db 100644 --- a/mix.exs +++ b/mix.exs @@ -26,7 +26,8 @@ defmodule AVx.MixProject do [ {:nimble_options, "~> 1.0.0"}, {:elixir_make, "~> 0.6", runtime: false}, - {:jason, "~> 1.4.1"} + {:jason, "~> 1.4.1"}, + {:thousand_island, "~> 1.1", only: :test} ] end diff --git a/mix.lock b/mix.lock index 8f9af59..07f07b8 100644 --- a/mix.lock +++ b/mix.lock @@ -2,4 +2,6 @@ "elixir_make": {:hex, :elixir_make, "0.7.7", "7128c60c2476019ed978210c245badf08b03dbec4f24d05790ef791da11aa17c", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5bc19fff950fad52bbe5f211b12db9ec82c6b34a9647da0c2224b8b8464c7e6c"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "thousand_island": {:hex, :thousand_island, "1.1.0", "dcc115650adc61c5e7de12619f0cb94b2b8f050326e7f21ffbf6fdeb3d291e4c", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7745cf71520d74e119827ff32c2da6307e822cf835bebed3b2c459cc57f32d21"}, } diff --git a/test/avx/decoder_test.exs b/test/avx/decoder_test.exs index 24588df..ad5087d 100644 --- a/test/avx/decoder_test.exs +++ b/test/avx/decoder_test.exs @@ -14,8 +14,8 @@ defmodule AVx.DecoderTest do for input <- @inputs do test "extract audio track from #{input}" do - demuxer = Demuxer.new_from_file(unquote(input)) - {streams, demuxer} = Demuxer.streams(demuxer) + {:ok, demuxer} = Demuxer.new_from_file(unquote(input)) + streams = Demuxer.read_streams(demuxer) assert stream = %{codec_type: :audio} = diff --git a/test/avx/demuxer_test.exs b/test/avx/demuxer_test.exs index 8a170c5..2ffc9e6 100644 --- a/test/avx/demuxer_test.exs +++ b/test/avx/demuxer_test.exs @@ -2,62 +2,13 @@ defmodule AVx.DemuxerTest do use ExUnit.Case alias AVx.{Demuxer, Packet} - alias AVx.Demuxer.MailboxReader @input "test/data/mic.mp4" describe "demuxer" do test "from file" do - demuxer = Demuxer.new_from_file(@input) - - {streams, demuxer} = Demuxer.streams(demuxer) - - stream = Enum.find(streams, fn stream -> stream.codec_type == :audio end) - - info = Support.show_packets(@input) - assert_packets(demuxer, [stream.stream_index], info) - end - - test "with bin read from memory" do - demuxer = - Demuxer.new_in_memory(%{ - opaque: File.open!(@input, [:raw, :read]), - read: fn input, size -> - resp = IO.binread(input, size) - {resp, input} - end, - close: fn input -> File.close(input) end - }) - - {streams, demuxer} = Demuxer.streams(demuxer) - - stream = Enum.find(streams, fn stream -> stream.codec_type == :audio end) - - info = Support.show_packets(@input) - assert_packets(demuxer, [stream.stream_index], info) - end - - test "with MailboxReader" do - pid = start_link_supervised!(MailboxReader) - - spawn_link(fn -> - @input - |> File.stream!([:raw, :read], 2048) - |> Enum.map(fn data -> - MailboxReader.add_data(pid, data) - end) - - send(pid, {:data, nil}) - end) - - demuxer = - Demuxer.new_in_memory(%{ - opaque: pid, - read: &MailboxReader.read/2, - close: &MailboxReader.close/1 - }) - - {streams, demuxer} = Demuxer.streams(demuxer) + {:ok, demuxer} = Demuxer.new_from_file(@input) + streams = Demuxer.read_streams(demuxer) stream = Enum.find(streams, fn stream -> stream.codec_type == :audio end)