diff --git a/lib/avro_ex/object_container.ex b/lib/avro_ex/object_container.ex
new file mode 100644
index 0000000..e4b5b16
--- /dev/null
+++ b/lib/avro_ex/object_container.ex
@@ -0,0 +1,361 @@
+defmodule AvroEx.ObjectContainer do
+  @moduledoc """
+  Encoding and decoding of Avro Object Container Files.
+
+  A container is a file header (magic bytes, a metadata map carrying the
+  schema and codec name, and a 16-byte sync marker) followed by data blocks.
+  Each block is a block header (object count and byte count), the objects
+  encoded with the container's schema and passed through its codec, and the
+  sync marker again as a footer.
+  """
+
+  use TypedStruct
+
+  alias AvroEx.ObjectContainer.Codec
+  alias AvroEx.Schema
+
+  @type codec_types :: :null | :deflate | :bzip2 | :snappy | :xz | :zstandard
+
+  typedstruct do
+    field :schema, Schema.t()
+    # A module implementing the AvroEx.ObjectContainer.Codec behaviour.
+    field :codec, module(), default: Codec.Null
+    field :meta, map(), default: %{}
+    field :sync, <<_::128>>
+  end
+
+  @magic <<"Obj", 1>>
+
+  @bh_schema AvroEx.decode_schema!(%{
+    "type" => "record",
+    "name" => "block_header",
+    "fields" => [
+      %{"name" => "num_objects", "type" => "long"},
+      %{"name" => "num_bytes", "type" => "long"}
+    ]
+  })
+
+  @fh_schema AvroEx.decode_schema!(%{
+    "type" => "record",
+    "name" => "org.apache.avro.file.Header",
+    "fields" => [
+      %{"name" => "magic", "type" => %{"type" => "fixed", "name" => "Magic", "size" => 4}},
+      %{"name" => "meta", "type" => %{"type" => "map", "values" => "bytes"}},
+      %{"name" => "sync", "type" => %{"type" => "fixed", "name" => "Sync", "size" => 16}}
+    ]
+  })
+
+  defguardp is_non_neg_integer(value) when is_integer(value) and value >= 0
+
+  @doc "Returns the schema of the header that precedes every data block."
+  @spec block_header_schema() :: Schema.t()
+  def block_header_schema, do: @bh_schema
+
+  @doc "Returns the schema of the header that starts a container file."
+  @spec file_header_schema() :: Schema.t()
+  def file_header_schema, do: @fh_schema
+
+  @doc """
+  Builds a container for `schema` with a freshly generated 16-byte sync marker.
+
+  ## Options
+
+    * `:codec` - codec module applied to block data (default `Codec.Null`)
+    * `:meta` - map of user metadata for the file header (default `%{}`)
+  """
+  @spec new(Schema.t() | nil, keyword()) :: t()
+  def new(schema, opts \\ []) do
+    %__MODULE__{
+      schema: schema,
+      codec: Keyword.get(opts, :codec, Codec.Null),
+      meta: Keyword.get(opts, :meta, %{}),
+      sync: :rand.bytes(16)
+    }
+  end
+
+  @doc """
+  Encodes the file header: magic bytes, metadata and sync marker.
+
+  The metadata is the container's user metadata plus the `"avro.schema"` and
+  `"avro.codec"` entries. Those two are always derived from the container, so
+  user metadata stored under the same names cannot produce a header that
+  contradicts the blocks that follow it.
+  """
+  @spec encode_file_header!(t()) :: binary()
+  def encode_file_header!(%__MODULE__{} = ocf) do
+    reserved = %{
+      "avro.schema" => AvroEx.encode_schema(ocf.schema),
+      "avro.codec" => to_string(ocf.codec.name())
+    }
+
+    # Keys are normalised to strings first so the reserved entries win whether
+    # the user keyed their metadata by atom or by string.
+    metadata =
+      ocf.meta
+      |> Map.new(fn {key, value} -> {to_string(key), value} end)
+      |> Map.merge(reserved)
+
+    AvroEx.encode!(@fh_schema, %{magic: @magic, meta: metadata, sync: ocf.sync})
+  end
+
+  @doc """
+  Encodes a block header holding the number of objects in a block and the
+  size in bytes of the block's encoded data. Both may be zero.
+  """
+  @spec encode_block_header!(non_neg_integer(), non_neg_integer()) :: binary()
+  def encode_block_header!(num_objects, encoded_data_size) do
+    header = %{"num_objects" => num_objects, "num_bytes" => encoded_data_size}
+    AvroEx.encode!(@bh_schema, header)
+  end
+
+  @doc "Returns the block footer, which is the container's sync marker."
+  @spec encode_block_footer!(t()) :: binary()
+  def encode_block_footer!(%__MODULE__{sync: sync}), do: sync
+
+  @doc """
+  Encodes every object with the container's schema, concatenates the results
+  and passes them through the container's codec.
+  """
+  @spec encode_block_objects!(t(), Enumerable.t()) :: binary()
+  def encode_block_objects!(%__MODULE__{} = ocf, objects) do
+    objects
+    |> Enum.map(&AvroEx.encode!(ocf.schema, &1))
+    |> IO.iodata_to_binary()
+    |> ocf.codec.encode!()
+  end
+
+  @doc "Encodes one complete block: block header, encoded objects and footer."
+  @spec encode_block!(t(), list()) :: binary()
+  def encode_block!(%__MODULE__{} = ocf, objects) do
+    data = encode_block_objects!(ocf, objects)
+    header = encode_block_header!(length(objects), byte_size(data))
+
+    IO.iodata_to_binary([header, data, encode_block_footer!(ocf)])
+  end
+
+  @doc "Encodes a file: the file header followed by a single block of `objects`."
+  @spec encode_file!(t(), list()) :: binary()
+  def encode_file!(%__MODULE__{} = ocf, objects) do
+    encode_file_header!(ocf) <> encode_block!(ocf, objects)
+  end
+
+  @doc """
+  Decodes a file header from the start of `file_header`.
+
+  Returns the container described by the header and the bytes that follow it.
+  A header without an `"avro.codec"` entry resolves to the null codec.
+
+  ## Options
+
+    * `:codecs` - keyword list of codec name to module, merged over the
+      built-in codecs when resolving the codec named in the header
+  """
+  @spec decode_file_header(binary(), keyword()) ::
+          {:ok, AvroEx.ObjectContainer.t(), binary()} | {:error, AvroEx.DecodeError.t()}
+  def decode_file_header(file_header, opts \\ []) do
+    user_codecs = Keyword.get(opts, :codecs, [])
+
+    with :ok <- check_magic(file_header),
+         {:ok, decoded_header, rest} <- decode_with_rest(@fh_schema, file_header),
+         %{"meta" => meta, "sync" => sync} = decoded_header,
+         {:ok, schema_json} <- get_schema(meta),
+         {:ok, codec_name} <- get_codec(meta),
+         {:ok, schema} <- AvroEx.decode_schema(schema_json),
+         {:ok, codec} <- Codec.get_codec_implementation(codec_name, user_codecs) do
+      container = %__MODULE__{
+        schema: schema,
+        codec: codec,
+        meta: Map.drop(meta, ["avro.schema", "avro.codec"]),
+        sync: sync
+      }
+
+      {:ok, container, rest}
+    end
+  end
+
+  @doc """
+  Decodes a block header from the start of `data`.
+
+  Returns `%{num_objects: n, num_bytes: m}` and the bytes after the header.
+  """
+  @spec decode_block_header(binary()) ::
+          {:ok, %{num_objects: non_neg_integer(), num_bytes: non_neg_integer()}, binary()}
+          | {:error, AvroEx.DecodeError.t()}
+  def decode_block_header(data) do
+    with {:ok, header, rest} <- decode_with_rest(@bh_schema, data),
+         {:ok, checked_header} <- check_block_header(header) do
+      {:ok, checked_header, rest}
+    end
+  end
+
+  @doc """
+  Checks that `data` starts with the container's sync marker.
+
+  Returns the bytes after the marker, or an error when the marker differs or
+  fewer than 16 bytes are available.
+  """
+  @spec check_block_footer(t(), binary()) :: {:ok, binary()} | {:error, AvroEx.DecodeError.t()}
+  def check_block_footer(%__MODULE__{sync: sync}, <<read_sync::binary-size(16), rest::binary>>) do
+    if read_sync == sync do
+      {:ok, rest}
+    else
+      message = "Invalid sync bytes: #{inspect(sync)} != #{inspect(read_sync)}"
+      {:error, %AvroEx.DecodeError{message: message}}
+    end
+  end
+
+  # Input shorter than a sync marker (for example a truncated file) is
+  # reported as an error instead of raising FunctionClauseError.
+  def check_block_footer(%__MODULE__{}, _data),
+    do: {:error, %AvroEx.DecodeError{message: "Not enough bytes for block footer"}}
+
+  @doc """
+  Decodes the objects of one block.
+
+  `block_header` is a map with `num_objects` and `num_bytes` under either atom
+  or string keys. The first `num_bytes` bytes of `data` are passed through the
+  container's codec and decoded with its schema, and the number of decoded
+  objects must equal `num_objects`. Returns the objects and the bytes after
+  the block data.
+  """
+  @spec decode_block_objects(t(), map(), binary()) ::
+          {:ok, list(), binary()} | {:error, AvroEx.DecodeError.t()}
+  def decode_block_objects(file_header, block_header, data) do
+    with {:ok, %{num_objects: num_objects, num_bytes: num_bytes}} <-
+           check_block_header(block_header),
+         {:ok, encoded, rest} <- get_object_data(num_bytes, data),
+         # Undo the codec step that encode_block_objects!/2 applies.
+         {:ok, object_data} <- decode_codec(file_header, encoded),
+         {:ok, objects} <- do_decode_block_objects(file_header, object_data),
+         :ok <- check_num_objects(objects, num_objects) do
+      {:ok, objects, rest}
+    end
+  end
+
+  @doc """
+  Decodes one complete block (header, objects, footer) from the start of
+  `data`. Returns the objects and the bytes after the block.
+  """
+  @spec decode_block(t(), binary()) ::
+          {:ok, list(), binary()} | {:error, AvroEx.DecodeError.t()}
+  def decode_block(file_header, data) do
+    with {:ok, block_header, rest} <- decode_block_header(data),
+         {:ok, objects, rest} <- decode_block_objects(file_header, block_header, rest),
+         {:ok, rest} <- check_block_footer(file_header, rest) do
+      {:ok, objects, rest}
+    end
+  end
+
+  @doc """
+  Decodes blocks until `data` is exhausted and returns all their objects in
+  file order as one list.
+  """
+  @spec decode_blocks(t(), binary()) :: {:ok, list()} | {:error, AvroEx.DecodeError.t()}
+  def decode_blocks(file_header, data) do
+    do_decode_blocks(file_header, data)
+  end
+
+  @doc """
+  Decodes a whole file: the file header followed by any number of blocks.
+
+  Accepts the same options as `decode_file_header/2`. Returns the container
+  described by the header and the decoded objects.
+  """
+  @spec decode_file(binary(), keyword()) ::
+          {:ok, t(), list()} | {:error, AvroEx.DecodeError.t()}
+  def decode_file(data, opts \\ []) do
+    with {:ok, %__MODULE__{} = file_header, rest} <- decode_file_header(data, opts),
+         {:ok, objects} <- decode_blocks(file_header, rest) do
+      {:ok, file_header, objects}
+    end
+  end
+
+  defp check_magic(<<"Obj", 1, _::binary>>), do: :ok
+  defp check_magic(_), do: {:error, %AvroEx.DecodeError{message: "Invalid file header"}}
+
+  # Decodes one value and returns it together with the bytes that follow it.
+  # A MatchError raised while decoding is converted to an AvroEx.DecodeError
+  # so the error tuple carries the type promised by the public specs.
+  defp decode_with_rest(schema, message, opts \\ []) do
+    AvroEx.Decode.decode(schema, message, opts)
+  rescue
+    MatchError ->
+      {:error, %AvroEx.DecodeError{message: "Unable to decode truncated or malformed data"}}
+  end
+
+  defp get_schema(%{"avro.schema" => schema}), do: {:ok, schema}
+
+  defp get_schema(_),
+    do: {:error, %AvroEx.DecodeError{message: "Invalid or missing schema in file header"}}
+
+  defp get_codec(%{"avro.codec" => codec}), do: {:ok, codec}
+  defp get_codec(_), do: {:ok, :null}
+
+  # Accepts the header with string keys (as decoded from a file) or atom keys
+  # (as returned by decode_block_header/1) and normalises it to atom keys.
+  defp check_block_header(%{"num_objects" => num_objects, "num_bytes" => num_bytes})
+       when is_non_neg_integer(num_objects) and is_non_neg_integer(num_bytes),
+       do: {:ok, %{num_objects: num_objects, num_bytes: num_bytes}}
+
+  defp check_block_header(%{num_objects: num_objects, num_bytes: num_bytes})
+       when is_non_neg_integer(num_objects) and is_non_neg_integer(num_bytes),
+       do: {:ok, %{num_objects: num_objects, num_bytes: num_bytes}}
+
+  defp check_block_header(_), do: {:error, %AvroEx.DecodeError{message: "Invalid block header"}}
+
+  # Splits off the first `num_bytes` bytes of `data`.
+  # NOTE(review): binary pattern restored from the names used below - confirm.
+  defp get_object_data(num_bytes, data) do
+    case data do
+      <<object_data::binary-size(num_bytes), rest::binary>> ->
+        {:ok, object_data, rest}
+
+      _ ->
+        {:error, %AvroEx.DecodeError{message: "Not enough bytes for block objects"}}
+    end
+  end
+
+  # Runs block data through the container's codec. Codec callbacks are bang
+  # functions, so raises are turned into error tuples for the non-raising API.
+  defp decode_codec(%__MODULE__{codec: codec}, data) do
+    {:ok, codec.decode!(data)}
+  rescue
+    error in AvroEx.DecodeError ->
+      {:error, error}
+
+    _error in [ErlangError, MatchError] ->
+      {:error, %AvroEx.DecodeError{message: "Unable to decode block data with codec"}}
+  end
+
+  # Decodes objects until the data is exhausted; the object count is checked
+  # afterwards by check_num_objects/2.
+  # NOTE(review): values that encode to zero bytes (presumably the "null"
+  # schema) leave nothing to iterate over, so such blocks decode to no
+  # objects - confirm whether that case needs support.
+  defp do_decode_block_objects(file_header, data, objects \\ [])
+  defp do_decode_block_objects(_file_header, <<>>, objects), do: {:ok, Enum.reverse(objects)}
+
+  defp do_decode_block_objects(%__MODULE__{} = file_header, data, objects) do
+    with {:ok, object, rest} <- decode_with_rest(file_header.schema, data) do
+      do_decode_block_objects(file_header, rest, [object | objects])
+    end
+  end
+
+  defp check_num_objects(objects, num_objects) when length(objects) == num_objects, do: :ok
+
+  defp check_num_objects(_, _),
+    do: {:error, %AvroEx.DecodeError{message: "Invalid number of objects"}}
+
+  defp do_decode_blocks(file_header, data, blocks \\ [])
+
+  # Blocks are concatenated one level deep only: List.flatten/1 would also
+  # flatten decoded objects that are themselves lists (array schemas).
+  defp do_decode_blocks(_file_header, <<>>, blocks),
+    do: {:ok, blocks |> Enum.reverse() |> Enum.concat()}
+
+  defp do_decode_blocks(file_header, data, blocks) do
+    with {:ok, objects, rest} <- decode_block(file_header, data) do
+      do_decode_blocks(file_header, rest, [objects | blocks])
+    end
+  end
+end
diff --git a/lib/avro_ex/object_container/codec.ex b/lib/avro_ex/object_container/codec.ex
new file mode 100644
index 0000000..8ec24c1
--- /dev/null
+++ b/lib/avro_ex/object_container/codec.ex
@@ -0,0 +1,50 @@
+defmodule AvroEx.ObjectContainer.Codec do
+  @moduledoc """
+  Behaviour for the block codecs of `AvroEx.ObjectContainer`, plus lookup of a
+  codec module by the name stored in a file header.
+  """
+
+  @callback encode!(binary()) :: binary()
+  @callback decode!(binary()) :: binary()
+  @callback name() :: atom()
+
+  @doc "Returns the built-in codecs as a keyword list of codec name to module."
+  @spec mandatory_codecs() :: keyword(module())
+  def mandatory_codecs do
+    [null: __MODULE__.Null, deflate: __MODULE__.Deflate]
+  end
+
+  @doc """
+  Looks up the module implementing `codec`.
+
+  `codec` is a codec name as an atom or a string. `user_codecs` is a keyword
+  list of codec name to module; its entries take precedence over the built-in
+  codecs of the same name.
+
+  Names are compared as strings. A string name is never converted to an atom,
+  because it may come from a file header, which is untrusted input, and atoms
+  are not garbage collected.
+  """
+  @spec get_codec_implementation(atom() | String.t(), keyword(module())) ::
+          {:ok, module()} | {:error, AvroEx.DecodeError.t()}
+  def get_codec_implementation(codec, user_codecs \\ [])
+
+  def get_codec_implementation(codec, user_codecs) when is_atom(codec),
+    do: get_codec_implementation(Atom.to_string(codec), user_codecs)
+
+  def get_codec_implementation(codec, user_codecs) when is_binary(codec) do
+    found =
+      mandatory_codecs()
+      |> Keyword.merge(user_codecs)
+      |> Enum.find(fn {name, _impl} -> Atom.to_string(name) == codec end)
+
+    case found do
+      {_name, impl} when impl not in [nil, false] ->
+        {:ok, impl}
+
+      _ ->
+        message = "Codec implementation not found: #{inspect(codec)}"
+        {:error, %AvroEx.DecodeError{message: message}}
+    end
+  end
+end
diff --git a/lib/avro_ex/object_container/codec/deflate.ex b/lib/avro_ex/object_container/codec/deflate.ex
new file mode 100644
index 0000000..e890c80
--- /dev/null
+++ b/lib/avro_ex/object_container/codec/deflate.ex
@@ -0,0 +1,17 @@
+defmodule AvroEx.ObjectContainer.Codec.Deflate do
+  @moduledoc """
+  Codec named `:deflate`. Block data is compressed with `:zlib.zip/1` and
+  restored with `:zlib.unzip/1`.
+  """
+
+  @behaviour AvroEx.ObjectContainer.Codec
+
+  @impl AvroEx.ObjectContainer.Codec
+  def name, do: :deflate
+
+  @impl AvroEx.ObjectContainer.Codec
+  def encode!(data), do: :zlib.zip(data)
+
+  @impl AvroEx.ObjectContainer.Codec
+  def decode!(data), do: :zlib.unzip(data)
+end
diff --git a/lib/avro_ex/object_container/codec/null.ex b/lib/avro_ex/object_container/codec/null.ex
new file mode 100644
index 0000000..144e412
--- /dev/null
+++ b/lib/avro_ex/object_container/codec/null.ex
@@ -0,0 +1,17 @@
+defmodule AvroEx.ObjectContainer.Codec.Null do
+  @moduledoc """
+  Codec named `:null`. Block data is passed through unchanged in both
+  directions.
+  """
+
+  @behaviour AvroEx.ObjectContainer.Codec
+
+  @impl AvroEx.ObjectContainer.Codec
+  def name, do: :null
+
+  @impl AvroEx.ObjectContainer.Codec
+  def encode!(data), do: data
+
+  @impl AvroEx.ObjectContainer.Codec
+  def decode!(data), do: data
+end
diff --git a/lib/avro_ex/object_container/codec/snappy.ex b/lib/avro_ex/object_container/codec/snappy.ex
new file mode 100644
index 0000000..409d180
--- /dev/null
+++ b/lib/avro_ex/object_container/codec/snappy.ex
@@ -0,0 +1,55 @@
+defmodule AvroEx.ObjectContainer.Codec.Snappy do
+  @moduledoc """
+  Codec named `:snappy`, backed by the optional `:snappyer` dependency.
+
+  Encoded data is the compressed bytes followed by a 4-byte CRC32 of the
+  uncompressed data, which `decode!/1` verifies after decompressing. When
+  `:snappyer` is not loaded at compile time, both callbacks raise with
+  instructions for adding it.
+  """
+
+  @behaviour AvroEx.ObjectContainer.Codec
+
+  @impl AvroEx.ObjectContainer.Codec
+  def name, do: :snappy
+
+  if Code.ensure_loaded?(:snappyer) do
+    @impl AvroEx.ObjectContainer.Codec
+    def encode!(data) do
+      # NOTE(review): this binary and the match in decode!/1 were unreadable in
+      # the reviewed copy; they are restored from the surrounding variables
+      # (`len`, `compressed`, `crc`). Confirm the checksum is 32-bit big-endian.
+      {:ok, compressed} = :snappyer.compress(data)
+      <<compressed::binary, :erlang.crc32(data)::32>>
+    end
+
+    @impl AvroEx.ObjectContainer.Codec
+    def decode!(data) do
+      len = byte_size(data) - 4
+      <<compressed::binary-size(len), crc::32>> = data
+      {:ok, decompressed} = :snappyer.decompress(compressed)
+
+      if crc == :erlang.crc32(decompressed) do
+        decompressed
+      else
+        raise %AvroEx.DecodeError{message: "CRC mismatch during decompression"}
+      end
+    end
+  else
+    @impl AvroEx.ObjectContainer.Codec
+    def encode!(_data) do
+      raise """
+      Cannot encode data using the Snappy codec because snappyer has not been loaded.
+      If you require Snappy compression, you must add snappyer as a dependency in your mix.exs file.
+      """
+    end
+
+    @impl AvroEx.ObjectContainer.Codec
+    def decode!(_data) do
+      raise """
+      Cannot decode data using the Snappy codec because snappyer has not been loaded.
+      If you require Snappy compression, you must add snappyer as a dependency in your mix.exs file.
+      """
+    end
+  end
+end
diff --git a/test/object_container_encode_test.exs b/test/object_container_encode_test.exs
new file mode 100644
index 0000000..3ad4489
--- /dev/null
+++ b/test/object_container_encode_test.exs
@@ -0,0 +1,245 @@
+defmodule AvroEx.ObjectContainer.Encode.Test do
+  use ExUnit.Case, async: true
+
+  alias AvroEx.ObjectContainer
+  alias AvroEx.ObjectContainer.Codec
+
+  @sync <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>>
+  @null_schema_json "{\"type\":\"null\"}"
+
+  defp null_schema, do: AvroEx.decode_schema!(~S("null"))
+
+  # Encodes the container's file header and decodes it back into a map.
+  defp decoded_file_header(container) do
+    header_data = ObjectContainer.encode_file_header!(container)
+    AvroEx.decode!(ObjectContainer.file_header_schema(), header_data)
+  end
+
+  # Encodes a raw file header carrying the given metadata map.
+  defp encoded_file_header(meta) do
+    AvroEx.encode!(ObjectContainer.file_header_schema(), %{
+      "magic" => <<"Obj", 1>>,
+      "meta" => meta,
+      "sync" => @sync
+    })
+  end
+
+  # Named setup shared by the round-trip tests: a record schema and a
+  # container for it.
+  defp record_container(_context) do
+    data_schema =
+      AvroEx.decode_schema!(%{
+        "type" => "record",
+        "name" => "block_test_data",
+        "fields" => [
+          %{"name" => "testdata1", "type" => "bytes"},
+          %{"name" => "testdata2", "type" => "int"}
+        ]
+      })
+
+    %{data_schema: data_schema, ocf: ObjectContainer.new(data_schema)}
+  end
+
+  defp records(range) do
+    for v <- range, do: %{"testdata1" => "test#{v}", "testdata2" => v}
+  end
+
+  describe "encode file header" do
+    test "new containers have different sync bytes" do
+      syncs = for _ <- 1..10, do: ObjectContainer.new(nil).sync
+
+      assert Enum.uniq(syncs) == syncs
+    end
+
+    # TODO: use multiple schemas instead of just "null"
+    test "codec embedded in header" do
+      for codec <- [Codec.Null, Codec.Deflate, Codec.Snappy] do
+        header = null_schema() |> ObjectContainer.new(codec: codec) |> decoded_file_header()
+
+        assert header["meta"]["avro.codec"] == to_string(codec.name())
+      end
+    end
+
+    test "default codec is null" do
+      header = null_schema() |> ObjectContainer.new() |> decoded_file_header()
+
+      assert header["meta"]["avro.codec"] == "null"
+    end
+
+    test "schema is stored in the file header metadata" do
+      header = null_schema() |> ObjectContainer.new() |> decoded_file_header()
+
+      assert header["meta"]["avro.schema"] == @null_schema_json
+    end
+
+    test "user metadata is stored in the file header metadata" do
+      container = ObjectContainer.new(null_schema(), meta: %{first_time: "12345678"})
+
+      assert decoded_file_header(container)["meta"]["first_time"] == "12345678"
+    end
+
+    test "user metadata does not prevent schema and codec from being written preoperly" do
+      container = ObjectContainer.new(null_schema(), meta: %{first_time: "12345678"})
+      header = decoded_file_header(container)
+
+      assert header["meta"]["avro.codec"] == "null"
+      assert header["meta"]["avro.schema"] == @null_schema_json
+    end
+
+    test "user metadata cannot override the reserved avro entries" do
+      meta = %{"avro.codec" => "bogus", "avro.schema": "bogus"}
+      header = null_schema() |> ObjectContainer.new(meta: meta) |> decoded_file_header()
+
+      assert header["meta"]["avro.codec"] == "null"
+      assert header["meta"]["avro.schema"] == @null_schema_json
+    end
+
+    test "magic matches standard" do
+      header = null_schema() |> ObjectContainer.new() |> decoded_file_header()
+
+      assert header["magic"] == <<"Obj", 1>>
+    end
+  end
+
+  describe "block header" do
+    test "encode and then decode block header" do
+      # TODO: property based test makes more sense
+      encoded_header = ObjectContainer.encode_block_header!(100, 5000)
+      header = AvroEx.decode!(ObjectContainer.block_header_schema(), encoded_header)
+
+      assert header["num_objects"] == 100
+      assert header["num_bytes"] == 5000
+    end
+  end
+
+  describe "decode file header" do
+    test "full valid file header with optional metas" do
+      data =
+        encoded_file_header(%{
+          "avro.schema" => @null_schema_json,
+          "avro.codec" => "null",
+          "custom_meta" => "custom_value"
+        })
+
+      assert {:ok, header, <<>>} = ObjectContainer.decode_file_header(data)
+      assert header.schema == AvroEx.decode_schema!(nil)
+      assert header.codec == ObjectContainer.Codec.Null
+      assert header.meta == %{"custom_meta" => "custom_value"}
+      assert header.sync == @sync
+    end
+
+    test "invalid magic is detected" do
+      data = "some random data stream that doesn't start with right magic"
+
+      assert {:error, %AvroEx.DecodeError{}} = ObjectContainer.decode_file_header(data)
+    end
+
+    test "missing schema detected" do
+      data = encoded_file_header(%{"avro.codec" => "null"})
+
+      assert {:error, %AvroEx.DecodeError{}} = ObjectContainer.decode_file_header(data)
+    end
+
+    test "missing codec defaults to null" do
+      data = encoded_file_header(%{"avro.schema" => @null_schema_json})
+
+      assert {:ok, header, <<>>} = ObjectContainer.decode_file_header(data)
+      assert header.codec == ObjectContainer.Codec.Null
+    end
+
+    test "unknown codec is an error and does not create an atom" do
+      codec_name = "codec_name_that_no_atom_exists_for"
+      meta = %{"avro.schema" => @null_schema_json, "avro.codec" => codec_name}
+
+      assert {:error, %AvroEx.DecodeError{}} =
+               ObjectContainer.decode_file_header(encoded_file_header(meta))
+
+      assert_raise ArgumentError, fn -> String.to_existing_atom(codec_name) end
+    end
+
+    test "missing sync detected" do
+      data =
+        encoded_file_header(%{
+          "avro.schema" => @null_schema_json,
+          "avro.codec" => "null",
+          "custom_meta" => "custom_value"
+        })
+
+      # Checked for every cut into the 16-byte sync marker rather than for one
+      # random cut, so a failure is reproducible.
+      for missing <- 1..16 do
+        truncated = binary_part(data, 0, byte_size(data) - missing)
+
+        assert {:error, _} = ObjectContainer.decode_file_header(truncated)
+      end
+    end
+  end
+
+  describe "encode and decode block objects" do
+    setup :record_container
+
+    test "encode and then decode objects", %{ocf: ocf} do
+      data_input = records(1..10)
+      encoded = ObjectContainer.encode_block_objects!(ocf, data_input)
+      block_header = %{num_objects: 10, num_bytes: byte_size(encoded)}
+
+      assert {:ok, data_output, _rest} =
+               ObjectContainer.decode_block_objects(ocf, block_header, encoded)
+
+      assert data_output == data_input
+    end
+  end
+
+  describe "full object container" do
+    setup :record_container
+
+    test "encode and then decode a file with a single block", %{data_schema: data_schema, ocf: ocf} do
+      data_input = records(1..10)
+      file_data = ObjectContainer.encode_file!(ocf, data_input)
+
+      assert {:ok, ocf_output, data_output} = ObjectContainer.decode_file(file_data)
+      assert AvroEx.encode_schema(ocf_output.schema) == AvroEx.encode_schema(data_schema)
+      assert data_output == data_input
+    end
+
+    test "encode and then decode a file with a multiple blocks", %{data_schema: data_schema, ocf: ocf} do
+      data_input = records(1..30)
+      [first_chunk | other_chunks] = Enum.chunk_every(data_input, 10)
+
+      file_data =
+        Enum.reduce(other_chunks, ObjectContainer.encode_file!(ocf, first_chunk), fn chunk, acc ->
+          acc <> ObjectContainer.encode_block!(ocf, chunk)
+        end)
+
+      assert {:ok, ocf_output, data_output} = ObjectContainer.decode_file(file_data)
+      assert AvroEx.encode_schema(ocf_output.schema) == AvroEx.encode_schema(data_schema)
+      assert data_output == data_input
+    end
+
+    test "encode and then decode a file with the deflate codec", %{data_schema: data_schema} do
+      ocf = ObjectContainer.new(data_schema, codec: Codec.Deflate)
+      data_input = records(1..10)
+      file_data = ObjectContainer.encode_file!(ocf, data_input)
+
+      assert {:ok, ocf_output, data_output} = ObjectContainer.decode_file(file_data)
+      assert ocf_output.codec == Codec.Deflate
+      assert data_output == data_input
+    end
+
+    test "objects that are lists are not flattened into each other" do
+      schema = AvroEx.decode_schema!(%{"type" => "array", "items" => "int"})
+      data_input = [[1, 2], [3], [4, 5, 6]]
+      file_data = schema |> ObjectContainer.new() |> ObjectContainer.encode_file!(data_input)
+
+      assert {:ok, _ocf_output, data_output} = ObjectContainer.decode_file(file_data)
+      assert data_output == data_input
+    end
+
+    test "a truncated block footer is an error", %{ocf: ocf} do
+      file_data = ObjectContainer.encode_file!(ocf, records(1..10))
+      truncated = binary_part(file_data, 0, byte_size(file_data) - 1)
+
+      assert {:error, %AvroEx.DecodeError{}} = ObjectContainer.decode_file(truncated)
+    end
+  end
+end